Re: Why is huge data shuffling in Spark when using union()/coalesce(1,false) on DataFrame?

2015-09-09 Thread Umesh Kacha
Hi Richard, thanks for the response. My use case is unusual: I need to
process the data row by row within each partition and update the required
rows; roughly 30% of the rows end up updated. Following the suggestions in
the stackoverflow.com answer above, I refactored the code to use
mapPartitionsWithIndex:

JavaRDD<Row> indexedRdd = sourceRdd.cache().mapPartitionsWithIndex(
    new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
      @Override
      public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
        List<Row> rowList = new ArrayList<>();
        while (rowIterator.hasNext()) {
          Row row = rowIterator.next();
          List<Object> rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
          Row updatedRow = RowFactory.create(rowAsList.toArray());
          rowList.add(updatedRow);
        }
        return rowList.iterator();
      }
    }, true).union(remainingrdd).coalesce(200, true);

The above code still hits memory limits, since I have 2 TB of data to
process. From the resulting RDD I create a DataFrame, register it as a temp
table using hiveContext, and execute a few INSERT INTO partition queries
via hiveContext.sql.
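For reference, the downstream flow looks roughly like this (a simplified
sketch; sourceFrame, updated_rows, and base_table are placeholder names,
not my real schema, and the dynamic-partition insert assumes
hive.exec.dynamic.partition is enabled):

// Rebuild a DataFrame from the updated rows, reusing the source schema
// (sourceFrame is a placeholder for the original DataFrame).
DataFrame updatedFrame = hiveContext.createDataFrame(indexedRdd, sourceFrame.schema());
// Register it so the insert queries can reference it from SQL.
updatedFrame.registerTempTable("updated_rows");
// One of the inserts into the partitioned target table (placeholder
// names; partition columns must come last in the SELECT).
hiveContext.sql("INSERT INTO TABLE base_table PARTITION (entity, date) "
    + "SELECT * FROM updated_rows");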

Please help me optimize the above code.
On Sep 9, 2015 2:55 AM, "Richard Marscher"  wrote:

> Hi,
>
> What is the reasoning behind the use of `coalesce(1, false)`? This says
> to aggregate all of the data into a single partition, which must fit in
> memory on one node of the Spark cluster. If the cluster has more than one
> node, it must shuffle to move the data there. It doesn't seem like the
> following map or union necessitates the coalesce, but the use case is not
> clear to me.
>
> On Fri, Sep 4, 2015 at 12:29 PM, unk1102  wrote:
>
>> Hi, I have a Spark job that does some processing on ORC data and stores
>> the ORC data back using the DataFrameWriter save() API introduced in
>> Spark 1.4.0. The following piece of code uses heavy shuffle memory. How
>> do I optimize it? Is there anything wrong with it? It works as expected,
>> but it is slow because of GC pauses, and it shuffles lots of data, so it
>> hits memory issues. Please guide me; I am new to Spark. Thanks in
>> advance.
>>
>> JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD()
>>     .coalesce(1, false)
>>     .map(new Function<Row, Row>() {
>>       @Override
>>       public Row call(Row row) throws Exception {
>>         List<Object> rowAsList;
>>         Row row1 = null;
>>         if (row != null) {
>>           rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
>>           row1 = RowFactory.create(rowAsList.toArray());
>>         }
>>         return row1;
>>       }
>>     }).union(modifiedRDD);
>> DataFrame updatedDataFrame =
>>     hiveContext.createDataFrame(updatedDsqlRDD, renamedSourceFrame.schema());
>> updatedDataFrame.write().mode(SaveMode.Append).format("orc")
>>     .partitionBy("entity", "date").save("baseTable");


Re: Why is huge data shuffling in Spark when using union()/coalesce(1,false) on DataFrame?

2015-09-08 Thread Richard Marscher
Hi,

What is the reasoning behind the use of `coalesce(1, false)`? This says to
aggregate all of the data into a single partition, which must fit in memory
on one node of the Spark cluster. If the cluster has more than one node, it
must shuffle to move the data there. It doesn't seem like the following map
or union necessitates the coalesce, but the use case is not clear to me.
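
To illustrate the difference, here's a minimal sketch (rdd is just a
placeholder for any JavaRDD<Row>):

// coalesce(1, false): all of the data is funneled into a single
// partition, so one executor must pull in and process the entire
// dataset.
JavaRDD<Row> single = rdd.coalesce(1, false);

// Keeping many partitions spreads the work and memory across the
// cluster; shuffle = true pays for a shuffle but redistributes the
// data evenly.
JavaRDD<Row> balanced = rdd.coalesce(200, true);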

On Fri, Sep 4, 2015 at 12:29 PM, unk1102  wrote:

> Hi, I have a Spark job that does some processing on ORC data and stores
> the ORC data back using the DataFrameWriter save() API introduced in
> Spark 1.4.0. The following piece of code uses heavy shuffle memory. How
> do I optimize it? Is there anything wrong with it? It works as expected,
> but it is slow because of GC pauses, and it shuffles lots of data, so it
> hits memory issues. Please guide me; I am new to Spark. Thanks in
> advance.
>
> JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD()
>     .coalesce(1, false)
>     .map(new Function<Row, Row>() {
>       @Override
>       public Row call(Row row) throws Exception {
>         List<Object> rowAsList;
>         Row row1 = null;
>         if (row != null) {
>           rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
>           row1 = RowFactory.create(rowAsList.toArray());
>         }
>         return row1;
>       }
>     }).union(modifiedRDD);
> DataFrame updatedDataFrame =
>     hiveContext.createDataFrame(updatedDsqlRDD, renamedSourceFrame.schema());
> updatedDataFrame.write().mode(SaveMode.Append).format("orc")
>     .partitionBy("entity", "date").save("baseTable");


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com | Our Blog | Twitter | Facebook | LinkedIn


Why is huge data shuffling in Spark when using union()/coalesce(1,false) on DataFrame?

2015-09-04 Thread unk1102
Hi, I have a Spark job that does some processing on ORC data and stores the
ORC data back using the DataFrameWriter save() API introduced in Spark
1.4.0. The following piece of code uses heavy shuffle memory. How do I
optimize it? Is there anything wrong with it? It works as expected, but it
is slow because of GC pauses, and it shuffles lots of data, so it hits
memory issues. Please guide me; I am new to Spark. Thanks in advance.

JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD()
    .coalesce(1, false)
    .map(new Function<Row, Row>() {
      @Override
      public Row call(Row row) throws Exception {
        List<Object> rowAsList;
        Row row1 = null;
        if (row != null) {
          rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
          row1 = RowFactory.create(rowAsList.toArray());
        }
        return row1;
      }
    }).union(modifiedRDD);
DataFrame updatedDataFrame =
    hiveContext.createDataFrame(updatedDsqlRDD, renamedSourceFrame.schema());
updatedDataFrame.write().mode(SaveMode.Append).format("orc")
    .partitionBy("entity", "date").save("baseTable");



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-huge-data-shuffling-in-Spark-when-using-union-coalesce-1-false-on-DataFrame-tp24581.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org