Implementing SVD++ Recommendation Algorithm with PySpark
Notes documenting the key insights and implementation details from building SVD++ with PySpark.
SVD++ Formula
The SVD++ algorithm extends matrix factorization by incorporating implicit feedback, allowing it to leverage both explicit ratings and implicit signals from user behavior.
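Written in the notation of the code below, the standard SVD++ prediction rule is:
r_hat(u, i) = global_mean + bu[u] + bi[i] + <qi[i], pu[u] + sum_{j in Iu} yj[j] / sqrt(|Iu|)>
where Iu is the set of items on which user u has given implicit feedback and yj holds the implicit item factors.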
Loss Calculation
dot = 0  # <q_i, p_u + sum_{j in Iu} y_j / sqrt(|Iu|)>
for f in range(self.n_factors):
    dot += qi[i, f] * (pu[u, f] + u_impl_fdb[f])
err = r - (global_mean + bu[u] + bi[i] + dot)
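u_impl_fdb and sqrt_Iu are used here (and in the updates below) but not defined in the snippet; a minimal per-user precomputation, assuming Iu is the list of items with implicit feedback from user u, would be:
import numpy as np
sqrt_Iu = np.sqrt(len(Iu))             # normalization term sqrt(|Iu|)
u_impl_fdb = np.zeros(self.n_factors)  # sum_{j in Iu} yj[j] / sqrt(|Iu|)
for j in Iu:
    for f in range(self.n_factors):
        u_impl_fdb[f] += yj[j, f] / sqrt_Iu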
SGD Parameter Updates
# update biases
bu[u] += lr_bu * (err - reg_bu * bu[u])
bi[i] += lr_bi * (err - reg_bi * bi[i])
# update factors
for f in range(self.n_factors):
    puf = pu[u, f]
    qif = qi[i, f]
    pu[u, f] += lr_pu * (err * qif - reg_pu * puf)
    qi[i, f] += lr_qi * (err * (puf + u_impl_fdb[f]) - reg_qi * qif)
    for j in Iu:
        yj[j, f] += lr_yj * (err * qif / sqrt_Iu - reg_yj * yj[j, f])
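For reference, these updates perform stochastic gradient descent on the regularized squared error (written with a single reg term for brevity; the code keeps separate lr_* and reg_* constants per parameter):
sum over observed (u, i) of (r - r_hat(u, i))^2 + reg * (bu[u]^2 + bi[i]^2 + ||pu[u]||^2 + ||qi[i]||^2 + sum_{j in Iu} ||yj[j]||^2)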
References and Examples
- How Yelp handles large matrix multiplications with PySpark
- PySpark Factorization Machine implementation reference
- Scala Factorization Machine implementation reference
- Python SVD++ implementation reference
- Parameter Server architecture discussion
Implementation Notes
Broadcast Variable Limitations
- Time overhead from pickle serialization
- Object size limit of 4GB
- Constrained by available heap size
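As a small illustration of the pattern these limits apply to (the array shape and variable names are hypothetical):
import numpy as np
qi = np.random.normal(size=(50000, 64))   # hypothetical item-factor matrix
bc_qi = sc.broadcast(qi)                  # pickled on the driver and shipped to executors
# tasks read bc_qi.value; it is unpickled on each worker process that uses it
sums = sc.parallelize(range(4)).map(lambda i: float(bc_qi.value[i].sum())).collect()
bc_qi.unpersist()                         # release executor-side copies when done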
Converting RDD with numpy.ndarray to DataFrame
Purpose: Keep parameter updates in RDD while using DataFrames for batch disk operations
from pyspark.mllib.random import RandomRDDs
from pyspark.ml.linalg import DenseVector
# 100 x 200 random matrix as an RDD of numpy.ndarray rows
rdd1 = RandomRDDs.normalVectorRDD(sc, 100, 200, numPartitions=None, seed=9999)
# attach a row index so each row can be tracked after conversion
rdd2 = rdd1.zipWithIndex()
# wrap each ndarray in a DenseVector so toDF() can infer the schema
sdf = rdd2.map(lambda x: (DenseVector(x[0]), x[1])).toDF()
Converting RowMatrix to Spark DataFrame
from pyspark.mllib.linalg import DenseMatrix  # local matrix for RowMatrix.multiply
import numpy
sdf = rmat.multiply(DenseMatrix(2, 2, [0, 2, 1, 3])).rows.map(lambda x: (x,)).toDF()
sdf.select(['_1']).rdd.map(lambda x: numpy.argsort(-x['_1'].toArray()).tolist()).collect()  # sort each row's column indices, descending by value
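rmat above is assumed to be an existing 2-column RowMatrix; a hypothetical construction (sample values are placeholders, not from the original notes) would be:
from pyspark.mllib.linalg.distributed import RowMatrix
rmat = RowMatrix(sc.parallelize([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]))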
Batch Writing Matrix Data to Disk
Purpose: Save data in batches to disk after parameter updates
for threshold in range(10):
    sdf.filter(sdf['_2'] <= threshold).write.parquet(f"file:///home/tom/tmp/test_{threshold}.parquet")
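Reading one of the written batches back for verification (assuming spark is the active SparkSession and the path from the loop above):
batch0 = spark.read.parquet("file:///home/tom/tmp/test_0.parquet")
batch0.count()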
Using glom
Requires Attention to RDD Partition Count
# Both of the following create an RDD with 2 partitions:
rdd = sc.parallelize(range(4), numSlices=2)
rdd = RandomRDDs.normalVectorRDD(sc, 4, 3, numPartitions=2)  # 4 x 3 random rows
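glom itself is not shown above; on the RDD it groups the rows of each partition into a single Python list (the counts in the comments assume numPartitions=2):
rdd.getNumPartitions()         # 2
parts = rdd.glom().collect()   # one list of rows per partition
len(parts)                     # 2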
Creating BlockMatrix with IndexedRowMatrix Blocks
# TODO: Implementation needed
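The original notes leave this as a TODO; one possible sketch (not the author's implementation, with placeholder values) uses IndexedRowMatrix.toBlockMatrix:
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
rows = sc.parallelize([IndexedRow(0, [1.0, 2.0]), IndexedRow(1, [3.0, 4.0])])
imat = IndexedRowMatrix(rows)
bmat = imat.toBlockMatrix(rowsPerBlock=1, colsPerBlock=2)  # each block is a 1 x 2 sub-matrix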
Flattening List of Lists of Tuples to List of Tuples
Purpose: Convert executor-computed list of (item, user, rank) tuples into a single list for easy DataFrame conversion
rdd.collect()
# Output: [[(0, 1, 2), (2, 3, 4)], [(1, 2, 5), (2, 3, 3)]]
rdd.flatMap(lambda x: x).collect()
# Output: [(0, 1, 2), (2, 3, 4), (1, 2, 5), (2, 3, 3)]
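Per the stated purpose, the flattened tuples convert directly to a DataFrame (the column names here are assumptions):
sdf = rdd.flatMap(lambda x: x).toDF(['item', 'user', 'rank'])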
Avoiding OutOfMemory Issues with Broadcast
References: Spark configuration documentation for spark.driver.memory
Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file.
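For example (the memory size and application name are placeholders):
spark-submit --driver-memory 8g train_svdpp.py
# or in conf/spark-defaults.conf: spark.driver.memory 8g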