Hi DB,

Yes, I am running count() operations on the previous steps, and it appears that something is slow prior to the scaler. I thought that running take(5) and printing the results would execute the command at each step and materialize the RDD, but is that not the case? That's how I was testing each step.
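One reason take(5) and count() can disagree here: take(5) is an action, but it only runs enough tasks to produce 5 rows, so upstream work over the remaining partitions may never execute; count() touches every partition. A minimal plain-Scala analogy (no Spark involved; LazyList stands in for the RDD's lazy evaluation) makes the difference visible:

```scala
// Plain-Scala analogy, not Spark: take(n) on a lazy collection evaluates
// only the first n elements, while counting everything forces a full pass.
// RDD.take(5) behaves similarly, running just enough tasks to return 5 rows.
var evaluated = 0
val lazyRows = LazyList.from(1).take(1000).map { x => evaluated += 1; x * 2 }

val firstFive = lazyRows.take(5).toList   // forces only 5 evaluations
val afterTake = evaluated                 // 5

val total = lazyRows.size                 // forces the remaining elements
val afterCount = evaluated                // 1000
```

So a pipeline can look fast under take(5) while a full pass (count(), or StandardScaler.fit) pays the real cost.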
Thanks,
Piero

From: DB Tsai [mailto:dbt...@dbtsai.com]
Sent: Wednesday, June 03, 2015 10:33 PM
To: Piero Cinquegrana
Cc: user@spark.apache.org
Subject: Re: Standard Scaler taking 1.5hrs

Can you do count() before fit to force materialize the RDD? I think something before fit is slow.

On Wednesday, June 3, 2015, Piero Cinquegrana <pcinquegr...@marketshare.com> wrote:

The fit part is very slow; transform is not slow at all. The number of partitions was 210 vs. 80 executors. Spark 1.4 sounds great, but since my company is using Qubole, we are dependent on them to upgrade from version 1.3.1. Until that happens, can you think of any other reasons why it could be slow? Sparse vectors? An excessive number of columns?

Sent from my mobile device. Please excuse any typos.

On Jun 3, 2015, at 9:53 PM, DB Tsai <dbt...@dbtsai.com> wrote:

Which part of StandardScaler is slow? Fit or transform? Fit has a shuffle, but a very small one, and transform doesn't shuffle. I guess you don't have enough partitions, so please repartition your input dataset to a number at least larger than the number of executors you have.

In Spark 1.4's new ML pipeline API, we have linear regression with elastic net, and in that version we use quasi-Newton methods for optimization, so it will be way faster than the SGD implementation. Also, in that implementation, StandardScaler is not required, since we implicitly do the standardization for you when computing the loss function.

https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef

Please try this out and give us feedback. Thanks.

On Wednesday, June 3, 2015, Piero Cinquegrana <pcinquegr...@marketshare.com> wrote:

Hello User group,

I have an RDD of LabeledPoint composed of sparse vectors, as shown below. In the next step, I am standardizing the columns with the StandardScaler. The data has 2,450 columns and ~110M rows.
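DB's repartitioning advice can be turned into a simple sizing rule. A common heuristic (an assumption here, not something stated in the thread) is to target a small multiple, roughly 2-4x, of the total core count, so every executor stays busy and stragglers are amortized. A hedged sketch, with illustrative numbers (the 2 cores per executor and 3x factor are made up for the example):

```scala
// Rule-of-thumb partition count: a small multiple of total cores.
// The executor/core counts below are illustrative assumptions.
def suggestedPartitions(numExecutors: Int, coresPerExecutor: Int, tasksPerCore: Int = 3): Int =
  numExecutors * coresPerExecutor * tasksPerCore

val target = suggestedPartitions(numExecutors = 80, coresPerExecutor = 2)
// The thread's 210 partitions exceed the 80 executors, but fall short of
// this heuristic's target. Applying it would look like:
//   parsedData.repartition(target)   // note: repartition itself shuffles
```

With only 210 partitions across 80 executors, some executors get 2 tasks and others 3, so the stage runs at the speed of the slowest 3-task executor; a higher multiple smooths that out.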
It took 1.5 hrs to complete the standardization with 10 nodes and 80 executors. spark.executor.memory was set to 2g and the driver memory to 5g.

scala> val parsedData = stack_sorted.mapPartitions( partition =>
     |   partition.map{ row => LabeledPoint(
     |     row._2._1.getDouble(4),
     |     sparseVectorCat(row._2, CategoriesIdx, InteractionIds, tupleMap, vecLength))
     |   }, preservesPartitioning = true).cache()

CategoriesIdx: Array[Int] = Array(3, 8, 12)
InteractionIds: Array[(Int, Int)] = Array((13,12))
vecLength: Int = 2450
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[93] at mapPartitions at <console>:111

(1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))
(0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))
(2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))
(1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))
(0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))

My suspicion is that because the data is partitioned using a custom partitioner, the StandardScaler is causing a major shuffle operation. Any suggestions on how to improve the performance of this step, and of LinearRegressionWithSGD(), which is also taking a very long time?
scala> parsedData.partitioner
res72: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@d2)

scala> val scaler = new StandardScaler(withMean = false, withStd = true).fit(parsedData.map(row => row.features))
scala> val scaledData = parsedData.mapPartitions(partition =>
     |   partition.map{ row => LabeledPoint(row.label, scaler.transform(row.features)) }).cache()

scala> val numIterations = 100
scala> val stepSize = 0.1
scala> val miniBatchFraction = 0.1
scala> val algorithm = new LinearRegressionWithSGD()
scala> algorithm.setIntercept(false)
scala> algorithm.optimizer.setNumIterations(numIterations)
scala> algorithm.optimizer.setStepSize(stepSize)
scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)
scala> val model = algorithm.run(scaledData)

Best,

Piero Cinquegrana
Marketing Scientist | MarketShare
11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
P: 310.914.5677 x242 M: 323.377.9197
www.marketshare.com
twitter.com/marketsharep

--
- DB

Sent from my iPhone
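The arithmetic behind the fit/transform pair above, with withMean = false and withStd = true, is simple: fit estimates a per-column standard deviation, and transform divides each feature by it, without subtracting the mean (which keeps sparse vectors sparse). A hedged plain-Scala sketch on one column; the sample (n-1) variance used here is an assumption about MLlib's exact convention:

```scala
// Per-column scaling as StandardScaler(withMean = false, withStd = true)
// does it: divide by the column's standard deviation, no centering.
// Assumes the sample (n-1) variance; treat that denominator as an assumption.
def fitStd(column: Seq[Double]): Double = {
  val n = column.length
  val mean = column.sum / n
  val variance = column.map(x => math.pow(x - mean, 2)).sum / (n - 1)
  math.sqrt(variance)
}

def transform(column: Seq[Double], std: Double): Seq[Double] =
  if (std == 0.0) column else column.map(_ / std)   // zero-variance columns pass through

val col = Seq(2.0, 4.0, 4.0, 4.0, 6.0)
val std = fitStd(col)            // sqrt(8/4) = sqrt(2)
val scaled = transform(col, std)
```

Note the per-column statistics require a full pass over the data, which is why fit, unlike the lazy transform, triggers every upstream computation that has not yet been materialized.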