I think it would be much easier to do this with the DataFrame API: do a
local sort within each partition, using randn() as the sort key. For
example, in Spark 2.0:
  import org.apache.spark.sql.functions.randn
  import spark.implicits._   // for the $"..." column syntax

  val df = spark.range(100)
  val shuffled = df.repartition($"id" % 10).sortWithinPartitions(randn(42))
Replace df with a DataFrame wrapping your RDD, and $"id" % 10 with your
grouping key; then you can get the RDD back from shuffled and run
whatever operations you need on it.
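A minimal sketch of that substitution, assuming your rows are a case
class with a model_id field (the Sample class and its fields below are
hypothetical; substitute your actual schema):

  import org.apache.spark.sql.functions.randn
  import spark.implicits._

  // hypothetical row type standing in for your training samples
  case class Sample(model_id: Long, label: Double, features: Array[Double])

  val df = spark.createDataFrame(my_input_rdd)   // wrap the RDD in a DataFrame
  val shuffled = df
    .repartition($"model_id")                    // co-locate each model's rows
    .sortWithinPartitions(randn(42))             // random local sort = shuffle within partition
  val shuffledRdd = shuffled.as[Sample].rdd      // back to an RDD for downstream training

The point of the randn-based sort is that everything streams through
Spark's sort machinery instead of materializing each group as an
in-memory Seq.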
Cheng
On 10/20/16 10:53 AM, Yang wrote:
in my application, I group training samples by their model_ids (the
input table contains training samples for 100k different models), and
each group ends up with about 1 million training samples.
I then feed each group of samples to a small Logistic Regression
solver (SGD), but SGD requires the input data to be randomly shuffled
(so that positive and negative samples are evenly distributed), so
right now I do something like
  my_input_rdd.groupBy(x => x.model_id).map { x =>
    val (model_id, group_of_rows) = x
    (model_id, scala.util.Random.shuffle(group_of_rows.toSeq))
  }.map(x => (x._1, train_sgd(x._2)))
the issue is that on the 3rd row above I had to explicitly call toSeq
on group_of_rows in order to shuffle it, because it is an Iterable and
not a Seq. That forces the entire 1 million rows into memory, and in
practice I've seen my tasks OOM, with GC taking about 50% of total run
time. I suspect this toSeq is the culprit, since a simple count() on
the groupBy() result works fine.
I am planning to shuffle my_input_rdd first, then groupBy(), and skip
the toSeq + shuffle. Intuitively the input RDD is already shuffled, so
UNLESS groupBy() does some sorting, the rows in each group SHOULD
remain shuffled ???? but overall this approach feels rather flimsy.
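roughly, the plan looks like this (a sketch only; whether the random
order actually survives the groupBy shuffle is exactly the part I'm
unsure about):

  // pre-shuffle the whole RDD by sorting on a random key
  val pre_shuffled = my_input_rdd
    .map(x => (scala.util.Random.nextDouble(), x))
    .sortByKey()
    .values

  // then group, with no per-group toSeq/shuffle, relying on the order above
  pre_shuffled.groupBy(x => x.model_id)
    .map(x => (x._1, train_sgd(x._2)))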
any ideas to do this more reliably?
thanks!