Re: Not getting it
OK. Based on Sonal's message I dug further into memory and partitioning and got it to work. For the CSV file I used 1024 partitions [textFile(path, 1024)], which cut the partition size down to about 8MB (based on the standard 64MB HDFS splits). For the key file I also sized the partitions to roughly 8MB. The join was still blowing up with GC overhead-limit errors and heap OOM. I then set SPARK_MEM (which is hard to tease out of the documentation) to 4g, and the join completed.

Going back to find SPARK_MEM, the best explanation I found is this thread -- https://groups.google.com/forum/#!searchin/spark-users/SPARK_MEM/spark-users/ou6cJMlBj_M/NlBHYDjG_NYJ

At a guess, setting SPARK_MEM did more than changing the partitions. Something to play around with.
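For reference, the working run looked roughly like this (a minimal sketch; the partition count for the key file is illustrative rather than a measured value, and the paths are elided as above):

    // Exported before launching the job (e.g., in conf/spark-env.sh):
    //   export SPARK_MEM=4g

    // ~130 default splits * 64MB ≈ 8.3GB; 8.3GB / 1024 ≈ 8MB per partition
    val fileA = sc.textFile("hdfs://.../fileA_110M.csv", 1024)
                  .map{_.split(",")}
                  .keyBy{_(0)}

    // the 25K-line key file is tiny; a handful of partitions keeps each
    // one well under 8MB (the 4 here is a guess, not a tuned number)
    val fileB = sc.textFile("hdfs://.../fileB_25k.csv", 4)
                  .keyBy{x => x}

    fileA.join(fileB).count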
Re: Not getting it
I've played around with it. The CSV file looks like it gives 130 partitions; I'm assuming that's the standard 64MB split size for HDFS files. I have increased the number of partitions and the number of tasks for things like groupByKey and such. Usually I start blowing up on GC overhead-limit errors or sometimes heap OOM.

I recently tried throwing coalesce, with shuffle = true, into the mix, thinking it would bring the keys into the same partition. E.g.,

    (fileA ++ fileB.map{case (k, v) => (k, Array(v))})
      .coalesce(fileA.partitions.length + fileB.partitions.length, shuffle = true)
      .groupBy...

(which should effectively imitate map-reduce), but I see GC overhead-limit errors when I do that.

I've got a stock install with the number of cores and worker memory set as mentioned, but even something like this

    fileA.sortByKey().map{_ => 1}.reduce{_ + _}

blows up with GC overhead-limit errors (as did .count instead of the by-hand count), while

    fileA.count

works. Spark seems able to load the file as an RDD but not to manipulate it.
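For concreteness, "increased the number of partitions and tasks" means variations along these lines (a sketch; the 1024 here is illustrative rather than a tuned value):

    // more input partitions up front...
    val fileA = sc.textFile("hdfs://.../fileA_110M.csv", 1024)
                  .map{_.split(",")}
                  .keyBy{_(0)}

    // ...and more reduce-side tasks on the wide operations
    fileA.groupByKey(1024).count
    fileA.join(fileB, 1024).count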
Re: Not getting it
Have you tried setting the partitioning?

Best Regards,
Sonal
Nube Technologies <http://www.nubetech.co>
<http://in.linkedin.com/in/sonalgoyal>
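P.S. For example, something along these lines (a minimal sketch, assuming fileA and fileB are the keyed RDDs from your code; the 128 is illustrative):

    import org.apache.spark.HashPartitioner

    // co-partition both sides with the same partitioner; a join of
    // co-partitioned RDDs can then combine them without re-shuffling
    val part = new HashPartitioner(128)
    val a = fileA.partitionBy(part)
    val b = fileB.partitionBy(part)
    a.join(b).count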
Not getting it
Hi all,

I've got something which I think should be straightforward, but it's not, so I'm not getting it.

I have an 8-node Spark 0.9.0 cluster also running HDFS. Workers have 16g of memory using 8 cores.

In HDFS I have a CSV file of 110M lines of 9 columns (e.g., [key,a,b,c...]). I have another file of 25K lines containing some number of keys which might be in my CSV file. (Yes, I know I should use an RDBMS or Shark or something. I'll get to that, but this is a toy problem that I'm using to get some intuition with Spark.)

Working on each file individually, Spark has no problem manipulating the files. If I try a join or union+filter, though, I can't seem to compute the join of the two files. Code is along the lines of

    val fileA = sc.textFile("hdfs://.../fileA_110M.csv").map{_.split(",")}.keyBy{_(0)}
    val fileB = sc.textFile("hdfs://.../fileB_25k.csv").keyBy{x => x}

And trying things like fileA.join(fileB) gives me heap OOM. Trying

    (fileA ++ fileB.map{case (k, v) => (k, Array(v))})
      .groupBy{_._1}
      .filter{case (k, xs) => xs.exists{_._2.length == 1}}

just causes Spark to freeze. (In all the cases I'm trying, I just use a final .count to force the results.)

I suspect I'm missing something fundamental about bringing the keyed data together into the same partitions so it can be efficiently joined, but I've given up for now. If anyone can shed some light (beyond "No, really. Use Shark.") on what I'm not understanding, it would be most helpful.