Hello User group,

I have an RDD of LabeledPoint composed of sparse vectors, as shown below. In
the next step I standardize the columns with StandardScaler. The data has
2450 columns and ~110M rows, and the standardization took 1.5 hours to
complete with 10 nodes and 80 executors. spark.executor.memory was set to 2g
and the driver memory to 5g.
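For reference, those settings correspond to roughly the following spark-shell
invocation (a sketch; exact flags depend on the cluster manager, and
--num-executors applies on YARN):

spark-shell --num-executors 80 --executor-memory 2g --driver-memory 5g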

scala> val parsedData = stack_sorted.mapPartitions(partition =>
         partition.map{ row =>
           LabeledPoint(row._2._1.getDouble(4),
                        sparseVectorCat(row._2, CategoriesIdx, InteractionIds, tupleMap, vecLength))
         }, preservesPartitioning = true).cache()

CategoriesIdx: Array[Int] = Array(3, 8, 12)
InteractionIds: Array[(Int, Int)] = Array((13,12))
vecLength: Int = 2450
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[93] at mapPartitions at <console>:111
(1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))
(0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))
(2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))
(1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))
(0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))
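
(sparseVectorCat is our own helper. Roughly, it maps each categorical value
to a fixed slot via tupleMap and emits 1.0 there. A simplified sketch of the
idea, ignoring the InteractionIds handling and taking a plain Row instead of
our tuple:)

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row

def sparseVectorCatSketch(r: Row,
                          categoriesIdx: Array[Int],
                          tupleMap: Map[(Int, Any), Int],
                          vecLength: Int): Vector = {
  // Each (columnIndex, value) pair maps to a fixed slot in the one-hot
  // vector; active slots get 1.0.
  val indices = categoriesIdx.map(i => tupleMap((i, r.get(i)))).distinct.sorted
  Vectors.sparse(vecLength, indices, Array.fill(indices.length)(1.0))
}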


My suspicion is that, because the data is partitioned using a custom
partitioner, the StandardScaler is causing a major shuffle operation. Any
suggestions on how to improve the performance of this step, and of
LinearRegressionWithSGD(), which is also taking a very long time?
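
One way I can check the shuffle theory is to print the lineage, since shuffle
boundaries show up as ShuffledRDD entries:

scala> println(scaledData.toDebugString)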

scala> parsedData.partitioner
res72: Option[org.apache.spark.Partitioner] = 
Some(org.apache.spark.HashPartitioner@d2)

scala> val scaler = new StandardScaler(withMean = false, withStd = true).fit(parsedData.map(row => row.features))
scala> val scaledData = parsedData.mapPartitions(partition =>
         partition.map{ row => LabeledPoint(row.label, scaler.transform(row.features)) }).cache()
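
(cache() is lazy, so I force materialization before training; otherwise the
first SGD pass also pays the scaling cost:)

scala> scaledData.count()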

scala> val numIterations = 100
scala> val stepSize = 0.1
scala> val miniBatchFraction = 0.1
scala> val algorithm = new LinearRegressionWithSGD()

scala> algorithm.setIntercept(false)
scala> algorithm.optimizer.setNumIterations(numIterations)
scala> algorithm.optimizer.setStepSize(stepSize)
scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)
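
(For scale: with miniBatchFraction = 0.1, each of the 100 iterations samples
roughly 11M of the ~110M rows, so every iteration is itself a pass over a
sizable RDD.)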

scala> val model = algorithm.run(scaledData)

Best,

Piero Cinquegrana
Marketing Scientist | MarketShare
11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
P: 310.914.5677 x242 M: 323.377.9197
www.marketshare.com
twitter.com/marketsharep
