Hi Patrick,

We left the details of the Spark configuration that we used out of the blog post for brevity, but we're happy to share them. We've done quite a bit of tuning to find the configuration settings that gave us the best query times and allowed the most queries to run to completion. I think there might still be a few improvements that we could make. We spent the majority of our time optimizing the 20-node (in-memory) case. For documentation purposes, I'm including a summary of the work we've done so far below. We are also looking forward to working with the SparkSQL team to investigate any further optimizations.
We started off with Spark 1.0.2, so we wrote Java programs to run the queries. We compared SQLContext to HiveContext and found the latter to be faster; it was also recommended to us on the mailing list for its better SQL support (http://goo.gl/IU5Hw0). With Spark 1.1.0, which is the version we used to get the benchmark numbers, we simply used the spark-sql command-line tool, which uses HiveContext by default.

We fairly quickly ran into the issue that query times were highly variable (from minutes to hours). If I understand correctly, Spark's MemoryStore uses a FIFO eviction policy, which means it removes the oldest block first rather than the least recently used one. At the start, the oldest data is the actual table data, which will be reused many times. In the 20-node benchmark, we found that query times became much more stable with cache, because it pins the table data in memory and also uses the optimized in-memory columnar format. After caching, we did not see any difference between text and Parquet tables in the in-memory case. However, we did not use cache in the 4-node benchmark, because we saw better query times without it; there we used Parquet files generated by inserting into a Parquet-backed Hive metastore table that used ParquetHiveSerDe.

A problem we ran into at the start was that the table data would not fit in memory when spark.storage.memoryFraction was set to its default value of 0.6, so we increased it to 0.65. We confirmed that the partitions fit in memory by looking at the Storage tab of the Spark web UI. We also increased spark.shuffle.memoryFraction from 0.2 to 0.25, which avoided some premature eviction problems. We ran the benchmark on machines with 122GB of memory and set spark.executor.memory to 110000m to use most of the available memory; increasing this value gave more stable query times and fewer failures.

We set spark.serializer to org.apache.spark.serializer.KryoSerializer, as recommended by the Spark docs and also because it gave us better query times, and we set spark.sql.inMemoryColumnarStorage.compressed to true.

TPC-H has some very large intermediate jobs and result sets, and we found that a number of timeouts in Spark trigger too early during such queries. We eventually increased spark.worker.timeout, spark.akka.timeout, and spark.storage.blockManagerSlaveTimeoutMs to 10 minutes to avoid these issues.

We also tried different block sizes. With a bigger block size (e.g., 512MB) some queries ran slightly faster but others a lot slower; we eventually found that 128MB gave us the most stable and overall lowest query times. We also experimented a bit with spark.sql.shuffle.partitions and eventually set it to the number of vCPUs in the cluster, which was 320.

One thing we noticed in all of our benchmarks is that, when running TPC-H on EC2, there is not much benefit to using 16 vCPUs over 8. The r3.4xlarge and i2.4xlarge machines have 8 physical cores, each hyper-threaded to give 16 vCPUs, and the benefit of hyper-threading is not large in this case, meaning that the cores are (in the worst case) only half as fast when all 16 are used.

There are a few more settings we experimented with, with mixed but overall not hugely significant results: we tried increasing spark.shuffle.file.buffer.kb and spark.akka.frameSize, and we increased spark.akka.threads from 4 to 8. A minimal sketch of the configuration we ended up with is included below.
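By way of summary, here is a minimal Scala sketch of how the settings above fit together. In our actual 1.1.0 runs the settings went into the Spark configuration used by spark-sql rather than a driver program, and the application name and the lineitem table below are just placeholders:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val conf = new SparkConf()
      .setAppName("tpch-queries")                   // placeholder name
      .set("spark.executor.memory", "110000m")      // most of the 122GB per node
      .set("spark.storage.memoryFraction", "0.65")  // up from the 0.6 default
      .set("spark.shuffle.memoryFraction", "0.25")  // up from the 0.2 default
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
      .set("spark.worker.timeout", "600")           // seconds; 10 minutes
      .set("spark.akka.timeout", "600")             // seconds
      .set("spark.storage.blockManagerSlaveTimeoutMs", "600000") // milliseconds
      .set("spark.sql.shuffle.partitions", "320")   // number of vCPUs in the cluster

    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)

    // Pin the table data in memory in the optimized columnar format;
    // this is what stabilized query times in the 20-node case.
    hiveContext.sql("CACHE TABLE lineitem")         // repeated for each TPC-H table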
We tried analyze table <table name> compute statistics, but that failed with the following error:

    java.lang.Throwable: Child Error
        at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:271)
    Caused by: java.io.IOException: Task process exit with nonzero status of 1.
        at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:258)

We also tried running analyze table <table name> compute statistics noscan. However, when combined with cache <table name>, it failed with the following error:

    14/11/03 15:59:17 ERROR CliDriver: scala.NotImplementedError: Analyze has only implemented for Hive tables, but customer is a Subquery
        at org.apache.spark.sql.hive.HiveContext.analyze(HiveContext.scala:189)

Without cache, the noscan command completes successfully. However, we have not seen any performance benefit from it, and we still see queries like Q5 failing (after a very long time). We found using cache preferable to using analyze in the in-memory case.

Besides these, we also experimented with several other settings, but didn't find them to have a noticeable impact on query times. We are now looking forward to working together with the SparkSQL developers and re-running the numbers with the proposed optimizations.

Regards,
Marco