Re: Not getting it

2014-03-28 Thread lannyripple
Ok.  Based on Sonal's message I dug further into memory and partitioning and
got it to work.

For the CSV file I used 1024 partitions [textFile(path, 1024)], which cut
the partition size down to about 8MB (down from the standard 64MB HDFS
splits).  For the key file I also adjusted the partitioning to about 8MB
per partition.  The join was still blowing up with GC Overlimit and Heap
OOM.  I then set SPARK_MEM (which is hard to tease out of the
documentation) to 4g, and the join completed.

Going back to track down SPARK_MEM, I found this to be the best explanation --
https://groups.google.com/forum/#!searchin/spark-users/SPARK_MEM/spark-users/ou6cJMlBj_M/NlBHYDjG_NYJ

At a guess, setting SPARK_MEM did more than changing the partitions did.
Something to play around with.
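
For reference, a minimal sketch of the sort of thing that ended up working
(the paths and the key-file split count below are placeholders, and
SPARK_MEM=4g is exported in the environment before launching the shell,
not set in code):

  // 1024 splits gives ~8MB partitions for the big CSV; 16 splits for the
  // small key file is just a guess.
  val fileA = sc.textFile("hdfs://.../fileA_110M.csv", 1024).map{_.split(",")}.keyBy{_(0)}
  val fileB = sc.textFile("hdfs://.../fileB_25k.csv", 16).keyBy{x => x}
  fileA.join(fileB).count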


On Fri, Mar 28, 2014 at 10:17 AM, Lanny Ripple wrote:

> I've played around with it.  The CSV file looks like it gives 130
> partitions; I'm assuming that's from the standard 64MB HDFS split size.
> I have increased the number of partitions and the number of tasks for
> things like groupByKey and such.  Usually I start blowing up on GC
> Overlimit or sometimes Heap OOM.  I recently tried throwing coalesce with
> shuffle = true into the mix, thinking it would bring the keys into the
> same partition.  E.g.,
>
> (fileA ++ fileB.map{case (k,v) => (k,
> Array(v))}).coalesce(fileA.partitions.length + fileB.partitions.length,
> shuffle = true).groupBy...
>
> (which should effectively imitate map-reduce), but I see GC Overlimit
> when I do that.
>
> I've got a stock install with num cores and worker memory set as mentioned
> but even something like this
>
> fileA.sortByKey().map{_ => 1}.reduce{_ + _}
>
> blows up with GC Overlimit (as did .count instead of the by-hand count).
>
> fileA.count
>
> works.  It seems to be able to load the file as an RDD but not manipulate
> it.
>
>
>
>
> On Fri, Mar 28, 2014 at 3:04 AM, Sonal Goyal [via Apache Spark User List] wrote:
>
>> Have you tried setting the partitioning ?
>>
>> Best Regards,
>> Sonal
>> Nube Technologies
>>
>> On Thu, Mar 27, 2014 at 10:04 AM, lannyripple wrote:
>>
>>> Hi all,
>>>
>>> I've got something which I think should be straightforward, but it's not,
>>> so I'm not getting it.
>>>
>>> I have an 8-node Spark 0.9.0 cluster also running HDFS.  Workers have 16g
>>> of memory and use 8 cores.
>>>
>>> In HDFS I have a CSV file of 110M lines of 9 columns (e.g.,
>>> [key,a,b,c...]).
>>> I have another file of 25K lines containing some number of keys which
>>> might be in my CSV file.  (Yes, I know I should use an RDBMS or Shark or
>>> something.  I'll get to that, but this is a toy problem that I'm using to
>>> get some intuition with Spark.)
>>>
>>> Working on each file individually, Spark has no problem manipulating the
>>> files.  If I try to join, or union+filter, though, I can't seem to get the
>>> join of the two files to work.  The code is along the lines of
>>>
>>> val fileA =
>>> sc.textFile("hdfs://.../fileA_110M.csv").map{_.split(",")}.keyBy{_(0)}
>>> val fileB = sc.textFile("hdfs://.../fileB_25k.csv").keyBy{x => x}
>>>
>>> And trying things like fileA.join(fileB) gives me heap OOM.  Trying
>>>
>>> (fileA ++ fileB.map{case (k,v) => (k,
>>> Array(v))}).groupBy{_._1}.filter{case
>>> (k, (_, xs)) => xs.exists{_.length == 1}}
>>>
>>> just causes Spark to freeze.  (In all the cases I'm trying, I just use a
>>> final .count to force the results.)
>>>
>>> I suspect I'm missing something fundamental about bringing the keyed data
>>> together into the same partitions so it can be efficiently joined, but
>>> I've given up for now.  If anyone can shed some light (beyond "No really.
>>> Use Shark.") on what I'm not understanding, it would be most helpful.
>>>





Re: Not getting it

2014-03-28 Thread lannyripple
I've played around with it.  The CSV file looks like it gives 130
partitions; I'm assuming that's from the standard 64MB HDFS split size.
I have increased the number of partitions and the number of tasks for
things like groupByKey and such.  Usually I start blowing up on GC
Overlimit or sometimes Heap OOM.  I recently tried throwing coalesce with
shuffle = true into the mix, thinking it would bring the keys into the
same partition.  E.g.,

(fileA ++ fileB.map{case (k,v) => (k,
Array(v))}).coalesce(fileA.partitions.length + fileB.partitions.length,
shuffle = true).groupBy...

(which should effectively imitate map-reduce), but I see GC Overlimit
when I do that.

I've got a stock install with num cores and worker memory set as mentioned
but even something like this

fileA.sortByKey().map{_ => 1}.reduce{_ + _}

blows up with GC Overlimit (as did .count instead of the by-hand count).

fileA.count

works.  It seems to be able to load the file as an RDD but not manipulate
it.
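
For reference, a more explicit way to force the keys into the same
partitions would presumably be an explicit HashPartitioner, along the
lines of the sketch below (the partition count is a guess, and I haven't
verified that it sidesteps the GC problems):

  import org.apache.spark.HashPartitioner

  val part   = new HashPartitioner(256)  // partition count is a guess
  val keyedA = fileA.partitionBy(part)   // hash fileA's keys into fixed partitions
  val keyedB = fileB.partitionBy(part)   // same partitioner, so matching keys co-locate
  keyedA.join(keyedB).count              // co-partitioned join; neither side should
                                         // need to be re-shuffled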




On Fri, Mar 28, 2014 at 3:04 AM, Sonal Goyal [via Apache Spark User List] wrote:

> Have you tried setting the partitioning ?
>
> Best Regards,
> Sonal
> Nube Technologies
>
> On Thu, Mar 27, 2014 at 10:04 AM, lannyripple wrote:
>
>> Hi all,
>>
>> I've got something which I think should be straightforward, but it's not,
>> so I'm not getting it.
>>
>> I have an 8-node Spark 0.9.0 cluster also running HDFS.  Workers have 16g
>> of memory and use 8 cores.
>>
>> In HDFS I have a CSV file of 110M lines of 9 columns (e.g.,
>> [key,a,b,c...]).
>> I have another file of 25K lines containing some number of keys which
>> might be in my CSV file.  (Yes, I know I should use an RDBMS or Shark or
>> something.  I'll get to that, but this is a toy problem that I'm using to
>> get some intuition with Spark.)
>>
>> Working on each file individually, Spark has no problem manipulating the
>> files.  If I try to join, or union+filter, though, I can't seem to get the
>> join of the two files to work.  The code is along the lines of
>>
>> val fileA =
>> sc.textFile("hdfs://.../fileA_110M.csv").map{_.split(",")}.keyBy{_(0)}
>> val fileB = sc.textFile("hdfs://.../fileB_25k.csv").keyBy{x => x}
>>
>> And trying things like fileA.join(fileB) gives me heap OOM.  Trying
>>
>> (fileA ++ fileB.map{case (k,v) => (k,
>> Array(v))}).groupBy{_._1}.filter{case
>> (k, (_, xs)) => xs.exists{_.length == 1}}
>>
>> just causes Spark to freeze.  (In all the cases I'm trying, I just use a
>> final .count to force the results.)
>>
>> I suspect I'm missing something fundamental about bringing the keyed data
>> together into the same partitions so it can be efficiently joined, but I've
>> given up for now.  If anyone can shed some light (beyond "No really.  Use
>> Shark.") on what I'm not understanding, it would be most helpful.
>>





Re: Not getting it

2014-03-28 Thread Sonal Goyal
Have you tried setting the partitioning ?
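
Something along these lines might help -- a rough sketch only; the
partition count is just a guess and I have not tried it on your data:

  // pass an explicit number of partitions to the shuffle
  fileA.join(fileB, 512).count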

Best Regards,
Sonal
Nube Technologies

On Thu, Mar 27, 2014 at 10:04 AM, lannyripple wrote:

> Hi all,
>
> I've got something which I think should be straightforward, but it's not,
> so I'm not getting it.
>
> I have an 8-node Spark 0.9.0 cluster also running HDFS.  Workers have 16g
> of memory and use 8 cores.
>
> In HDFS I have a CSV file of 110M lines of 9 columns (e.g.,
> [key,a,b,c...]).
> I have another file of 25K lines containing some number of keys which might
> be in my CSV file.  (Yes, I know I should use an RDBMS or Shark or
> something.  I'll get to that, but this is a toy problem that I'm using to
> get some intuition with Spark.)
>
> Working on each file individually, Spark has no problem manipulating the
> files.  If I try to join, or union+filter, though, I can't seem to get the
> join of the two files to work.  The code is along the lines of
>
> val fileA =
> sc.textFile("hdfs://.../fileA_110M.csv").map{_.split(",")}.keyBy{_(0)}
> val fileB = sc.textFile("hdfs://.../fileB_25k.csv").keyBy{x => x}
>
> And trying things like fileA.join(fileB) gives me heap OOM.  Trying
>
> (fileA ++ fileB.map{case (k,v) => (k, Array(v))}).groupBy{_._1}.filter{case
> (k, (_, xs)) => xs.exists{_.length == 1}}
>
> just causes Spark to freeze.  (In all the cases I'm trying, I just use a
> final .count to force the results.)
>
> I suspect I'm missing something fundamental about bringing the keyed data
> together into the same partitions so it can be efficiently joined, but I've
> given up for now.  If anyone can shed some light (beyond "No really.  Use
> Shark.") on what I'm not understanding, it would be most helpful.
>