Hi Patrick,

We left the details of our Spark configuration out of the blog post for
brevity, but we're happy to share them. We've done quite a bit of tuning to
find the settings that gave us the best query times and allowed the most
queries to complete. I think there might still be a few improvements we
could make. We spent the majority of our time optimizing the 20-node
(in-memory) case. For documentation purposes, I'm including a summary of
the work we've done so far below. We are also looking forward to working
with the SparkSQL team on any further optimizations.

We started off with Spark 1.0.2, so we wrote Java programs to run the
queries. We compared SQLContext to HiveContext and found the latter to be
faster; it was also recommended to us on the mailing list for its better
SQL support (http://goo.gl/IU5Hw0). With Spark 1.1.0, the version we used
to get the benchmark numbers, we just used the spark-sql command-line tool,
which uses HiveContext by default.
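
For reference, here is a minimal sketch of what running a query through
HiveContext looks like in Spark 1.1 (in Scala rather than the Java we
actually used, and with a placeholder query rather than a real TPC-H one):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object RunTpchQuery {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("tpch-query"))
    // HiveContext offers broader SQL coverage than the plain SQLContext.
    val hiveContext = new HiveContext(sc)

    // Placeholder query; the real TPC-H queries are much larger.
    val result = hiveContext.sql("SELECT count(*) FROM lineitem")
    result.collect().foreach(println)
    sc.stop()
  }
}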

We pretty quickly ran into the issue that query times were highly variable
(from minutes to hours). If I understand correctly, Spark's MemoryStore
uses a FIFO caching policy, meaning it removes the oldest block first
rather than the least recently used one. At the start, the oldest data is
the actual table data, which will be reused many times. In the 20-node
benchmark, we found that query times became more stable with cache <table
name>, because it pins the table data in memory and also uses the optimized
in-memory columnar format. After caching, we did not see any difference
between text and parquet tables in the in-memory case. However, we did not
use caching in the 4-node benchmark because we saw better query times
without it; instead, we used parquet files generated by inserting into a
parquet-backed Hive metastore table that used ParquetHiveSerDe.
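
For the 20-node case, assuming a hiveContext like the one in the sketch
above, pinning a table in memory looks like this (the table name is just an
example; from the spark-sql shell the statement can be typed directly):

// Pin a table in memory in the optimized in-memory columnar format.
hiveContext.sql("CACHE TABLE lineitem")

// Programmatic equivalent:
hiveContext.cacheTable("lineitem")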

A problem we ran into at the start was that the table data wouldn't fit in
memory with spark.storage.memoryFraction at its default value of 0.6, so we
increased it to 0.65. We confirmed that the partitions fit in memory by
looking at the Storage tab of the Spark web UI. We also increased
spark.shuffle.memoryFraction from 0.2 to 0.25, which avoided some premature
eviction problems. We ran the benchmark on machines with 122GB of memory
and set spark.executor.memory to 110000m to use most of the available
memory. Increasing this value gave more stable query times and fewer
failures.
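
Roughly how those memory settings translate into a SparkConf (they can
equally go into spark-defaults.conf); the values are the ones mentioned
above:

import org.apache.spark.SparkConf

// Passed to the SparkContext as in the first sketch.
val conf = new SparkConf()
  .set("spark.executor.memory", "110000m")      // use most of the 122GB per node
  .set("spark.storage.memoryFraction", "0.65")  // up from the default of 0.6
  .set("spark.shuffle.memoryFraction", "0.25")  // up from the default of 0.2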

We set spark.serializer to org.apache.spark.serializer.KryoSerializer as
recommended by the Spark docs, and also because it gave us better query
times. We also set spark.sql.inMemoryColumnarStorage.compressed to true.
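
Continuing the SparkConf from the previous sketch, those two settings look
like this (the SQL-specific flag can also be changed per session via
setConf on the context):

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// Per-session alternative for the SQL-specific setting:
hiveContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")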

TPC-H has some very large intermediate jobs and result sets. We found that
a number of Spark's timeouts triggered too early during these queries. We
eventually increased spark.worker.timeout, spark.akka.timeout, and
spark.storage.blockManagerSlaveTimeoutMs to 10 minutes to avoid these
issues.
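
Again continuing the same SparkConf, the timeout values we ended up with
(as far as I know the first two properties take seconds and the last one
milliseconds):

// Give long-running TPC-H queries 10 minutes before any of these timeouts fire.
conf.set("spark.worker.timeout", "600")
conf.set("spark.akka.timeout", "600")
conf.set("spark.storage.blockManagerSlaveTimeoutMs", "600000")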

We also tried different block sizes. Some queries ran slightly faster but
others a lot slower with a bigger block size (e.g., 512MB); we eventually
found that 128MB gave us the most stable and overall lowest query times.
We also experimented a bit with spark.sql.shuffle.partitions and eventually
set it to the number of vCPUs in the cluster, which was 320. I should note
that one thing we've noticed in all of our benchmarks is that when running
TPC-H on EC2, there is not much benefit to using 16 vCPUs over 8. This is
because the r3.4xlarge and i2.4xlarge machines have 8 physical cores, each
hyper-threaded to give 16 vCPUs, and the benefit of hyper-threading isn't
huge in this case, meaning that the cores are (in the worst case) only half
as fast when using all 16.
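
The shuffle-partition setting from this paragraph, again assuming the
hiveContext from the first sketch (from the spark-sql shell the equivalent
is SET spark.sql.shuffle.partitions=320):

// One shuffle partition per vCPU in the 20-node cluster (20 nodes x 16 vCPUs).
hiveContext.setConf("spark.sql.shuffle.partitions", "320")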

There are a few more settings we experimented with, with mixed but overall
not hugely significant results. We tried increasing
spark.shuffle.file.buffer.kb and spark.akka.frameSize, and we increased
spark.akka.threads from 4 to 8.

We tried analyze table <table name> compute statistics, but that failed
with the following errors:

java.lang.Throwable: Child Error
        at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:271)
Caused by: java.io.IOException: Task process exit with nonzero status of 1.
        at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:258)

We also tried running analyze table <table name> compute statistics noscan.
However, when using cache <table name>, it failed with the following error:

14/11/03 15:59:17 ERROR CliDriver: scala.NotImplementedError: Analyze has only implemented for Hive tables, but customer is a Subquery
        at org.apache.spark.sql.hive.HiveContext.analyze(HiveContext.scala:189)

Without cache, the noscan command completes successfully. However, we have
not seen any performance benefit from it and still see queries like Q5
failing (after a very long time). We found using cache preferable to using
analyze in the in-memory case.
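
For completeness, the noscan variant that did complete for us, issued
through the hiveContext (customer is the table from the error above):

// Collect table-level statistics without scanning the data.
hiveContext.sql("ANALYZE TABLE customer COMPUTE STATISTICS noscan")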

Besides these, we also experimented with several other settings but didn't
find them to have a noticeable impact on query times. We are now looking
forward to working together with the SparkSQL developers and re-running the
numbers with any proposed optimizations.

regards,
Marco
