I think it would be much easier to do this with the DataFrame API, by doing a local sort within each partition using randn() as the sort key. For example, in Spark 2.0:

val df = spark.range(100)
val shuffled = df.repartition($"id" % 10).sortWithinPartitions(randn(42))

Replace df with a DataFrame wrapping your RDD, and $"id" % 10 with the key you group by; then you can get the RDD back from shuffled and carry out the remaining operations you need.
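
In concrete terms it might look roughly like this (just a sketch; Sample, model_id, and my_input_rdd stand in for your own types and names):

import org.apache.spark.sql.functions.randn
import spark.implicits._

// Assuming Sample is a case class with a model_id field and my_input_rdd is an RDD[Sample].
val df = spark.createDataFrame(my_input_rdd)

// Co-locate each model's rows, then randomly order the rows within each partition.
val shuffled = df.repartition($"model_id").sortWithinPartitions(randn(42))

// Back to an RDD for the per-model SGD step.
val shuffledRdd = shuffled.as[Sample].rdd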

Cheng


On 10/20/16 10:53 AM, Yang wrote:
In my application, I group training samples by their model_id (the input table contains training samples for 100k different models); each group ends up with about 1 million training samples.

Then I feed each group of samples to a small Logistic Regression solver (SGD). SGD requires the input data to be randomly shuffled (so that positive and negative samples are evenly distributed), so I currently do something like:

my_input_rdd.groupBy(x => x.model_id).map { x =>
  val (model_id, group_of_rows) = x
  (model_id, scala.util.Random.shuffle(group_of_rows.toSeq))
}.map(x => (x._1, train_sgd(x._2)))


The issue is that on the 3rd line above I have to explicitly call toSeq on group_of_rows (which is an Iterable, not a Seq) in order to shuffle it. That forces the entire 1 million rows into memory, and in practice I've seen my tasks OOM and GC time go crazy (about 50% of total run time). I suspect this toSeq is the reason, since a simple count() on the groupBy() result works fine.

I am planning to shuffle my_input_rdd first, then groupBy(), and drop the toSeq/shuffle step. Intuitively the input RDD is already shuffled, so UNLESS groupBy() does some sorting, the rows in each group SHOULD remain shuffled? But overall this approach feels rather flimsy.
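
Roughly what I have in mind (just a sketch, assuming train_sgd can take each group's rows directly):

import scala.util.Random

// Pre-shuffle the whole input by sorting on a random key, then group;
// the hope is that row order within each group stays random.
val preShuffled = my_input_rdd.sortBy(_ => Random.nextDouble())
val trained = preShuffled.groupBy(_.model_id).map { case (modelId, rows) => (modelId, train_sgd(rows)) }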

Any ideas on how to do this more reliably?

thanks!


