The fit part is very slow; the transform is not slow at all.

The number of partitions was 210 vs number of executors 80.

Spark 1.4 sounds great, but as my company is using Qubole we are dependent upon 
them to upgrade from version 1.3.1. Until that happens, can you think of any 
other reasons why it could be slow? Sparse vectors? An excessive number of 
columns?

Sent from my mobile device. Please excuse any typos.

On Jun 3, 2015, at 9:53 PM, DB Tsai <dbt...@dbtsai.com> wrote:

Which part of StandardScaler is slow, fit or transform? Fit has a shuffle, but a 
very small one, and transform doesn't shuffle at all. I guess you don't have 
enough partitions, so please repartition your input dataset to a number at least 
larger than the number of executors you have.
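(As an illustration only, a minimal sketch of that repartitioning, assuming the parsedData RDD from the message below; the target of 320 partitions is just an example, roughly four per executor.)

scala> val repartitioned = parsedData.repartition(320).cache()   // full shuffle into 320 partitions
scala> repartitioned.partitions.length                           // verify the new partition count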

In Spark 1.4's new ML pipeline API, we have Linear Regression with elastic net, 
and in that version we use quasi-Newton methods for optimization, so it will be 
way faster than the SGD implementation. Also, in that implementation, 
StandardScaler is not required, since in computing the loss function we 
implicitly do this for you.

https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef
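(For reference, a rough sketch of how that 1.4 pipeline API can be invoked; the parameter values are placeholders rather than recommendations, and it assumes the RDD[LabeledPoint] named parsedData from the message below is converted to a DataFrame.)

scala> import org.apache.spark.ml.regression.LinearRegression
scala> import sqlContext.implicits._
scala> val training = parsedData.toDF()          // DataFrame with "label" and "features" columns
scala> val lr = new LinearRegression()
         .setMaxIter(100)
         .setRegParam(0.01)                      // example regularization strength
         .setElasticNetParam(0.5)                // 0.0 = ridge (L2), 1.0 = lasso (L1)
scala> val lrModel = lr.fit(training)
scala> println(s"Weights: ${lrModel.weights} Intercept: ${lrModel.intercept}")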

Please try this out and give us feedback. Thanks.

On Wednesday, June 3, 2015, Piero Cinquegrana <pcinquegr...@marketshare.com> wrote:
Hello User group,

I have an RDD of LabeledPoint composed of sparse vectors, as shown below. In 
the next step, I am standardizing the columns with the StandardScaler. The 
data has 2450 columns and ~110M rows. It took 1.5 hrs to complete the 
standardization with 10 nodes and 80 executors. spark.executor.memory was 
set to 2g and the driver memory to 5g.

scala> val parsedData = stack_sorted.mapPartitions( partition =>
         partition.map{ row =>
           LabeledPoint(row._2._1.getDouble(4),
                        sparseVectorCat(row._2, CategoriesIdx, InteractionIds, tupleMap, vecLength))
         }, preservesPartitioning = true).cache()

CategoriesIdx: Array[Int] = Array(3, 8, 12)
InteractionIds: Array[(Int, Int)] = Array((13,12))
vecLength: Int = 2450
parsedData: 
org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = 
MapPartitionsRDD[93] at mapPartitions at <console>:111
(1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))
(0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))
(2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))
(1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))
(0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))


My suspicion is that, because the data is partitioned using a custom 
partitioner, the StandardScaler is causing a major shuffle operation. Any 
suggestions on how to improve the performance of this step, and of 
LinearRegressionWithSGD(), which is also taking a very long time?

scala> parsedData.partitioner
res72: Option[org.apache.spark.Partitioner] = 
Some(org.apache.spark.HashPartitioner@d2)

scala> val scaler = new StandardScaler(withMean = false, withStd = true).fit(parsedData.map(row => row.features))
scala> val scaledData = parsedData.mapPartitions(partition =>
         partition.map{ row => LabeledPoint(row.label, scaler.transform(row.features)) }).cache()

scala> val numIterations = 100
scala> val stepSize = 0.1
scala> val miniBatchFraction = 0.1
scala> val algorithm = new LinearRegressionWithSGD()

scala> algorithm.setIntercept(false)
scala> algorithm.optimizer.setNumIterations(numIterations)
scala> algorithm.optimizer.setStepSize(stepSize)
scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)

scala> val model = algorithm.run(scaledData)

Best,

Piero Cinquegrana
Marketing Scientist | MarketShare
11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
P: 310.914.5677 x242 M: 323.377.9197
www.marketshare.com
twitter.com/marketsharep
