Hi Jestin,

Spark is smart about how it does joins. In this case, if df2 is sufficiently 
small it will do a broadcast join: rather than shuffling df1 and df2 for the 
join, it broadcasts df2 to every worker and joins locally on each one. It looks 
like you may already know that, though, since you're setting 
spark.sql.autoBroadcastJoinThreshold.
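
For reference, a rough sketch of what that looks like in the DataFrame API 
(Scala; "key" is just a placeholder for whatever your actual join column is):

import org.apache.spark.sql.functions.broadcast

// With spark.sql.autoBroadcastJoinThreshold above df2's size, Spark should
// pick a broadcast join on its own; the broadcast() hint just makes it explicit.
val counts = df1.groupBy("key").count()
val joined = counts.join(broadcast(df2), "key")

// joined.explain() should show a BroadcastHashJoin rather than a SortMergeJoin
// if the broadcast actually kicked in.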

It's hard to say why your job is slow without knowing more. For example, it 
could be a CPU-intensive calculation, or you might have skew across keys, which 
would leave you with a few straggler tasks. Hard to know without seeing some of 
the metrics from the Spark UI.
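
If you want a quick sanity check for skew before digging through the UI, 
something like this (Scala sketch; "key" is again a placeholder) shows whether 
a few keys dominate:

import org.apache.spark.sql.functions.desc

// If the top few keys have orders of magnitude more rows than the rest,
// straggler tasks on those keys are a likely explanation for the slow stage.
df1.groupBy("key").count().orderBy(desc("count")).show(20)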

1. If you aren't tied down by legacy code, Spark 2.0 has a nicer Dataset API 
and plenty of other improvements, so I don't see why not. The Spark 2.0 RC5 
vote passed last night, so the official release will probably go out early next 
week.
2. RDDs will likely make it worse. The reduceByKey/groupByKey distinction is 
specific to RDDs; the DataFrame API doesn't mirror it. The advice you've heard 
exists because reduceByKey runs the reduce locally on each node for each key 
and then merges those partial results into the final answer, while groupByKey 
shuffles every value across the network, which is wasteful if all you do 
afterwards is reduce (see the sketch after this list). DataFrames have lots of 
optimizations of their own, so I would stick with them.
3. You shouldn't need to explicitly call broadcast(df2) as long as the 
threshold is set above df2's size.
4. Driver memory matters when your job needs to collect results back to the 
driver for some reason. One good example: in MLlib/ML it's common to collect 
model parameters back to the driver to update a global model. For some 
algorithms (like LDA) the model can be quite large, so they need a lot of 
driver memory.
5. Hard to know without more metrics from your job. That said, 24 executor 
instances with only 3 cores each looks like too many small executors. I would 
try 5 instances with 15 cores each, or 10 with 7 cores each, and bump up the 
executor memory to use more of your cluster's RAM (rough config after this 
list). Lastly, if you are running on EC2, make sure spark.local.dir points at 
something that is not an EBS volume, preferably attached SSD instance storage 
like you get on an r3 machine.
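
On point 2, here's a rough sketch of the RDD-level difference I mean (Scala, 
with toy data and an existing SparkContext called sc):

// reduceByKey combines values map-side within each partition before shuffling,
// so only partial sums cross the network.
val pairs = sc.parallelize(Seq(("a", 1L), ("b", 1L), ("a", 1L)))
val partialThenMerge = pairs.reduceByKey(_ + _)

// groupByKey ships every individual value across the network first and only
// then sums them, which is wasteful if all you want is the sum.
val shuffleEverything = pairs.groupByKey().mapValues(_.sum)

Your df1.groupBy(key).count() already does that partial aggregation under the 
hood, so there's nothing to gain by dropping down to RDDs here.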
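
On point 5, the spark-defaults.conf changes I'd try first look roughly like 
this (the numbers are a starting point to experiment with, not a prescription, 
and the spark.local.dir path is just an example):

spark.executor.instances        5
spark.executor.cores            15
spark.executor.memory           60g
# on EC2, point this at attached SSD instance storage, not an EBS volume
spark.local.dir                 /mnt/spark-local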

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 9:31:21 AM, Jestin Ma (jestinwith.a...@gmail.com) wrote:

Hello,
Right now I'm using DataFrames to perform a df1.groupBy(key).count() on one 
DataFrame and join with another, df2.

The first, df1, is very large (many gigabytes) compared to df2 (250 Mb).

Right now I'm running this on a cluster of 5 nodes, 16 cores each, 90 GB RAM 
each.
It is taking me about 1 hour and 40 minutes to perform the groupBy, count, and 
join, which seems very slow to me.

Currently I have set the following in my spark-defaults.conf:

spark.executor.instances                             24
spark.executor.memory                               10g
spark.executor.cores                                    3
spark.driver.memory                                     5g
spark.sql.autoBroadcastJoinThreshold        200Mb


I have a couple of questions regarding tuning for performance as a beginner.
Right now I'm running Spark 1.6.0. Would moving to Spark 2.0 DataSet (or even 
DataFrames) be better?
What if I used RDDs instead? I know that reduceByKey is better than groupByKey, 
and DataFrames don't have that method. 
I think I can do a broadcast join and have set a threshold. Do I need to set it 
above my second DataFrame size? Do I need to explicitly call broadcast(df2)?
What's the point of driver memory?
Can anyone point out something wrong with my tuning numbers, or any additional 
parameters worth checking out?

Thank you a lot!
Sincerely,
Jestin
