Re: Why is huge data shuffling in Spark when using union()/coalesce(1,false) on DataFrame?
Hi Richard, thanks for the response. My use case is unusual: I need to process data row by row within one partition and update the required rows; roughly 30% of the rows end up updated. Following the suggestions in the stackoverflow.com answer above, I refactored the code to use mapPartitionsWithIndex:

JavaRDD<Row> indexedRdd = sourceRdd.cache()
    .mapPartitionsWithIndex(new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
      @Override
      public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
        List<Row> rowList = new ArrayList<>();
        while (rowIterator.hasNext()) {
          Row row = rowIterator.next();
          List<Object> rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
          Row updatedRow = RowFactory.create(rowAsList.toArray());
          rowList.add(updatedRow);
        }
        return rowList.iterator();
      }
    }, true)
    .union(remainingrdd).coalesce(200, true);

The code above still hits memory limits, as I have 2 TB of data to process. From the resulting RDD I create a DataFrame, which I register as a temp table using hiveContext, and then I execute a few insert-into-partition queries using hiveContext.sql. Please help me optimize the code above.

On Sep 9, 2015 2:55 AM, "Richard Marscher" wrote:

> Hi,
>
> What is the reasoning behind the use of `coalesce(1,false)`? This says to
> aggregate all data into a single partition, which must fit in memory on one
> node in the Spark cluster. If the cluster has more than one node, it must
> shuffle to move the data. It doesn't seem like the following map or union
> necessitates coalesce, but the use case is not clear to me.
>
> On Fri, Sep 4, 2015 at 12:29 PM, unk1102 wrote:
>
>> Hi, I have a Spark job which does some processing on ORC data and stores
>> the ORC data back using the DataFrameWriter save() API introduced in
>> Spark 1.4.0. I have the following piece of code, which uses heavy shuffle
>> memory. How do I optimize the code below? Is there anything wrong with
>> it? It works as expected, only it is slow because of GC pauses, and it
>> shuffles lots of data and so hits memory issues.
>> Please guide me; I am new to Spark. Thanks in advance.
>>
>> JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD()
>>     .coalesce(1, false).map(new Function<Row, Row>() {
>>       @Override
>>       public Row call(Row row) throws Exception {
>>         List<Object> rowAsList;
>>         Row row1 = null;
>>         if (row != null) {
>>           rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
>>           row1 = RowFactory.create(rowAsList.toArray());
>>         }
>>         return row1;
>>       }
>>     }).union(modifiedRDD);
>> DataFrame updatedDataFrame =
>>     hiveContext.createDataFrame(updatedDsqlRDD, renamedSourceFrame.schema());
>> updatedDataFrame.write().mode(SaveMode.Append).format("orc")
>>     .partitionBy("entity", "date").save("baseTable");
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-huge-data-shuffling-in-Spark-when-using-union-coalesce-1-false-on-DataFrame-tp24581.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
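[Editor's note] One likely source of the memory pressure in the mapPartitionsWithIndex body above is that it buffers every updated Row of a partition into an ArrayList before returning. Returning a lazily transforming iterator instead lets rows stream through one at a time. This is a minimal stdlib-only sketch of the pattern (plain Java, no Spark types; the class and helper names `LazyPartitionMap`/`mapLazily` are made up for illustration):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class LazyPartitionMap {
    // Wraps an iterator so each element is transformed only when consumed,
    // instead of materializing the whole partition into a List first.
    static <A, B> Iterator<B> mapLazily(Iterator<A> in, Function<A, B> f) {
        return new Iterator<B>() {
            @Override public boolean hasNext() { return in.hasNext(); }
            @Override public B next() { return f.apply(in.next()); }
        };
    }

    public static void main(String[] args) {
        Iterator<Integer> rows = Arrays.asList(1, 2, 3).iterator();
        Iterator<Integer> updated = mapLazily(rows, r -> r * 10);
        List<Integer> out = new ArrayList<>();
        updated.forEachRemaining(out::add);
        System.out.println(out); // prints [10, 20, 30]
    }
}
```

Inside the `call(...)` above, the same idea would mean returning such a wrapping iterator over `rowIterator` (applying `iterate` and `RowFactory.create` per row) rather than `rowList.iterator()`, so at most one row per partition is held at a time instead of the whole partition.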
Re: Why is huge data shuffling in Spark when using union()/coalesce(1,false) on DataFrame?
Hi,

What is the reasoning behind the use of `coalesce(1,false)`? This says to aggregate all data into a single partition, which must fit in memory on one node in the Spark cluster. If the cluster has more than one node, it must shuffle to move the data. It doesn't seem like the following map or union necessitates coalesce, but the use case is not clear to me.

On Fri, Sep 4, 2015 at 12:29 PM, unk1102 wrote:

> Hi, I have a Spark job which does some processing on ORC data and stores
> the ORC data back using the DataFrameWriter save() API introduced in Spark
> 1.4.0. I have the following piece of code, which uses heavy shuffle
> memory. How do I optimize the code below? Is there anything wrong with it?
> It works as expected, only it is slow because of GC pauses, and it
> shuffles lots of data and so hits memory issues. Please guide me; I am new
> to Spark. Thanks in advance.
>
> JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD()
>     .coalesce(1, false).map(new Function<Row, Row>() {
>       @Override
>       public Row call(Row row) throws Exception {
>         List<Object> rowAsList;
>         Row row1 = null;
>         if (row != null) {
>           rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
>           row1 = RowFactory.create(rowAsList.toArray());
>         }
>         return row1;
>       }
>     }).union(modifiedRDD);
> DataFrame updatedDataFrame =
>     hiveContext.createDataFrame(updatedDsqlRDD, renamedSourceFrame.schema());
> updatedDataFrame.write().mode(SaveMode.Append).format("orc")
>     .partitionBy("entity", "date").save("baseTable");
--
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>
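[Editor's note] Richard's observation is that a row-wise `map` runs independently inside each partition, so nothing is gained by coalescing everything to one partition first. A stdlib-only illustration of why (partitions modeled as lists of lists; the class and method names `RowwiseMapDemo`/`mapPerPartition` are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class RowwiseMapDemo {
    // Apply a per-row function inside each "partition" independently,
    // with no data movement between partitions.
    static <A, B> List<List<B>> mapPerPartition(List<List<A>> parts, Function<A, B> f) {
        return parts.stream()
                .map(p -> p.stream().map(f).collect(Collectors.toList()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Two "partitions" living on two hypothetical nodes.
        List<List<Integer>> parts = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4));
        Function<Integer, Integer> update = r -> r + 100;

        // Mapping each partition in place...
        List<Integer> perPartition = mapPerPartition(parts, update).stream()
                .flatMap(List::stream).collect(Collectors.toList());

        // ...yields the same rows as merging everything into one partition
        // first and then mapping, so the merge step buys nothing.
        List<Integer> mergedFirst = parts.stream().flatMap(List::stream)
                .map(update).collect(Collectors.toList());

        System.out.println(perPartition.equals(mergedFirst)); // prints true
    }
}
```

In the Spark code under discussion, this suggests dropping `coalesce(1, false)` entirely and letting `map` run over the existing partitions; only the `union` and final write impose any cross-partition structure.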
Why is huge data shuffling in Spark when using union()/coalesce(1,false) on DataFrame?
Hi, I have a Spark job which does some processing on ORC data and stores the ORC data back using the DataFrameWriter save() API introduced in Spark 1.4.0. I have the following piece of code, which uses heavy shuffle memory. How do I optimize the code below? Is there anything wrong with it? It works as expected, only it is slow because of GC pauses, and it shuffles lots of data and so hits memory issues. Please guide me; I am new to Spark. Thanks in advance.

JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD()
    .coalesce(1, false).map(new Function<Row, Row>() {
      @Override
      public Row call(Row row) throws Exception {
        List<Object> rowAsList;
        Row row1 = null;
        if (row != null) {
          rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
          row1 = RowFactory.create(rowAsList.toArray());
        }
        return row1;
      }
    }).union(modifiedRDD);
DataFrame updatedDataFrame =
    hiveContext.createDataFrame(updatedDsqlRDD, renamedSourceFrame.schema());
updatedDataFrame.write().mode(SaveMode.Append).format("orc")
    .partitionBy("entity", "date").save("baseTable");