take(5) will only evaluate enough partitions to provide 5 elements
(sometimes a few more, but you get the idea), so unlike count() it won't
trigger a full evaluation of all partitions.
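
For example (a minimal sketch, assuming the parsedData RDD from the thread
below is in scope in a spark-shell session):

scala> parsedData.take(5).foreach(println)   // computes only as many partitions as needed for 5 rows
scala> parsedData.count()                    // scans every partition, so the whole RDD is materialized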

On Thursday, June 4, 2015, Piero Cinquegrana <pcinquegr...@marketshare.com>
wrote:

>  Hi DB,
>
>
>
> Yes, I am running count() operations on the previous steps, and it appears
> that something is slow prior to the scaler. I thought that running take(5)
> and printing the results would execute the command at each step and
> materialize the RDD, but is that not the case? That’s how I was testing
> each step.
>
>
>
> Thanks,
>
>
>
> Piero
>
>
>
> *From:* DB Tsai [mailto:dbt...@dbtsai.com]
> *Sent:* Wednesday, June 03, 2015 10:33 PM
> *To:* Piero Cinquegrana
> *Cc:* user@spark.apache.org
> *Subject:* Re: Standard Scaler taking 1.5hrs
>
>
>
> Can you do count() before fit to force materialization of the RDD? I think
> something before fit is slow.
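>
> For example (a sketch, reusing the parsedData RDD defined further down in
> this thread):
>
> scala> parsedData.cache()
> scala> parsedData.count()   // materializes the full RDD, so any slow upstream stage shows up here
> scala> val scaler = new StandardScaler(withMean = false, withStd = true)
>          .fit(parsedData.map(_.features))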
>
> On Wednesday, June 3, 2015, Piero Cinquegrana
> <pcinquegr...@marketshare.com> wrote:
>
> The fit part is very slow; the transform is not slow at all.
>
>
>
> The number of partitions was 210 vs. 80 executors.
>
>
>
> Spark 1.4 sounds great, but as my company is using Qubole, we are dependent
> upon them to upgrade from version 1.3.1. Until that happens, can you think
> of any other reasons why it could be slow? Sparse vectors? An excessive
> number of columns?
>
>
> Sent from my mobile device. Please excuse any typos.
>
>
> On Jun 3, 2015, at 9:53 PM, DB Tsai <dbt...@dbtsai.com> wrote:
>
> Which part of StandardScaler is slow? Fit or transform? Fit has a shuffle,
> but a very small one, and transform doesn't shuffle at all. I guess you
> don't have enough partitions, so please repartition your input dataset to a
> number at least larger than the number of executors you have.
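>
> For example (a sketch; the partition count below is a placeholder to pick
> relative to your cluster size):
>
> scala> val repartitioned = parsedData.repartition(320)   // e.g. a few partitions per executor
> scala> repartitioned.count()                             // force the shuffle/materialization up front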
>
>
>
> In Spark 1.4's new ML pipeline API, we have Linear Regression with elastic
> net, and in that version we use quasi-Newton methods for optimization, so it
> will be way faster than the SGD implementation. Also, in that implementation
> StandardScaler is not required, since we implicitly do the scaling for you
> when computing the loss function.
>
>
>
>
> https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef
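>
> A rough sketch of what that 1.4 API looks like (the regularization values
> below are placeholders, and the RDD of LabeledPoint from further down in
> the thread is assumed to be converted to a DataFrame first):
>
> scala> import org.apache.spark.ml.regression.LinearRegression
> scala> val df = sqlContext.createDataFrame(parsedData)   // columns: label, features
> scala> val lr = new LinearRegression()
>          .setMaxIter(100)
>          .setRegParam(0.01)
>          .setElasticNetParam(0.5)   // 0.0 = pure L2, 1.0 = pure L1
> scala> val lrModel = lr.fit(df)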
>
>
>
> Please try this out and give us feedback. Thanks.
>
> On Wednesday, June 3, 2015, Piero Cinquegrana
> <pcinquegr...@marketshare.com> wrote:
>
> Hello User group,
>
>
>
> I have an RDD of LabeledPoint composed of sparse vectors like those shown
> below. In the next step, I am standardizing the columns with the Standard
> Scaler. The data has 2450 columns and ~110M rows. It took 1.5 hrs to
> complete the standardization with 10 nodes and 80 executors. The
> spark.executor.memory was set to 2g and the driver memory to 5g.
>
>
>
> scala> val parsedData = stack_sorted.mapPartitions( partition =>
>          partition.map{ row =>
>            LabeledPoint(row._2._1.getDouble(4),
>              sparseVectorCat(row._2, CategoriesIdx, InteractionIds, tupleMap, vecLength))
>          }, preservesPartitioning = true).cache()
>
>
> CategoriesIdx: Array[Int] = Array(3, 8, 12)
>
> InteractionIds: Array[(Int, Int)] = Array((13,12))
>
> vecLength: Int = 2450
>
> parsedData:
> org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] =
> MapPartitionsRDD[93] at mapPartitions at <console>:111
>
> (1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))
>
> (0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))
>
> (2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))
>
> (1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))
>
> (0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))
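>
> (Each row above is equivalent to a LabeledPoint built from a sparse vector;
> a rough sketch, ignoring the actual sparseVectorCat helper:)
>
> scala> import org.apache.spark.mllib.linalg.Vectors
> scala> import org.apache.spark.mllib.regression.LabeledPoint
> scala> LabeledPoint(1.0, Vectors.sparse(2450, Array(1322, 1361, 2430), Array(1.0, 1.0, 1.0)))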
>
>
>
>
>
> My suspicion is that, because the data is partitioned using a custom
> partitioner, the Standard Scaler is causing a major shuffle operation. Any
> suggestions on how to improve the performance of this step, and of
> LinearRegressionWithSGD(), which is also taking a very long time?
>
>
>
> scala> parsedData.partitioner
>
> res72: Option[org.apache.spark.Partitioner] =
> Some(org.apache.spark.HashPartitioner@d2)
>
>
>
> scala> val scaler = new StandardScaler(withMean = false, withStd = true)
>          .fit(parsedData.map(row => row.features))
>
> scala> val scaledData = parsedData.mapPartitions(partition =>
>          partition.map{ row => LabeledPoint(row.label, scaler.transform(row.features)) }
>        ).cache()
>
>
>
> scala> val numIterations = 100
>
> scala> val stepSize = 0.1
>
> scala> val miniBatchFraction = 0.1
>
> scala> val algorithm = new LinearRegressionWithSGD()
>
>
>
> scala> algorithm.setIntercept(false)
>
> scala> algorithm.optimizer.setNumIterations(numIterations)
>
> scala> algorithm.optimizer.setStepSize(stepSize)
>
> scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)
>
>
>
> scala> val model = algorithm.run(scaledData)
>
>
>
> Best,
>
>
>
> Piero Cinquegrana
>
> Marketing Scientist | MarketShare
> 11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
> P: 310.914.5677 x242 M: 323.377.9197
> www.marketshare.com <http://www.marketsharepartners.com/>
> twitter.com/marketsharep
>
>
>
>
>
> --
> - DB
>
> Sent from my iPhone
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
Linked In: https://www.linkedin.com/in/holdenkarau
