I think you might also need to turn codegen on in order for the unsafe
stuff to work.
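
For example, something like this in spark-defaults.conf (a sketch, assuming
the 1.4 config names; I believe spark.sql.codegen is the codegen switch
there, alongside the unsafe flags you set):

spark.sql.codegen=true
spark.sql.unsafe.enabled=true
spark.unsafe.offHeap=true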


On Thu, Aug 20, 2015 at 4:09 PM, Ulanov, Alexander <alexander.ula...@hp.com>
wrote:

> Hi Reynold,
>
> Thank you for the suggestion. This code takes around 30 sec on my setup (5
> workers with 32GB). My issue is that I don't see any change in execution
> time when I unset the unsafe flags. Could you explain why that might happen?
>
> On Thu, Aug 20, 2015 at 3:32 PM, Reynold Xin <r...@databricks.com> wrote:
>
>  I didn't wait long enough earlier. Actually it did finish when I raised
> memory to 8g.
>
> In 1.5 with Tungsten (which should be the same as 1.4 with your unsafe
> flags), the query took 40s with 4G of mem.
>
> In 1.4, it took 195s with 8G of mem.
>
> This is not a scientific benchmark and I only ran it once.
>
>
>
> On Thu, Aug 20, 2015 at 3:22 PM, Reynold Xin <r...@databricks.com> wrote:
> How did you run this? I couldn't run your query with 4G of RAM in 1.4, but
> in 1.5 it ran.
>
> Also, I recommend just dumping the data to Parquet on disk to evaluate,
> rather than using the in-memory cache, which is super slow and which we are
> thinking of removing or replacing with something else.
>
>
> val size = 100000000
> val partitions = 10
> val repetitions = 5
> val data = sc.parallelize(1 to size, partitions)
>   .map(x => (util.Random.nextInt(size / repetitions), util.Random.nextDouble))
>   .toDF("key", "value")
>
> data.write.parquet("/scratch/rxin/tmp/alex")
>
>
> val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
> val t = System.nanoTime()
> val res = df.groupBy("key").agg(sum("value"))
> res.count()
> println((System.nanoTime() - t) / 1e9)
>
>
>
> On Thu, Aug 20, 2015 at 2:57 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:
> Dear Spark developers,
>
> I am trying to benchmark the new DataFrame aggregation implemented under
> project Tungsten and released with Spark 1.4 (I am using the latest Spark
> from the repo, i.e. 1.5):
> https://github.com/apache/spark/pull/5725
> The PR description says that the aggregation should be faster because it
> uses Unsafe to allocate memory and updates values in place. It was also
> presented at Spark Summit this summer:
>
> http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
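>
> As I understand it, "using Unsafe" means allocating raw off-heap memory
> and updating the aggregation buffer in place, roughly like this toy
> sketch (not the actual Tungsten code):
>
> // hypothetical illustration via sun.misc.Unsafe, not Spark internals
> val field = classOf[sun.misc.Unsafe].getDeclaredField("theUnsafe")
> field.setAccessible(true)
> val unsafe = field.get(null).asInstanceOf[sun.misc.Unsafe]
> val addr = unsafe.allocateMemory(8)  // one 8-byte slot for a Double sum
> unsafe.putDouble(addr, 0.0)  // initialize the buffer
> unsafe.putDouble(addr, unsafe.getDouble(addr) + 1.5)  // in-place update
> unsafe.freeMemory(addr)  // off-heap memory must be freed manually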
> The following settings enable the new aggregation in the Spark config:
> spark.sql.unsafe.enabled=true
> spark.unsafe.offHeap=true
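>
> For example, assuming these can be passed with --conf like other Spark
> settings, the shell could be launched as:
>
> ./bin/spark-shell --conf spark.sql.unsafe.enabled=true --conf spark.unsafe.offHeap=true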
>
> I wrote a simple benchmark that aggregates values by key. However, the
> execution time does not change whether the new aggregation is on or off.
> Could you suggest how I can observe the improvement that the new
> aggregation provides? Could you write a code snippet that takes advantage
> of it?
>
> case class Counter(key: Int, value: Double)
> val size = 100000000
> val partitions = 5
> val repetitions = 5
> val data = sc.parallelize(1 to size, partitions)
>   .map(x => Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
> val df = sqlContext.createDataFrame(data)
> df.persist()
> df.count()
> val t = System.nanoTime()
> val res = df.groupBy("key").agg(sum("value"))
> res.count()
> println((System.nanoTime() - t) / 1e9)
>
>
> Best regards, Alexander
>
>
>
