[ https://issues.apache.org/jira/browse/SPARK-23810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dciborow updated SPARK-23810:
-----------------------------
    Attachment: image-2018-04-02-20-34-44-980.png

> Matrix Multiplication is so bad, file I/O to local python is better
> -------------------------------------------------------------------
>
>                 Key: SPARK-23810
>                 URL: https://issues.apache.org/jira/browse/SPARK-23810
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 2.2.0
>            Reporter: dciborow
>            Priority: Minor
>         Attachments: image-2018-04-02-20-34-44-980.png
>
>
> I am trying to multiply two matrices. One is 130k by 30k; the second is 30k 
> by 30k.
> Running this leads to heartbeat timeouts, Java heap space errors, and 
> garbage collection errors.
> {{rdd.toBlockMatrix.multiply(rightRdd.toBlockMatrix).toIndexedRowMatrix()}}
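>
> For completeness, a self-contained sketch of that failing path (the 
> stand-in entries and the explicit 1024x1024 block size are assumptions for 
> illustration; 1024 is the default block size, and shrinking it trades 
> shuffle volume for per-task memory):
> import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder().appName("blockmatrix-repro").getOrCreate()
> val sc = spark.sparkContext
>
> // Stand-in sparse entries at the reported sizes: 130k x 30k and 30k x 30k.
> val leftEntries = sc.parallelize(0L until 130000L).map(i => MatrixEntry(i, i % 30000L, 1.0))
> val rightEntries = sc.parallelize(0L until 30000L).map(i => MatrixEntry(i, i, 1.0))
>
> val product = new CoordinateMatrix(leftEntries, 130000L, 30000L)
>   .toBlockMatrix(1024, 1024)
>   .multiply(new CoordinateMatrix(rightEntries, 30000L, 30000L).toBlockMatrix(1024, 1024))
>   .toIndexedRowMatrix()
> If I remember right, 2.2 also added a multiply(other, numMidDimSplits) 
> overload that splits the inner dimension across more tasks, which may be 
> worth trying here.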
> I have also tried the following, which fails on the toLocalMatrix call:
> val userMatrix = new CoordinateMatrix(userRDD).toIndexedRowMatrix()
> val itemMatrix = new CoordinateMatrix(itemRDD).toBlockMatrix().toLocalMatrix()
> val itemMatrixBC = session.sparkContext.broadcast(itemMatrix)
> val userToItemMatrix = userMatrix
>   .multiply(itemMatrixBC.value)
>   .rows.map(row => (row.index.toInt, row.vector))
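>
> (The toLocalMatrix failure is not surprising once the size is written out: 
> toLocalMatrix materializes a dense local matrix on the driver, and at 
> 30k x 30k doubles that is close to 7 GB before any copies. A quick check:)
> // Rough footprint of a dense 30k x 30k matrix of doubles,
> // ignoring JVM object overhead and intermediate copies.
> val bytes = 30000L * 30000L * 8L
> println(f"${bytes / math.pow(1024, 3)}%.1f GiB") // prints 6.7 GiB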
>  
> I have instead gotten this operation "working" by saving the input 
> DataFrames to Parquet (they start as DataFrames before the .rdd call needed 
> to work with the matrix types), loading them into python/pandas, doing the 
> matrix multiplication with numpy, saving back to Parquet, and rereading the 
> result into Spark.
>  
> Python (matmul.py) -
> import pandas as pd
> import numpy as np
>
> # Items: one row per itemID, with the vector stored as a list column.
> X = pd.read_parquet('./items-parquet', engine='pyarrow')
> Xp = pd.DataFrame(np.stack(X.jaccardList), index=X.itemID)
>
> # Reindex over the full 0..max(itemID) range, zero-filling missing items.
> Xrows = pd.DataFrame(index=range(0, X.itemID.max() + 1))
> Xpp = Xrows.join(Xp).fillna(0)
>
> # Users: stack the list column into a dense 2-D array.
> Y = pd.read_parquet('./users-parquet', engine='pyarrow')
> Yp = np.stack(Y.flatList)
>
> # The actual multiplication; .values hands matmul a plain ndarray.
> Z = np.matmul(Yp, Xpp.values)
>
> # Repack as (id, ratings-list) rows; Parquet needs string column names.
> Zp = pd.DataFrame(Z)
> Zp.columns = list(map(str, Zp.columns))
> Zpp = pd.DataFrame()
> Zpp['id'] = Zp.index
> Zpp['ratings'] = Zp.values.tolist()
> Zpp.to_parquet("sampleout.parquet", engine='pyarrow')
>  
> Scala -
> import sys.process._
> val result = "python matmul.py".!
> val pythonOutput = userDataFrame.sparkSession.read.parquet("./sampleout.parquet")
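>
> (One hardening note on the shell-out: .! only returns the process exit 
> code, so it is worth failing fast before reading the output. A sketch, 
> same file names:)
> import sys.process._
> val exitCode = "python matmul.py".!
> require(exitCode == 0, s"matmul.py failed with exit code $exitCode")
> val pythonOutput = userDataFrame.sparkSession.read.parquet("./sampleout.parquet")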
>  
> I can provide code and the data to repro, but could use some instructions 
> on how to set that up. This is based on the MovieLens 20M dataset, or I can 
> provide access to my data in Azure.
>  
>  


