Hi DB,

Yes, I am running count() operations on the previous steps, and it appears that 
something before the scaler is slow. I thought that running take(5) and printing 
the results would execute each step and materialize the RDD, but is that not 
the case? That's how I was testing each step.
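In Spark, take(5) computes only as many partitions as it needs (it starts with one and scans more only if too few rows come back), and inside those partitions it stops pulling rows once it has five. Narrow steps such as mapPartitions are therefore barely exercised, although any shuffle stage upstream (for example a sort behind stack_sorted, if it came from one) still has to run in full. count() has to compute every row of every partition. The plain-Scala analogy below involves no Spark at all; the object and method names are hypothetical, and it only shows the same laziness on an ordinary Iterator:

```scala
// Analogy only (plain Scala, no Spark): a lazily mapped Iterator behaves like the
// per-partition iterator Spark passes to mapPartitions. Work happens only when
// a consumer pulls elements.
object LazyTakeVsCount {
  // How many times the mapped function runs when only the first n rows are taken,
  // similar in spirit to RDD.take(n) within a single partition.
  def evaluationsForTake(totalRows: Int, n: Int): Int = {
    var evaluated = 0
    val rows = Iterator.range(0, totalRows).map { i => evaluated += 1; i * 2 }
    rows.take(n).toList // pulls at most n elements through the map
    evaluated
  }

  // How many times the mapped function runs when every row is consumed,
  // similar in spirit to RDD.count().
  def evaluationsForCount(totalRows: Int): Int = {
    var evaluated = 0
    val rows = Iterator.range(0, totalRows).map { i => evaluated += 1; i * 2 }
    rows.foreach(_ => ()) // foreach pulls every element, so the map runs for each one
    evaluated
  }

  def main(args: Array[String]): Unit = {
    println(s"take(5): ${evaluationsForTake(1000000, 5)} evaluations")
    println(s"count:   ${evaluationsForCount(1000000)} evaluations")
  }
}
```

The practical upshot is to time count() rather than take(5) when checking which step is slow.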



From: DB Tsai [mailto:dbt...@dbtsai.com]
Sent: Wednesday, June 03, 2015 10:33 PM
To: Piero Cinquegrana
Cc: user@spark.apache.org
Subject: Re: Standard Scaler taking 1.5hrs

Can you call count() before fit to force the RDD to materialize? I think 
something before fit is slow.
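A small timing wrapper makes it easier to see where the time goes when each step is forced with count(). This is a sketch; `Timing.timed` is a hypothetical helper, not a Spark API:

```scala
// Hypothetical helper, not part of Spark: runs a block, prints the elapsed
// wall-clock time, and returns the block's result unchanged.
object Timing {
  def timed[T](label: String)(block: => T): T = {
    val start = System.nanoTime()
    val result = block // the by-name block is evaluated exactly once, here
    val elapsedMs = (System.nanoTime() - start) / 1e6
    println(f"$label%s took $elapsedMs%.1f ms")
    result
  }

  def main(args: Array[String]): Unit = {
    // Stand-in workload; in spark-shell the block would be e.g. parsedData.count().
    val total = timed("sum of squares")((1L to 1000000L).iterator.map(x => x * x).sum)
    println(total)
  }
}
```

In spark-shell, one way to use it is to call `parsedData.cache()`, then `Timing.timed("materialize")(parsedData.count())`, and then wrap the `new StandardScaler(...).fit(...)` call the same way. If `parsedData` fits in memory, the second timing then reflects the fit alone rather than the upstream steps.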

On Wednesday, June 3, 2015, Piero Cinquegrana <pcinquegr...@marketshare.com> wrote:
The fit is very slow; the transform is not slow at all.

There were 210 partitions and 80 executors.

Spark 1.4 sounds great, but as my company uses Qubole, we depend on them to 
upgrade from version 1.3.1. Until that happens, can you think of any other 
reasons why it could be slow? Sparse vectors? Excessive number of 

Sent from my mobile device. Please excuse any typos.

On Jun 3, 2015, at 9:53 PM, DB Tsai <dbt...@dbtsai.com> wrote:
Which part of StandardScaler is slow? Fit or transform? Fit has a shuffle, but a 
very small one, and transform doesn't shuffle at all. I guess you don't have 
enough partitions, so please repartition your input dataset to a number at least 
larger than the number of executors you have.
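Spark runs one task per partition in each stage, and each executor runs as many tasks at once as it has cores (with the default spark.task.cpus = 1). A hypothetical helper to count the scheduling "waves" for the numbers in this thread; cores per executor were not stated, so that value is an assumption passed in by the caller:

```scala
// Hypothetical helper: how many rounds of tasks a stage needs, and how full the last round is.
object TaskWaves {
  // Ceiling of partitions / total task slots.
  def waves(numPartitions: Int, executors: Int, coresPerExecutor: Int): Int = {
    val slots = executors * coresPerExecutor
    (numPartitions + slots - 1) / slots
  }

  // Fraction of task slots that are busy during the final wave.
  def lastWaveUtilization(numPartitions: Int, executors: Int, coresPerExecutor: Int): Double = {
    val slots = executors * coresPerExecutor
    val remainder = numPartitions % slots
    if (remainder == 0) 1.0 else remainder.toDouble / slots
  }

  def main(args: Array[String]): Unit = {
    val cores = 1 // assumption: one core per executor
    println(s"210 partitions: ${waves(210, 80, cores)} waves, last wave ${lastWaveUtilization(210, 80, cores)} full")
    println(s"240 partitions: ${waves(240, 80, cores)} waves, last wave ${lastWaveUtilization(240, 80, cores)} full")
  }
}
```

With 210 partitions on 80 single-core executors this gives three waves, the last of which keeps only 50 of 80 slots busy. Choosing a partition count that is a multiple of the total core count avoids the partly idle final wave; treat this as a rule of thumb rather than a guarantee.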

In Spark 1.4's new ML pipeline API, we have Linear Regression with elastic net, 
and in that version we use a quasi-Newton method for optimization, so it will be 
much faster than the SGD implementation. Also, in that implementation 
StandardScaler is not required, since we standardize implicitly when computing 
the loss function.
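A sketch of why an explicit scaling pass can be skipped, assuming scaling without centering (as with withMean = false in the original code) and an elastic-net objective of the form given in the Spark ML documentation. This is the general identity, not necessarily the exact code path in 1.4:

```latex
% Scaling without centering: \tilde{x}_{ij} = x_{ij} / \sigma_j.
% A linear model on the scaled features equals a model on the raw features
% with rescaled coefficients:
\sum_{j} \tilde{w}_j \, \tilde{x}_{ij} = \sum_{j} \frac{\tilde{w}_j}{\sigma_j} \, x_{ij},
\qquad w_j = \frac{\tilde{w}_j}{\sigma_j}.

% Elastic-net objective in the scaled coordinates, evaluated on the raw x_{ij}
% (\lambda = regParam, \alpha = elasticNetParam):
L(\tilde{w}) = \frac{1}{2n} \sum_{i=1}^{n} \Big( y_i - \sum_{j} \tilde{w}_j \frac{x_{ij}}{\sigma_j} \Big)^2
  + \lambda \Big( \alpha \, \lVert \tilde{w} \rVert_1 + \frac{1 - \alpha}{2} \, \lVert \tilde{w} \rVert_2^2 \Big)
```

Because only the non-zero $x_{ij}$ appear in the sums, the objective can be evaluated directly on the original sparse vectors; only the per-column $\sigma_j$ are needed.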


Please try this out and give us feedback. Thanks.

On Wednesday, June 3, 2015, Piero Cinquegrana 
Hello User group,

I have an RDD of LabeledPoints composed of sparse vectors, built as shown below. 
In the next step, I standardize the columns with StandardScaler. The data has 
2450 columns and ~110M rows. The standardization took 1.5 hrs to complete with 
10 nodes and 80 executors. spark.executor.memory was set to 2g and the driver 
memory to 5g.
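To put those memory settings in perspective, a rough size estimate. A dense encoding of 2450 doubles per row would be far too large to cache, while the size of a sparse encoding depends entirely on the number of non-zeros per row, which the thread does not give; the value below is a placeholder. Storage memory is computed with the Spark 1.3 default fractions, and JVM object overhead is ignored, so all figures are lower bounds:

```scala
// Back-of-envelope sizing for the data described above (2450 columns, ~110M rows).
// Assumptions: 8-byte Double values, 4-byte Int indices, JVM object overhead ignored
// (so these are lower bounds), and a hypothetical non-zeros-per-row figure.
object DataSizing {
  val numCols: Long = 2450L
  val numRows: Long = 110000000L

  // Dense row: one Double per column.
  def denseRowBytes(cols: Long): Long = cols * 8L

  // Sparse row: one Int index plus one Double value per non-zero entry.
  def sparseRowBytes(nnz: Long): Long = nnz * (4L + 8L)

  // Approximate storage (cache) memory per executor under Spark 1.3 defaults:
  // spark.storage.memoryFraction = 0.6 and spark.storage.safetyFraction = 0.9.
  def storageBytesPerExecutor(heapBytes: Long): Long = (heapBytes * 0.6 * 0.9).toLong

  def gib(bytes: Long): Double = bytes.toDouble / (1L << 30)

  def main(args: Array[String]): Unit = {
    val nnzPerRow = 20L // hypothetical: the real sparsity was not reported
    val heap = 2L << 30 // spark.executor.memory = 2g
    val executors = 80L
    println(f"dense data:   ${gib(denseRowBytes(numCols) * numRows)}%.0f GiB")
    println(f"sparse data:  ${gib(sparseRowBytes(nnzPerRow) * numRows)}%.1f GiB")
    println(f"cache total:  ${gib(storageBytesPerExecutor(heap) * executors)}%.1f GiB")
  }
}
```

If the cached data turns out to be larger than the cluster's total storage memory, the partitions that do not fit are recomputed from their lineage each time they are used.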

scala> val parsedData = stack_sorted.mapPartitions(partition =>
     |   partition.map { row =>
     |     LabeledPoint(row._2._1.getDouble(4),
     |       sparseVectorCat(row._2, CategoriesIdx, InteractionIds, tupleMap, vecLength))
     |   })

CategoriesIdx: Array[Int] = Array(3, 8, 12)
InteractionIds: Array[(Int, Int)] = Array((13,12))
vecLength: Int = 2450
org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[93] at mapPartitions at <console>:111

My suspicion is that, because the data is partitioned using a custom 
partitioner, the StandardScaler is causing a major shuffle operation. Any 
suggestions on how to improve the performance of this step and of 
LinearRegressionWithSGD(), which is also taking a very long time?

scala> parsedData.partitioner
res72: Option[org.apache.spark.Partitioner] = 

scala> val scaler = new StandardScaler(withMean = false, withStd = true).fit(
     |   parsedData.map(row => row.features))
scala> val scaledData = parsedData.mapPartitions(partition =>
     |   partition.map { row => LabeledPoint(row.label, scaler.transform(row.features)) }).cache()
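On the shuffle question: as far as we can tell from the MLlib 1.3 source, StandardScaler.fit does not move the rows. It runs one treeAggregate over the feature vectors, building a MultivariateOnlineSummarizer per partition and merging those small per-column summaries, which is the very small shuffle mentioned earlier in the thread; a custom partitioner should not add to it. The plain-Scala sketch below mimics that pattern on in-memory data. SparseRow, ColumnSummary and ColumnStats are hypothetical names, and the variance formula is simplified:

```scala
// Simplified, single-machine sketch of the statistics StandardScaler.fit needs.
// SparseRow, ColumnSummary and ColumnStats are hypothetical names, not MLlib classes.
final case class SparseRow(indices: Array[Int], values: Array[Double])

final class ColumnSummary(numCols: Int) {
  var count: Long = 0L
  val sum: Array[Double] = new Array[Double](numCols)
  val sumSq: Array[Double] = new Array[Double](numCols)

  // Fold one row in; only its non-zero entries are touched, so sparse rows stay cheap.
  def add(row: SparseRow): ColumnSummary = {
    var k = 0
    while (k < row.indices.length) {
      val j = row.indices(k)
      val v = row.values(k)
      sum(j) += v
      sumSq(j) += v * v
      k += 1
    }
    count += 1
    this
  }

  // Combine two partial summaries; in Spark only summaries like this cross the network.
  def merge(other: ColumnSummary): ColumnSummary = {
    count += other.count
    var j = 0
    while (j < sum.length) {
      sum(j) += other.sum(j)
      sumSq(j) += other.sumSq(j)
      j += 1
    }
    this
  }

  // Sample standard deviation (n - 1 denominator) per column. Naive sum-of-squares
  // formula: fine for illustration, less numerically stable than an online update.
  def std: Array[Double] = Array.tabulate(sum.length) { j =>
    if (count < 2) 0.0
    else {
      val mean = sum(j) / count
      val variance = (sumSq(j) - count * mean * mean) / (count - 1)
      math.sqrt(math.max(variance, 0.0))
    }
  }
}

object ColumnStats {
  // Each inner Seq stands in for one partition: summarize locally, then merge the summaries.
  def fit(partitions: Seq[Seq[SparseRow]], numCols: Int): Array[Double] =
    partitions
      .map(rows => rows.foldLeft(new ColumnSummary(numCols))((s, r) => s.add(r)))
      .foldLeft(new ColumnSummary(numCols))((acc, s) => acc.merge(s))
      .std
}
```

The work per row is proportional to its number of non-zeros, and the merged state is a few arrays of length 2450, so this step by itself should be cheap; a slow fit more likely reflects the cost of computing parsedData, which the fit's single pass triggers.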

scala> val numIterations = 100
scala> val stepSize = 0.1
scala> val miniBatchFraction = 0.1
scala> val algorithm = new LinearRegressionWithSGD()

scala> algorithm.setIntercept(false)
scala> algorithm.optimizer.setNumIterations(numIterations)
scala> algorithm.optimizer.setStepSize(stepSize)
scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)

scala> val model = algorithm.run(scaledData)
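Rough arithmetic for the SGD settings above. As we understand the MLlib 1.3 GradientDescent code, each of the numIterations rounds samples miniBatchFraction of the input and aggregates gradients in a separate Spark job that visits every partition of scaledData. The helper names are hypothetical:

```scala
// Hypothetical helpers: expected mini-batch work for LinearRegressionWithSGD settings.
object SgdWork {
  // Expected number of rows sampled (and differentiated) in one iteration.
  def expectedRowsPerIteration(numRows: Long, miniBatchFraction: Double): Double =
    numRows.toDouble * miniBatchFraction

  // Expected gradient evaluations over the whole run.
  def expectedGradientEvals(numRows: Long, miniBatchFraction: Double, numIterations: Int): Double =
    expectedRowsPerIteration(numRows, miniBatchFraction) * numIterations

  def main(args: Array[String]): Unit = {
    val rows = 110000000L // ~110M rows, from the original message
    println(f"rows per iteration:   ${expectedRowsPerIteration(rows, 0.1)}%.0f")
    println(f"gradient evaluations: ${expectedGradientEvals(rows, 0.1, 100)}%.0f")
  }
}
```

With numIterations = 100 and miniBatchFraction = 0.1, that is roughly 11M rows per iteration, about 1.1 billion gradient evaluations in total, and 100 passes over scaledData. Any cached partitions that did not fit in memory are recomputed on each of those passes.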


Piero Cinquegrana
Marketing Scientist | MarketShare
11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
P: 310.914.5677 x242 M: 323.377.9197

- DB

Sent from my iPhone
