take(5) will only evaluate enough partitions to provide 5 elements (sometimes a few more, but you get the idea), so, unlike count(), it won't trigger a full evaluation of all partitions.
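For example (a quick sketch; the RDD here is made up just to illustrate the difference):

scala> val rdd = sc.parallelize(1 to 1000000, 210)
scala> rdd.take(5)   // scans the first partition, and only tries more if it hasn't found 5 elements yet
scala> rdd.count()   // has to evaluate every partition

So printing take(5) at each step only proves the first partition(s) can be computed; a count() (ideally after a cache()) is what actually materializes the whole RDD.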
On Thursday, June 4, 2015, Piero Cinquegrana <pcinquegr...@marketshare.com> wrote:

> Hi DB,
>
> Yes, I am running count() operations on the previous steps, and it appears that something is slow prior to the scaler. I thought that running take(5) and printing the results would execute the command at each step and materialize the RDD, but is that not the case? That’s how I was testing each step.
>
> Thanks,
>
> Piero
>
> *From:* DB Tsai [mailto:dbt...@dbtsai.com]
> *Sent:* Wednesday, June 03, 2015 10:33 PM
> *To:* Piero Cinquegrana
> *Cc:* user@spark.apache.org
> *Subject:* Re: Standard Scaler taking 1.5hrs
>
> Can you do a count() before fit to force materialization of the RDD? I think something before fit is slow.
>
> On Wednesday, June 3, 2015, Piero Cinquegrana <pcinquegr...@marketshare.com> wrote:
>
> The fit part is very slow; transform is not slow at all.
>
> The number of partitions was 210 vs. 80 executors.
>
> Spark 1.4 sounds great, but as my company is using Qubole, we are dependent upon them to upgrade from version 1.3.1. Until that happens, can you think of any other reasons why it could be slow? Sparse vectors? An excessive number of columns?
>
> Sent from my mobile device. Please excuse any typos.
>
> On Jun 3, 2015, at 9:53 PM, DB Tsai <dbt...@dbtsai.com> wrote:
>
> Which part of StandardScaler is slow, fit or transform? Fit has a shuffle, but a very small one, and transform doesn't shuffle at all. My guess is that you don't have enough partitions, so please repartition your input dataset to a number at least larger than the # of executors you have.
>
> In Spark 1.4's new ML pipeline API, we have LinearRegression with elastic net, and in that version we use quasi-Newton methods for optimization, so it will be way faster than the SGD implementation. Also, in that implementation, StandardScaler is not required, since we implicitly do the scaling for you when computing the loss function.
>
> https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef
>
> Please try this out and give us feedback. Thanks.
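To make DB's two suggestions above concrete, they would look roughly like this against the code quoted below (a sketch only; the target of 320 partitions is illustrative, anything comfortably above the 80 executors should do):

scala> val repartitioned = parsedData.repartition(320).cache()
scala> repartitioned.count()  // forces full materialization here, so any time spent in fit() below is the scaler's own
scala> val scaler = new StandardScaler(withMean = false, withStd = true).fit(repartitioned.map(_.features))

And once the upgrade to 1.4 happens, the elastic-net path would look something like this (again a sketch, assuming a SQLContext in the shell; the regularization values are placeholders):

scala> import sqlContext.implicits._
scala> import org.apache.spark.ml.regression.LinearRegression
scala> val df = parsedData.toDF()  // RDD[LabeledPoint] becomes a DataFrame with "label" and "features" columns
scala> val lr = new LinearRegression().setMaxIter(100).setRegParam(0.01).setElasticNetParam(0.0)
scala> val model = lr.fit(df)     // no separate StandardScaler pass; scaling is handled inside the loss computation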
> On Wednesday, June 3, 2015, Piero Cinquegrana <pcinquegr...@marketshare.com> wrote:
>
> Hello User group,
>
> I have an RDD of LabeledPoint composed of sparse vectors like the ones shown below. In the next step, I am standardizing the columns with the StandardScaler. The data has 2450 columns and ~110M rows. It took 1.5 hrs to complete the standardization with 10 nodes and 80 executors. spark.executor.memory was set to 2g and the driver memory to 5g.
>
> scala> val parsedData = stack_sorted.mapPartitions( partition =>
>          partition.map{row => LabeledPoint(row._2._1.getDouble(4), sparseVectorCat(row._2, CategoriesIdx, InteractionIds, tupleMap, vecLength))
>          }, preservesPartitioning=true).cache()
>
> CategoriesIdx: Array[Int] = Array(3, 8, 12)
> InteractionIds: Array[(Int, Int)] = Array((13,12))
> vecLength: Int = 2450
> parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[93] at mapPartitions at <console>:111
>
> (1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))
> (0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))
> (2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))
> (1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))
> (0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))
>
> My suspicion is that because the data is partitioned using a custom partitioner, the StandardScaler is causing a major shuffle operation. Any suggestions on how to improve the performance of this step, and of LinearRegressionWithSGD(), which is also taking a very long time?
>
> scala> parsedData.partitioner
> res72: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@d2)
>
> scala> val scaler = new StandardScaler(withMean = false, withStd = true).fit(parsedData.map( row => row.features))
> scala> val scaledData = parsedData.mapPartitions(partition => partition.map{row => LabeledPoint(row.label, scaler.transform(row.features))}).cache()
>
> scala> val numIterations = 100
> scala> val stepSize = 0.1
> scala> val miniBatchFraction = 0.1
> scala> val algorithm = new LinearRegressionWithSGD()
>
> scala> algorithm.setIntercept(false)
> scala> algorithm.optimizer.setNumIterations(numIterations)
> scala> algorithm.optimizer.setStepSize(stepSize)
> scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)
>
> scala> val model = algorithm.run(scaledData)
>
> Best,
>
> Piero Cinquegrana
>
> Marketing Scientist | MarketShare
> 11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
> P: 310.914.5677 x242 M: 323.377.9197
> www.marketshare.com <http://www.marketsharepartners.com/>
> twitter.com/marketsharep
>
> --
> - DB
>
> Sent from my iPhone

--
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
Linked In: https://www.linkedin.com/in/holdenkarau