Can you do a count() before fit to force materializing the RDD? I suspect
something upstream of fit is slow.
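
Something along these lines (a minimal sketch that reuses the parsedData and
StandardScaler names from this thread; the timing prints are only illustrative):

import org.apache.spark.mllib.feature.StandardScaler

parsedData.cache()
val t0 = System.nanoTime()
parsedData.count()                                   // forces the upstream parsing / mapPartitions to run
println(s"materialize: ${(System.nanoTime() - t0) / 1e9} s")

val t1 = System.nanoTime()
val scaler = new StandardScaler(withMean = false, withStd = true)
  .fit(parsedData.map(_.features))                   // now times only the StandardScaler fit
println(s"fit: ${(System.nanoTime() - t1) / 1e9} s")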

On Wednesday, June 3, 2015, Piero Cinquegrana <pcinquegr...@marketshare.com>
wrote:

>  The fit part is very slow; transform is not slow at all.
>
>  The number of partitions was 210 vs. 80 executors.
>
> Spark 1.4 sounds great, but as my company is using Qubole we are dependent
> upon them to upgrade from version 1.3.1. Until that happens, can you think
> of any other reasons why it could be slow? Sparse vectors? Excessive number
> of columns?
>
> Sent from my mobile device. Please excuse any typos.
>
> On Jun 3, 2015, at 9:53 PM, DB Tsai <dbt...@dbtsai.com> wrote:
>
>   Which part of StandardScaler is slow? Fit or transform? Fit has a shuffle,
> but a very small one, and transform doesn't shuffle at all. My guess is that
> you don't have enough partitions, so please repartition your input dataset to
> a number at least as large as the number of executors you have.
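>
> For instance, something along these lines (a rough sketch; the partition
> count is a hypothetical value, pick a small multiple of your total cores):
>
> import org.apache.spark.mllib.feature.StandardScaler
>
> // hypothetical partition count; tune to your cluster (e.g. 2-3x total cores)
> val repartitioned = parsedData.repartition(320).cache()
> repartitioned.count()   // materialize once, so the fit timing is isolated
> val scaler = new StandardScaler(withMean = false, withStd = true)
>   .fit(repartitioned.map(_.features))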
>
>  In Spark 1.4's new ML pipeline API, we have Linear Regression with elastic
> net, and in that version we use quasi-Newton methods for optimization, so it
> will be way faster than the SGD implementation. Also, in that implementation,
> StandardScaler is not required, since we implicitly do the scaling for you
> when computing the loss function.
>
>
> https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef
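>
> A minimal sketch of that 1.4 API (assuming `training` is a DataFrame with the
> usual "label" and "features" columns, e.g. built from your RDD[LabeledPoint]
> via toDF(); the parameter values below are placeholders, not recommendations):
>
> import org.apache.spark.ml.regression.LinearRegression
>
> // elastic-net mixing: 0.0 = pure L2 (ridge), 1.0 = pure L1 (lasso)
> val lr = new LinearRegression()
>   .setMaxIter(100)
>   .setRegParam(0.01)        // placeholder regularization strength
>   .setElasticNetParam(0.5)  // placeholder mixing parameter
>   .setFitIntercept(false)   // matches your setIntercept(false)
> val lrModel = lr.fit(training)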
>
>  Please try this out and give us feedback. Thanks.
>
> On Wednesday, June 3, 2015, Piero Cinquegrana <pcinquegr...@marketshare.com> wrote:
>
>>  Hello User group,
>>
>>
>>
>> I have an RDD of LabeledPoint composed of sparse vectors, as shown below. In
>> the next step, I am standardizing the columns with the StandardScaler. The
>> data has 2450 columns and ~110M rows. It took 1.5 hrs to complete the
>> standardization with 10 nodes and 80 executors. spark.executor.memory was
>> set to 2g and the driver memory to 5g.
>>
>>
>>
>> scala> val parsedData = stack_sorted.mapPartitions( partition =>
>>          partition.map{ row =>
>>            LabeledPoint(row._2._1.getDouble(4),
>>              sparseVectorCat(row._2, CategoriesIdx, InteractionIds, tupleMap, vecLength))
>>          },
>>          preservesPartitioning = true).cache()
>>
>>
>> CategoriesIdx: Array[Int] = Array(3, 8, 12)
>>
>> InteractionIds: Array[(Int, Int)] = Array((13,12))
>>
>> vecLength: Int = 2450
>>
>> parsedData:
>> org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] =
>> MapPartitionsRDD[93] at mapPartitions at <console>:111
>>
>> (1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))
>>
>> (0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))
>>
>> (2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))
>>
>> (1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))
>>
>> (0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))
>>
>>
>>
>>
>>
>> My suspicion is that, because the data is partitioned using a custom
>> partitioner, the StandardScaler is causing a major shuffle operation. Any
>> suggestions on how to improve the performance of this step and of
>> LinearRegressionWithSGD(), which is also taking a very long time?
>>
>>
>>
>> scala> parsedData.partitioner
>>
>> res72: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@d2)
>>
>>
>>
>> scala> val scaler = new StandardScaler(withMean = false, withStd = true)
>>          .fit(parsedData.map(row => row.features))
>>
>> scala> val scaledData = parsedData.mapPartitions(partition =>
>>          partition.map{ row => LabeledPoint(row.label, scaler.transform(row.features)) }
>>        ).cache()
>>
>>
>>
>> scala> val numIterations = 100
>>
>> scala> val stepSize = 0.1
>>
>> scala> val miniBatchFraction = 0.1
>>
>> scala> val algorithm = new LinearRegressionWithSGD()
>>
>>
>>
>> scala> algorithm.setIntercept(false)
>>
>> scala> algorithm.optimizer.setNumIterations(numIterations)
>>
>> scala> algorithm.optimizer.setStepSize(stepSize)
>>
>> scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)
>>
>>
>>
>> scala> val model = algorithm.run(scaledData)
>>
>>
>>
>> Best,
>>
>>
>>
>> Piero Cinquegrana
>>
>> Marketing Scientist | MarketShare
>> 11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
>> P: 310.914.5677 x242 M: 323.377.9197
>> www.marketshare.com <http://www.marketsharepartners.com/>
>> twitter.com/marketsharep
>>
>>
>>
>

-- 
- DB

Sent from my iPhone
