Which part of StandardScaler is slow, the fit or the transform? Fit does a
shuffle, but a very small one, and transform doesn't shuffle at all. My guess
is that you don't have enough partitions, so please repartition your input
dataset to a number at least larger than the number of executors you have.
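
For example, a minimal sketch (the partition count of 320 is only illustrative; pick a small multiple of your total executor cores):

```scala
// Sketch only: the number 320 is illustrative, not a recommendation.
// With 80 executors, give each one several partitions to work on.
val numPartitions = 320
val repartitioned = parsedData.repartition(numPartitions).cache()

// Fit on the feature vectors, then transform each row.
val scaler = new StandardScaler(withMean = false, withStd = true)
  .fit(repartitioned.map(_.features))
val scaledData = repartitioned.map(row =>
  LabeledPoint(row.label, scaler.transform(row.features)))
```

Note that repartition does trigger a one-time shuffle and drops your custom partitioner, but the improved parallelism in the scaling and training steps usually pays for it.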

In Spark 1.4's new ML pipeline API, we have Linear Regression with elastic
net, and in that version we use a quasi-Newton method for optimization, so it
will be way faster than the SGD implementation. Also, in that
implementation, StandardScaler is not required, since we implicitly
standardize the features for you when computing the loss function.

https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef
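
Roughly, the new API looks like this (a sketch only; it assumes you have a DataFrame named `training` with "label" and "features" columns, and the parameter values are illustrative):

```scala
import org.apache.spark.ml.regression.LinearRegression

// Elastic-net linear regression from the ml pipeline API (Spark 1.4).
// The regParam and elasticNetParam values below are illustrative.
val lr = new LinearRegression()
  .setMaxIter(100)
  .setRegParam(0.01)        // overall regularization strength
  .setElasticNetParam(0.5)  // 0.0 = pure L2 (ridge), 1.0 = pure L1 (lasso)

// No manual StandardScaler step is needed here; the loss computation
// standardizes the features internally.
val model = lr.fit(training)
```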

Please try this out and give us feedback. Thanks.

On Wednesday, June 3, 2015, Piero Cinquegrana <pcinquegr...@marketshare.com>
wrote:

>  Hello User group,
>
>
>
> I have an RDD of LabeledPoint composed of sparse vectors, as shown
> below. In the next step, I standardize the columns with the Standard
> Scaler. The data has 2450 columns and ~110M rows. It took 1.5 hrs to
> complete the standardization with 10 nodes and 80 executors.
> spark.executor.memory was set to 2g and the driver memory to 5g.
>
>
>
> scala> val parsedData = stack_sorted.mapPartitions( partition =>
>          partition.map{ row =>
>            LabeledPoint(row._2._1.getDouble(4),
>              sparseVectorCat(row._2, CategoriesIdx, InteractionIds,
>                tupleMap, vecLength))
>          }, preservesPartitioning = true).cache()
>
>
> CategoriesIdx: Array[Int] = Array(3, 8, 12)
>
> InteractionIds: Array[(Int, Int)] = Array((13,12))
>
> vecLength: Int = 2450
>
> parsedData:
> org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] =
> MapPartitionsRDD[93] at mapPartitions at <console>:111
>
> (1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))
>
> (0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))
>
> (2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))
>
> (1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))
>
> (0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))
>
>
>
>
>
> My suspicion is that, because the data is partitioned using a custom
> partitioner, the Standard Scaler is causing a major shuffle operation. Any
> suggestions on how to improve the performance of this step, and of
> LinearRegressionWithSGD(), which is also taking a very long time?
>
>
>
> scala> parsedData.partitioner
>
> res72: Option[org.apache.spark.Partitioner] = Some(
> org.apache.spark.HashPartitioner@d2)
>
>
>
> scala> val scaler = new StandardScaler(withMean = false, withStd =
> true).fit(parsedData.map( row =>  row.features))
>
> scala> val scaledData = parsedData.mapPartitions(partition =>
> partition.map{row => LabeledPoint(row.label,
> scaler.transform(row.features))}).cache()
>
>
>
> scala> val numIterations = 100
>
> scala> val stepSize = 0.1
>
> scala> val miniBatchFraction = 0.1
>
> scala> val algorithm = new LinearRegressionWithSGD()
>
>
>
> scala> algorithm.setIntercept(false)
>
> scala> algorithm.optimizer.setNumIterations(numIterations)
>
> scala> algorithm.optimizer.setStepSize(stepSize)
>
> scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)
>
>
>
> scala> val model = algorithm.run(scaledData)
>
>
>
> Best,
>
>
>
> Piero Cinquegrana
>
> Marketing Scientist | MarketShare
> 11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
> P: 310.914.5677 x242 M: 323.377.9197
> www.marketshare.com <http://www.marketsharepartners.com/>
> twitter.com/marketsharep
>
>
>