Hello User group, I have an RDD of LabeledPoint composed of sparse vectors like those shown below. In the next step, I standardize the columns with the StandardScaler. The data has 2450 columns and ~110M rows. It took 1.5 hours to complete the standardization with 10 nodes and 80 executors. The spark.executor.memory was set to 2g and the driver memory to 5g.
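For context, my understanding is that fit() needs only one full pass over the data: each partition accumulates per-column statistics locally, and the partial summaries are then merged (MLlib does this with a MultivariateOnlineSummarizer under treeAggregate), so no shuffle by key should be required. Below is a minimal sketch of that mergeable-summary pattern in plain Scala; the Summary class and the two-column example are my own illustration, not MLlib's actual code:

```scala
// Sketch of a mergeable per-column summary (Chan's parallel variance update),
// the pattern MLlib's MultivariateOnlineSummarizer uses inside treeAggregate.
case class Summary(n: Long, mean: Array[Double], m2: Array[Double]) {
  // Fold one row into the summary (Welford's online update, per column).
  def add(x: Array[Double]): Summary = {
    val n1 = n + 1
    val newMean = mean.clone(); val newM2 = m2.clone()
    for (i <- x.indices) {
      val delta = x(i) - newMean(i)
      newMean(i) += delta / n1
      newM2(i) += delta * (x(i) - newMean(i))
    }
    Summary(n1, newMean, newM2)
  }
  // Combine two partition-local summaries without revisiting the rows.
  def merge(o: Summary): Summary = {
    if (o.n == 0) this
    else if (n == 0) o
    else {
      val total = n + o.n
      val newMean = new Array[Double](mean.length)
      val newM2 = new Array[Double](mean.length)
      for (i <- mean.indices) {
        val delta = o.mean(i) - mean(i)
        newMean(i) = mean(i) + delta * o.n / total
        newM2(i) = m2(i) + o.m2(i) + delta * delta * n * o.n / total
      }
      Summary(total, newMean, newM2)
    }
  }
  // Sample variance per column (requires n > 1), matching MLlib's convention.
  def variance: Array[Double] = m2.map(_ / (n - 1))
}

// Two "partitions" of two-column rows, summarized independently and merged.
val empty = Summary(0L, Array.fill(2)(0.0), Array.fill(2)(0.0))
val partA = Seq(Array(1.0, 2.0), Array(2.0, 4.0)).foldLeft(empty)(_.add(_))
val partB = Seq(Array(3.0, 6.0), Array(4.0, 8.0)).foldLeft(empty)(_.add(_))
val merged = partA.merge(partB)
// merged.variance now equals the variance computed over all four rows at once.
```

Since the merge only ever moves small summary objects between tasks, the cost of fit() should be dominated by the single scan of the cached data rather than by repartitioning.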
scala> val parsedData = stack_sorted.mapPartitions( partition =>
         partition.map{row => LabeledPoint(row._2._1.getDouble(4),
           sparseVectorCat(row._2, CategoriesIdx, InteractionIds, tupleMap, vecLength))},
       preservesPartitioning=true).cache()

CategoriesIdx: Array[Int] = Array(3, 8, 12)
InteractionIds: Array[(Int, Int)] = Array((13,12))
vecLength: Int = 2450
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[93] at mapPartitions at <console>:111

(1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))
(0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))
(2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))
(1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))
(0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))

My suspicion is that, because the data is partitioned using a custom partitioner, the StandardScaler is causing a major shuffle operation. Any suggestions on how to improve the performance of this step, and of LinearRegressionWithSGD(), which is also taking a very long time?

scala> parsedData.partitioner
res72: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@d2)

scala> val scaler = new StandardScaler(withMean = false, withStd = true).fit(parsedData.map(row => row.features))
scala> val scaledData = parsedData.mapPartitions(partition =>
         partition.map{row => LabeledPoint(row.label, scaler.transform(row.features))}).cache()

scala> val numIterations = 100
scala> val stepSize = 0.1
scala> val miniBatchFraction = 0.1
scala> val algorithm = new LinearRegressionWithSGD()
scala> algorithm.setIntercept(false)
scala> algorithm.optimizer.setNumIterations(numIterations)
scala> algorithm.optimizer.setStepSize(stepSize)
scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)
scala> val model = algorithm.run(scaledData)

Best,

Piero Cinquegrana
Marketing Scientist | MarketShare
11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
P: 310.914.5677 x242 M: 323.377.9197
www.marketshare.com
twitter.com/marketsharep