unintended consequence of using coalesce operation

2015-09-29 Thread Lan Jiang
Hi, there

I ran into an issue when using Spark (v1.3) to load an Avro file through Spark 
SQL. The code sample is below:

val df = sqlContext.load("path-to-avro-file", "com.databricks.spark.avro")
val myrdd = df.select("Key", "Name", "binaryfield").rdd
val results = myrdd.map(...)
val finalResults = results.filter(...)
finalResults.coalesce(1).toDF().saveAsParquetFile("path-to-parquet")

The Avro file is 645 MB and the HDFS block size is 128 MB, so the file spans 5 HDFS 
blocks, which means there should be 5 partitions. Please note that I use 
coalesce because I expect the preceding filter transformation to remove 
almost all of the data, and I would like to write a single Parquet file. 
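
A quick way to double-check that assumption (just a sketch, assuming the same Spark 1.3 SQLContext and spark-avro setup as above):

val df = sqlContext.load("path-to-avro-file", "com.databricks.spark.avro")
println(df.rdd.partitions.length)              // expect ~5 for a 645 MB file with 128 MB blocks
println(df.rdd.coalesce(1).partitions.length)  // 1: the whole lineage now runs as a single task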

The YARN cluster has 3 datanodes. I use the configuration below for spark-submit:

spark-submit --class  --num-executors 3 --executor-cores 2 
--executor-memory 8g --master yarn-client mytest.jar 

I do see 3 executors being created, one on each data/worker node. However, 
there is only one task running within the cluster. After I remove the 
coalesce(1) call from the code, I see 5 tasks generated, spread across the 
3 executors. I was surprised by the result: coalesce is usually considered a 
better choice than repartition when reducing the number of partitions. In this 
case, however, it causes a performance problem: because coalesce(1) does not 
introduce a shuffle, Spark collapses the whole stage into a single task once the 
final partition count is 1, so only one thread reads the HDFS blocks instead of 5. 
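
For reference, a minimal sketch of the repartition variant I have in mind (the projection and filter here are illustrative placeholders, since the real map/filter bodies are elided above, and toDF on an RDD of tuples needs the implicits import):

import sqlContext.implicits._  // needed for toDF() on an RDD of tuples

val rows = sqlContext.load("path-to-avro-file", "com.databricks.spark.avro")
  .select("Key", "Name", "binaryfield")
  .rdd
  .map(r => (r.getString(0), r.getString(1)))   // hypothetical projection
  .filter { case (_, name) => name != null }    // hypothetical filter

// repartition(1) inserts a shuffle boundary, so the Avro scan, map and filter
// still run with one task per HDFS block; only the post-shuffle write stage has 1 task.
rows.repartition(1).toDF("Key", "Name").saveAsParquetFile("path-to-parquet")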

Is my understanding correct? In this case, I think repartition is a better 
choice than coalesce. 

Lan





Re: unintended consequence of using coalesce operation

2015-09-29 Thread Michael Armbrust
coalesce is generally used to avoid launching too many tasks, for example on a
bunch of small files.  As a result, its goal is to reduce parallelism (when the
overhead of that parallelism is more costly than the gain).  You are
correct that in your case repartition sounds like a better choice.
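
If you want to keep the coalesce call, a sketch of the shuffle variant (assuming the RDD API): with shuffle = true, coalesce behaves like repartition, so the read stays parallel.

// Sketch only: coalesce(1, shuffle = true) is equivalent to repartition(1),
// so the 5 read tasks are preserved and only the final write runs as a single task.
finalResults.coalesce(1, shuffle = true).toDF().saveAsParquetFile("path-to-parquet")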
