Here is a rough sketch, off the top of my head and based on a guess at
what your RDD looks like:

val in: RDD[(K,Seq[(C,V)])] = ...

in.flatMap { case (key, colVals) =>
  colVals.map { case (col, value) =>  // "val" is a reserved word in Scala
    (col, (key, value))
  }
}.groupByKey
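
To make the shape of the result concrete, here is the same transformation
on ordinary local collections. This is only a stand-in, assuming Seq in
place of the RDD and groupBy in place of groupByKey; the object and method
names are made up for illustration.

```scala
object TransposeSketch {
  // Local stand-in for the RDD sketch: Seq plays the role of the RDD and
  // groupBy plays the role of groupByKey.
  def transpose[K, C, V](in: Seq[(K, Seq[(C, V)])]): Map[C, Seq[(K, V)]] =
    in.flatMap { case (key, colVals) =>
      // Re-key every cell by its column, carrying the original key along.
      colVals.map { case (col, value) => (col, (key, value)) }
    }.groupBy(_._1)
      .map { case (col, pairs) => col -> pairs.map(_._2) }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      "key1" -> Seq("col1" -> "val-1-1", "col2" -> "val-1-2"),
      "key2" -> Seq("col1" -> "val-2-1", "col2" -> "val-2-2")
    )
    // Print one line per column, listing its (key, value) pairs.
    transpose(rows).toSeq.sortBy(_._1).foreach { case (col, kvs) =>
      println(s"$col: ${kvs.mkString(", ")}")
    }
  }
}
```

Note that each entry of the result holds every (key, value) pair for its
column in a single collection.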

The problem with both the input and the output here is that all the values
for a given key have to exist in memory at once. After the transpose, each
output element (one per column) holds about 50M key-value pairs.
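
A back-of-the-envelope estimate of what that means per column. The 48
bytes per pair is purely an assumed round figure for a boxed (key, value)
tuple on the JVM, not a measurement; the real number depends on your value
types.

```scala
object ColumnSizeEstimate {
  // Number of keys reported in the quoted message (about 5e7).
  val numKeys: Long = 50000000L
  // ASSUMPTION: average JVM footprint of one (key, value) pair, in bytes.
  val assumedBytesPerPair: Long = 48L

  // One transposed element holds one pair per key.
  def bytesPerColumn(keys: Long, bytesPerPair: Long): Long =
    keys * bytesPerPair

  def main(args: Array[String]): Unit = {
    val bytes = bytesPerColumn(numKeys, assumedBytesPerPair)
    println(f"${bytes / 1e9}%.1f GB per column")
  }
}
```

Under that assumption a single grouped column comes to roughly 2.4 GB, all
of which has to sit in one task's memory at once.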

You should probably approach this in a slightly different way, one that
does not need a whole column in a single record.

It depends on what you mean by resubmitting, but I imagine you need to
call cache() on an RDD you are reusing.
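
A minimal sketch of what I mean by that, assuming a local master and toy
data. The object name, the helper countTwice and the sample records are
all made up for illustration. Without the cache(), each action would
recompute the flatMap from the source.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark 1.2 and earlier
import org.apache.spark.rdd.RDD

object CacheSketch {
  // Hypothetical helper: runs two actions over the same re-keyed RDD.
  def countTwice(sc: SparkContext): (Long, Long) = {
    val in: RDD[(String, Seq[(Int, Double)])] = sc.parallelize(Seq(
      "key1" -> Seq(1 -> 1.1, 2 -> 1.2),
      "key2" -> Seq(1 -> 2.1, 2 -> 2.2)
    ))
    val byCol = in.flatMap { case (key, colVals) =>
      colVals.map { case (col, value) => (col, (key, value)) }
    }.cache() // mark for reuse; filled in by the first action below

    val numCols = byCol.keys.distinct().count() // first action: computes and caches
    val numCells = byCol.count()                // second action: reuses cached partitions
    (numCols, numCells)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("cache-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try println(countTwice(sc)) finally sc.stop()
  }
}
```

This only avoids recomputation; it does not by itself shrink the grouped
columns.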
On Dec 26, 2014 4:18 PM, "Michael Albert" <m_albert...@yahoo.com.invalid>
wrote:

> Greetings!
>
> I'm trying to do something similar, and having a very bad time of it.
>
> What I start with is
>
> key1: (col1: val-1-1, col2: val-1-2, col3: val-1-3, col4: val-1-4, ...)
> key2: (col1: val-2-1, col2: val-2-2, col3: val-2-3, col4: val-2-4, ...)
> ....
>
> What I want  (what I have been asked to produce :-)) is:
>
> col1: (key1: val-1-1, key2: val-2-1, key3: val-3-1, ...)
> col2: (key1: val-1-2, key2: val-2-2, key3: val-3-2, ...)
>
> So basically the transpose.  The input is actually avro/parquet with each
> "key" in one record.
> In the output, the final step is to convert each column into a "matlab"
> file.
> Please don't ask me whether this is a good idea.
>
> I can get this to work for smallish data sets (e.g., a few hundred keys and
> a few hundred columns).
> However, if I crank up the number of keys to about 5e7, then this fails,
> even if I turn the number of columns that are actually used down to 10.
>
> The system seems to spend lots of time resubmitting parts of the first
> phase
> in which the data is read from the original records and shuffled and never
> quite finishes.
>
> I can't post the code, but I can give folks an idea of what I've tried.
>
> Try #1: Mapper emits data as (DataKey(col-as-int,key-as-int),
> value-as-Option[Any]),
> then create a ShuffledRDD using the col-as-int for partitioning and
> then "setKeyOrdering" on the key-as-int.  This is then fed to
> "mapPartitionsWithIndex".
>
> Try #2: Emit (col-as-int, (key-as-int, value)) and groupBy, and have a
> final "map()" on each "col".
>
> Try #3: Emit (col-as-int, Collection[(key-as-int, value)]), then have a
> reduceByKey
> which takes the "union" of the collection (union for set, ++ for list)
> then have
> a final map() which attempts the final conversion.
>
> No matter what I do, it works for "small" numbers of keys (hundreds),
> but
> when I crank it up, it seems to sit there resubmitting the shuffle phase.
>
> Happy holidays, all!
> -Mike
>
>
>
>   ------------------------------
>  *From:* Amit Behera <amit.bd...@gmail.com>
> *To:* u...@spark.incubator.apache.org
> *Sent:* Thursday, December 25, 2014 3:22 PM
> *Subject:* unable to do group by with 1st column
>
> Hi Users,
>
> I am reading a csv file and my data format is like :
>
> key1,value1
> key1,value2
> key1,value1
> key1,value3
> key2,value1
> key2,value5
> key2,value5
> key2,value4
> key1,value4
> key1,value4
> key3,value1
> key3,value1
> key3,value2
>
> required output :
>
> key1:[value1,value2,value1,value3,value4,value4]
> key2:[value1,value5,value5,value4]
> key3:[value1,value1,value2]
>
>
> How can I do it? Please help me to do.
>
> Thanks
> Amit
>
>
>
