Hi Jestin,

Spark is smart about how it does joins. In this case, if df2 is sufficiently small it will do a broadcast join: rather than shuffling df1 and df2 for the join, it broadcasts df2 to all the workers and joins locally. It looks like you may already know that, though, given that you're using spark.sql.autoBroadcastJoinThreshold.
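For concreteness, here's a rough sketch of that pattern with the Spark 1.6 DataFrame API in Scala. The paths, the column name "key", and the explicit broadcast(df2) hint are only placeholders/illustration, not necessarily what you need:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions.broadcast

    val sc = new SparkContext(new SparkConf().setAppName("groupby-count-join"))
    val sqlContext = new SQLContext(sc)

    // Placeholder inputs: df1 is the many-GB table, df2 the ~250 MB one.
    val df1 = sqlContext.read.parquet("hdfs:///path/to/large")
    val df2 = sqlContext.read.parquet("hdfs:///path/to/small")

    // Count per key on the big table, then join against the small one.
    // broadcast(df2) is only a hint; with spark.sql.autoBroadcastJoinThreshold
    // set above df2's size, Spark should pick a broadcast join on its own.
    val counts = df1.groupBy("key").count()
    val joined = counts.join(broadcast(df2), "key")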
It's hard to say why your job is slow without knowing more. For example, it could be a CPU-intensive calculation, or you may have skew across keys, which would cause a straggler. Hard to know without seeing some of the metrics from the Spark UI.

1. If you aren't tied down by legacy code, Spark 2.0 has a nicer Dataset API and plenty of other improvements, so I don't see why not. The Spark 2.0 RC5 vote passed last night, so the official release will probably go out early next week.

2. RDDs would likely make it worse. The reduceByKey/groupByKey distinction is specific to RDDs; the DataFrame API doesn't mirror it. You hear that advice because reduceByKey runs the reduce locally at each node for each key and then reduces those partial results to get the final result, whereas groupByKey shuffles all the values across the network, which is wasteful if you're just doing a reduce right after. DataFrames have lots of optimizations of their own as well. (There's a small RDD sketch of the difference at the end of this message.)

3. You shouldn't need to explicitly call broadcast(df2).

4. Driver memory is important if your job needs to collect results back to the driver for some reason. A good example is in mllib/ml, where it's common to collect parameters back to the driver to update a global model. For some algorithms (like LDA) the model can be quite large, so it requires high driver memory.

5. Hard to know without more metrics from your job. That being said, your number of executor instances relative to cores per executor seems a bit high. I would try 5 instances with 15 cores each, or 10 with 7 cores each. You can also bump up the executor memory to use more of your cluster's memory. Lastly, if you are running on EC2, make sure to configure spark.local.dir to write to something that is not an EBS volume, preferably an attached SSD, for example on an r3 machine.

--
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni
pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 23, 2016 at 9:31:21 AM, Jestin Ma (jestinwith.a...@gmail.com) wrote:

Hello,

Right now I'm using DataFrames to perform a df1.groupBy(key).count() on one DataFrame and join with another, df2. The first, df1, is very large (many gigabytes) compared to df2 (250 MB).

Right now I'm running this on a cluster of 5 nodes, 16 cores each, 90 GB RAM each. It is taking me about 1 hour and 40 minutes to perform the groupBy, count, and join, which seems very slow to me.

Currently I have set the following in my spark-defaults.conf:

    spark.executor.instances 24
    spark.executor.memory 10g
    spark.executor.cores 3
    spark.driver.memory 5g
    spark.sql.autoBroadcastJoinThreshold 200Mb

I have a couple of questions regarding tuning for performance as a beginner.

1. Right now I'm running Spark 1.6.0. Would moving to Spark 2.0 DataSet (or even DataFrames) be better?
2. What if I used RDDs instead? I know that reduceByKey is better than groupByKey, and DataFrames don't have that method.
3. I think I can do a broadcast join and have set a threshold. Do I need to set it above my second DataFrame size? Do I need to explicitly call broadcast(df2)?
4. What's the point of driver memory?
5. Can anyone point out something wrong with my tuning numbers, or any additional parameters worth checking out?

Thank you a lot!

Sincerely,
Jestin
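P.S. On point 2, here is the small RDD sketch I mentioned, showing why reduceByKey is usually preferred over groupByKey when all you want is an aggregate. The data is made up, and sc is just an existing SparkContext:

    // Made-up pair RDD, only to illustrate the shuffle difference.
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

    // reduceByKey combines values map-side first, so only partial sums
    // per key cross the network during the shuffle.
    val counts = pairs.reduceByKey(_ + _)

    // groupByKey ships every individual value across the network and only
    // then sums, which is wasteful if a reduce is all you do afterwards.
    val countsViaGroup = pairs.groupByKey().mapValues(_.sum)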