Spark中的矩阵“相除”

背景

我们知道,线性代数的矩阵中,其实是没有矩阵除法这个概念的.

所以这里讲的相除其实是两个维度完全一样的矩阵对应点一一相除的意思.

类似于python 中np.array/np.array

1
2
3
4
import numpy as np
a = np.array([[1,2], [3,4]])
b = np.array([[2,3], [1,4]])
print("mat a is: \n {0} \n mat b is:\n {1} \n mat a/b is: \n {2}".format(a, b, a/b))
mat a is: 
 [[1 2]
 [3 4]] 
 mat b is:
 [[2 3]
 [1 4]] 
 mat a/b is: 
 [[0.5        0.66666667]
 [3.         1.        ]]

但是查阅Spark的官方文档

矩阵的乘法运算是通过 rowmatrix mutiply densematrix 来实现的,加法和减法都没有问题.

但是对应点一一相除则没有相关的方法来达到目的,所以需要自己做一些变通,其实也非常简单.

但是,由于Spark是在RDD的进行并行计算的,所以不能将数据拿出来到比如np.array来实现相关的算法,因为这需要将所有数据collect到某个节点的内存中进行,显然不OK.

注意到DenseVector在Spark中是有除法的,所以结果也很简单,将两个矩阵构造在两个Densevector 矩阵的RDD中,然后通过RDD1.zip(RDD2)同时操作两个RDD矩阵实现矩阵的除法.

构造矩阵

1
2
from pyspark import SparkContext
from pyspark.mllib.linalg.distributed import RowMatrix
1
sc = SparkContext('local', 'local_test_recommend')
1
2
3
4
5
6
rowsa = sc.parallelize([[1, 2], [3, 4]])
rowsb = sc.parallelize([[2, 3], [1, 4]])
sma = RowMatrix(rowsa)
smb = RowMatrix(rowsb)
print(sma.rows.collect())
print(smb.rows.collect())
[DenseVector([1.0, 2.0]), DenseVector([3.0, 4.0])]
[DenseVector([2.0, 3.0]), DenseVector([1.0, 4.0])]

矩阵相除

1
sma.rows.zip(smb.rows).map(lambda x: x[0]/x[1]).collect()
[DenseVector([0.5, 0.6667]), DenseVector([3.0, 1.0])]