[ https://issues.apache.org/jira/browse/SPARK-23810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
dciborow updated SPARK-23810:
-----------------------------
    Attachment: image-2018-04-02-20-34-44-980.png

> Matrix Multiplication is so bad, file I/O to local python is better
> -------------------------------------------------------------------
>
>                 Key: SPARK-23810
>                 URL: https://issues.apache.org/jira/browse/SPARK-23810
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 2.2.0
>            Reporter: dciborow
>            Priority: Minor
>         Attachments: image-2018-04-02-20-34-44-980.png
>
> I am trying to multiply two matrices. One is 130k by 30k; the second is 30k by 30k. Running the following leads to heartbeat timeouts, Java heap space errors, and garbage collection errors:
>
> {{rdd.toBlockMatrix.multiply(rightRdd.toBlockMatrix).toIndexedRowMatrix()}}
>
> I have also tried the following, which fails on the toLocalMatrix call:
>
> val userMatrix = new CoordinateMatrix(userRDD).toIndexedRowMatrix()
> val itemMatrix = new CoordinateMatrix(itemRDD).toBlockMatrix().toLocalMatrix()
> val itemMatrixBC = session.sparkContext.broadcast(itemMatrix)
> val userToItemMatrix = userMatrix
>   .multiply(itemMatrixBC.value)
>   .rows.map(row => (row.index.toInt, row.vector))
>
> I have instead gotten this operation "working" by saving the input DataFrames to Parquet (the inputs start as DataFrames before the .rdd call that converts them to the matrix types), loading them into Python/pandas, doing the matrix multiplication with NumPy, saving the result back to Parquet, and re-reading it into Spark.
>
> Python -
>
> import pandas as pd
> import numpy as np
>
> # Stack the per-item vectors into a dense matrix indexed by itemID
> X = pd.read_parquet('./items-parquet', engine='pyarrow')
> Xp = pd.DataFrame(np.stack(X.jaccardList), index=X.itemID)
> # Reindex so every id in 0..max(itemID) has a row, filling gaps with zeros
> Xrows = pd.DataFrame(index=range(0, X.itemID.max() + 1))
> Xpp = Xrows.join(Xp).fillna(0)
>
> Y = pd.read_parquet('./users-parquet', engine='pyarrow')
> Yp = np.stack(Y.flatList)
>
> # Dense multiply, then back to a DataFrame; Parquet needs string column names
> Z = np.matmul(Yp, Xpp)
> Zp = pd.DataFrame(Z)
> Zp.columns = list(map(str, Zp.columns))
> Zpp = pd.DataFrame()
> Zpp['id'] = Zp.index
> Zpp['ratings'] = Zp.values.tolist()
> Zpp.to_parquet("sampleout.parquet", engine='pyarrow')
>
> Scala -
>
> import sys.process._
> // Shell out to the Python script, then read its output back into Spark
> val result = "python matmul.py".!
> val pythonOutput = userDataFrame.sparkSession.read.parquet("./sampleout.parquet")
>
> I can provide code and the data to repro, but could use some instructions on how to set that up. This is based on the MovieLens 20M dataset, or I can provide access to my data in Azure.
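
For reference, a minimal sketch of one mitigation worth trying before falling back to local NumPy: shrink the blocks below BlockMatrix's 1024 x 1024 default, and split the shared 30k dimension with the numMidDimSplits overload of BlockMatrix.multiply (available since Spark 2.2), so each shuffle task multiplies fewer and smaller block pairs. This is a sketch, not a tuned fix: the 512 block size and 4 splits are illustrative guesses, and userRDD/itemRDD are assumed to be the same RDD[MatrixEntry] inputs used in the snippets above.

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD

def multiplyBlocked(userRDD: RDD[MatrixEntry], itemRDD: RDD[MatrixEntry]) = {
  // Smaller-than-default blocks keep individual shuffle tasks lighter.
  val left = new CoordinateMatrix(userRDD).toBlockMatrix(512, 512).cache()
  val right = new CoordinateMatrix(itemRDD).toBlockMatrix(512, 512).cache()
  // numMidDimSplits > 1 cuts the inner (30k) dimension across extra tasks,
  // trading more shuffle work for lower peak memory per task.
  left.multiply(right, 4).toIndexedRowMatrix()
}

Whether this avoids the heap and GC errors depends on executor memory and partitioning, but it exercises the same BlockMatrix path the report starts from rather than round-tripping through pandas.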