
Implementing SVD++ Recommendation Algorithm with PySpark


Notes documenting the key insights and implementation details encountered while building SVD++ with PySpark.

SVD++ Formula

$$\hat{r}_{ui} = \mu + b_u + b_i + q_i^\top \Big( p_u + |I_u|^{-\frac{1}{2}} \sum_{j \in I_u} y_j \Big)$$

Here $\mu$ is the global mean rating, $b_u$ and $b_i$ are the user and item biases, $p_u$ and $q_i$ are the explicit latent factors, and the $y_j$ are implicit-feedback factors summed over the set $I_u$ of items the user has interacted with.

The SVD++ algorithm extends matrix factorization by incorporating implicit feedback, allowing it to leverage both explicit ratings and implicit signals from user behavior.

Loss Calculation

dot = 0  # <q_i, p_u + sum_{j in Iu} y_j / sqrt(|Iu|)>
for f in range(self.n_factors):
    dot += qi[i, f] * (pu[u, f] + u_impl_fdb[f])
err = r - (global_mean + bu[u] + bi[i] + dot)
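
The snippet assumes u_impl_fdb has already been computed for the current user. A minimal sketch of that step, mirroring the comment above and assuming Iu is the list of item indices the user has rated:

import numpy as np

# implicit feedback term: sum of y_j over the user's rated items, scaled by 1/sqrt(|Iu|)
sqrt_Iu = np.sqrt(len(Iu))
u_impl_fdb = np.zeros(self.n_factors, np.double)
for j in Iu:
    for f in range(self.n_factors):
        u_impl_fdb[f] += yj[j, f] / sqrt_Iu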

SGD Parameter Updates

# update biases
bu[u] += lr_bu * (err - reg_bu * bu[u])
bi[i] += lr_bi * (err - reg_bi * bi[i])

# update factors
for f in range(self.n_factors):
    puf = pu[u, f]
    qif = qi[i, f]
    pu[u, f] += lr_pu * (err * qif - reg_pu * puf)
    qi[i, f] += lr_qi * (err * (puf + u_impl_fdb[f]) - reg_qi * qif)
    for j in Iu:
        yj[j, f] += lr_yj * (err * qif / sqrt_Iu - reg_yj * yj[j, f])
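
For completeness, the corresponding prediction step after training can be sketched as below. items_rated_by is a hypothetical user-to-items mapping, not part of the snippets above, and bu, bi, pu, qi, yj are assumed to be numpy arrays.

import numpy as np

def predict(u, i):
    # predicted rating: mu + b_u + b_i + <q_i, p_u + |Iu|^(-1/2) * sum_j y_j>
    Iu = items_rated_by[u]
    u_impl_fdb = yj[Iu].sum(axis=0) / np.sqrt(len(Iu))
    return global_mean + bu[u] + bi[i] + np.dot(qi[i], pu[u] + u_impl_fdb)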


Implementation Notes

Broadcast Variable Limitations

  1. Time overhead from pickle serialization
  2. Object size limit of 4GB
  3. Constrained by heap size limitations
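
As a point of reference, the sketch below shows how a factor matrix ends up as a broadcast variable; n_items, n_factors, and ratings_rdd are illustrative names, not taken from the original code.

import numpy as np

# every executor receives a read-only pickled copy, so all three limitations above
# apply to the serialized size of this array
yj_bc = sc.broadcast(np.random.normal(0, 0.1, (n_items, n_factors)))

# executors access the array through .value; ratings_rdd holds (u, i, r) triples
item_factors = ratings_rdd.map(lambda x: yj_bc.value[x[1]]).take(3)

# release executor copies when done (destroy() also removes the driver copy)
yj_bc.unpersist()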

Converting RDD with numpy.ndarray to DataFrame

Purpose: Keep parameter updates in RDDs while using DataFrames for batched writes to disk

from pyspark.mllib.random import RandomRDDs
from pyspark.ml.linalg import DenseVector

# 100 x 200 RDD whose elements are numpy.ndarray rows drawn from N(0, 1)
rdd1 = RandomRDDs.normalVectorRDD(sc, 100, 200, numPartitions=None, seed=9999)
# attach a row index to each ndarray
rdd2 = rdd1.zipWithIndex()
# wrap each ndarray in a DenseVector so toDF() can infer the schema (columns _1, _2)
sdf = rdd2.map(lambda x: (DenseVector(x[0]), x[1])).toDF()
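
If the auto-generated column names (_1, _2) are unwanted, toDF also accepts explicit names; the later snippets here keep the defaults.

sdf = rdd2.map(lambda x: (DenseVector(x[0]), x[1])).toDF(["features", "row_id"])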

Converting RowMatrix to Spark DataFrame

import numpy
from pyspark.mllib.linalg import DenseMatrix

# rmat: an existing pyspark.mllib.linalg.distributed.RowMatrix with 2 columns
sdf = rmat.multiply(DenseMatrix(2, 2, [0, 2, 1, 3])).rows.map(lambda x: (x, )).toDF()
sdf.select(['_1']).rdd.map(lambda x: (numpy.argsort(-(x['_1'].toArray()))).tolist()).collect()  # per row, column indices sorted by descending value

Batch Writing Matrix Data to Disk

Purpose: Save data in batches to disk after parameter updates

for threshold in range(10):
    sdf.filter(sdf['_2'] <= threshold).write.parquet(f"file:///home/tom/tmp/test_{threshold}.parquet")
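
If the same paths are written again on a later pass, Spark raises an error because the directories already exist; one option, assuming overwriting is acceptable, is to set the save mode explicitly:

sdf.filter(sdf['_2'] <= threshold).write.mode("overwrite").parquet(f"file:///home/tom/tmp/test_{threshold}.parquet")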

Using glom Requires Attention to RDD Partition Count

import numpy

# two ways to build a 4 x 3 random RDD with an explicit partition count
rdd = sc.parallelize(numpy.random.randn(4, 3), numSlices=2)
rdd = RandomRDDs.normalVectorRDD(sc, 4, 3, numPartitions=2)
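
glom() collects the elements of each partition into a single list, so the partition count chosen above directly determines how many lists come back:

parts = rdd.glom().collect()
len(parts)              # 2 -- one list per partition
[len(p) for p in parts] # the 4 rows split across the 2 partitions, e.g. [2, 2]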

Creating BlockMatrix with IndexedRowMatrix Blocks

# TODO: Implementation needed
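
The implementation was never filled in; one possible approach, assuming the goal is to assemble a BlockMatrix from an IndexedRowMatrix, is to go through toBlockMatrix:

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

rows = sc.parallelize([IndexedRow(0, [1.0, 2.0, 3.0]),
                       IndexedRow(1, [4.0, 5.0, 6.0]),
                       IndexedRow(2, [7.0, 8.0, 9.0])])
imat = IndexedRowMatrix(rows)
# split the matrix into 2 x 2 blocks; each block is a local dense matrix
bmat = imat.toBlockMatrix(rowsPerBlock=2, colsPerBlock=2)
bmat.numRows(), bmat.numCols()  # (3, 3)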

Flattening List of Lists of Tuples to List of Tuples

Purpose: Flatten the per-executor lists of (item, user, rank) tuples into a single list of tuples that can be converted to a DataFrame directly

rdd = sc.parallelize([[(0, 1, 2), (2, 3, 4)], [(1, 2, 5), (2, 3, 3)]])
rdd.collect()
# Output: [[(0, 1, 2), (2, 3, 4)], [(1, 2, 5), (2, 3, 3)]]
rdd.flatMap(lambda x: x).collect()
# Output: [(0, 1, 2), (2, 3, 4), (1, 2, 5), (2, 3, 3)]

Avoiding OutOfMemory Issues with Broadcast


Note (from the Spark configuration documentation for spark.driver.memory): in client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, set it through the --driver-memory command-line option or in your default properties file.
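
In practice that means setting the memory at submit time; the file name and sizes below are only placeholders:

spark-submit --driver-memory 8g --executor-memory 8g svdpp_train.py

# or in conf/spark-defaults.conf
spark.driver.memory    8g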