How did you run this? I couldn't run your query with 4G of RAM in 1.4, but
in 1.5 it ran.

Also, I recommend writing the data to Parquet on disk for the evaluation
rather than using the in-memory cache, which is very slow; we are thinking
of removing it or replacing it with something else.


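// Generate `size` random (key, value) rows; keys are drawn from
// [0, size / repetitions), so each key repeats about `repetitions` times.
val size = 100000000
val partitions = 10
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x =>
  (util.Random.nextInt(size / repetitions),
    util.Random.nextDouble)).toDF("key", "value")

// Write the data to Parquet once so the benchmark reads it from disk.
data.write.parquet("/scratch/rxin/tmp/alex")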


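import org.apache.spark.sql.functions.sum

// Read the Parquet data back and time the aggregation; count() forces execution.
val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.count()
println((System.nanoTime() - t) / 1e9)  // elapsed seconds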
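
A single timed run like the one above also pays for JVM warm-up and
first-read costs, so two configurations can be hard to tell apart from one
number. As a rough sketch (the object BenchTiming and its methods timeRuns
and median are hypothetical names, not part of Spark), the same
System.nanoTime() pattern can be wrapped so that a few untimed warm-up runs
are followed by several timed ones:

```scala
// Hypothetical helper, not part of Spark: repeats a block and reports timings.
object BenchTiming {

  /** Runs `body` `warmups` times untimed, then `runs` times timed.
    * Returns the elapsed seconds of each timed run, in order. */
  def timeRuns[A](warmups: Int, runs: Int)(body: => A): Seq[Double] = {
    require(warmups >= 0 && runs > 0, "warmups must be >= 0 and runs must be > 0")
    // Warm-up runs: results and timings are discarded.
    (1 to warmups).foreach(_ => body)
    // Timed runs: same nanoTime pattern as the snippet above.
    (1 to runs).map { _ =>
      val start = System.nanoTime()
      body
      (System.nanoTime() - start) / 1e9
    }
  }

  /** Median of a non-empty sequence of timings. */
  def median(xs: Seq[Double]): Double = {
    require(xs.nonEmpty, "need at least one timing")
    val sorted = xs.sorted
    val n = sorted.length
    if (n % 2 == 1) sorted(n / 2)
    else (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0
  }
}
```

In spark-shell this could be used as
BenchTiming.median(BenchTiming.timeRuns(warmups = 1, runs = 5) { res.count() }),
once with the aggregation setting on and once with it off.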



On Thu, Aug 20, 2015 at 2:57 PM, Ulanov, Alexander <alexander.ula...@hp.com>
wrote:

> Dear Spark developers,
>
>
>
> I am trying to benchmark the new DataFrame aggregation implemented under
> Project Tungsten and released with Spark 1.4 (I am using the latest
> Spark from the repo, i.e. 1.5):
>
> https://github.com/apache/spark/pull/5725
>
> It says that the aggregation should be faster because it uses unsafe
> memory allocation and in-place updates. It was also presented at Spark
> Summit this summer:
>
>
> http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
>
> The following settings enable the new aggregation in the Spark config:
>
> spark.sql.unsafe.enabled=true
> spark.unsafe.offHeap=true
>
>
>
> I wrote a simple piece of code that aggregates values by key. However,
> the time needed to execute it does not depend on whether the new
> aggregation is on or off. Could you suggest how I can observe the
> improvement that the aggregation provides? Could you write a code snippet
> that takes advantage of the new aggregation?
>
>
>
> case class Counter(key: Int, value: Double)
> val size = 100000000
> val partitions = 5
> val repetitions = 5
> val data = sc.parallelize(1 to size, partitions).map(x =>
>   Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
> val df = sqlContext.createDataFrame(data)
> df.persist()
> df.count()
> val t = System.nanoTime()
> val res = df.groupBy("key").agg(sum("value"))
> res.count()
> println((System.nanoTime() - t) / 1e9)
>
>
>
>
>
> Best regards, Alexander
>
