Dear Spark developers,
I am trying to benchmark the new DataFrame aggregation implemented under
Project Tungsten and released with Spark 1.4 (I am using the latest Spark from
the repo, i.e. 1.5):
https://github.com/apache/spark/pull/5725
The PR says that the aggregation should be faster because it uses
sun.misc.Unsafe to allocate memory off-heap and updates aggregation buffers in
place. It was also presented at Spark Summit this summer:
http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
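As I understand it, the key idea is that aggregation buffers live in raw
off-heap memory and are updated in place, avoiding JVM object allocation and
GC. A minimal sketch of that idea using sun.misc.Unsafe directly (my own
illustration of the technique, not Spark's actual code):

import sun.misc.Unsafe

// The Unsafe constructor is private; grab the singleton via reflection.
val field = classOf[Unsafe].getDeclaredField("theUnsafe")
field.setAccessible(true)
val unsafe = field.get(null).asInstanceOf[Unsafe]

// Off-heap buffer for one (key, sum) pair: 4-byte Int key + 8-byte Double sum.
val addr = unsafe.allocateMemory(12)
unsafe.putInt(addr, 42)          // key
unsafe.putDouble(addr + 4, 0.0)  // running sum

// In-place update: add a value to the sum with no object allocation.
unsafe.putDouble(addr + 4, unsafe.getDouble(addr + 4) + 3.14)

unsafe.freeMemory(addr)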
The following settings in the Spark configuration enable the new aggregation:
spark.sql.unsafe.enabled=true
spark.unsafe.offHeap=true
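I believe the equivalent programmatic form is the following (a sketch; I also
check afterwards that the setting reached the SQL layer):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
  .setAppName("TungstenAggregationBenchmark")
  .set("spark.sql.unsafe.enabled", "true")
  .set("spark.unsafe.offHeap", "true")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Sanity check that the flag was picked up.
println(sqlContext.getConf("spark.sql.unsafe.enabled"))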
I wrote a simple benchmark that aggregates values by key. However, the
execution time is the same whether the new aggregation is on or off. Could you
suggest how I can observe the improvement that the new aggregation provides?
Could you share a code snippet that takes advantage of it?
import org.apache.spark.sql.functions.sum

case class Counter(key: Int, value: Double)

val size = 100000000   // 100M rows
val partitions = 5
val repetitions = 5    // ~5 rows per key on average

// Random (key, value) pairs with size / repetitions distinct keys.
val data = sc.parallelize(1 to size, partitions).map(_ =>
  Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble()))
val df = sqlContext.createDataFrame(data)

// Materialize the cache so data generation is excluded from the timing.
df.persist()
df.count()

// Time only the aggregation; count() forces it to execute.
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.count()
println((System.nanoTime() - t) / 1e9)
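For more stable numbers, one could also average several runs after a warm-up.
A sketch (time is a helper I define here, not a Spark API):

// Hypothetical helper: one untimed warm-up run, then the average of reps runs.
def time[T](reps: Int)(body: => T): Double = {
  body  // warm-up, not timed
  val start = System.nanoTime()
  for (_ <- 1 to reps) body
  (System.nanoTime() - start) / 1e9 / reps
}

val avg = time(3) { df.groupBy("key").agg(sum("value")).count() }
println(s"average aggregation time: $avg s")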
Best regards, Alexander