BTW one other thing -- don't use the count() to do benchmark, since the
optimizer is smart enough to figure out that you don't actually need to run
the sum.


For the purpose of benchmarking, you can use

df.foreach(i => do nothing)




On Thu, Aug 20, 2015 at 3:31 PM, Reynold Xin <r...@databricks.com> wrote:

>  I didn't wait long enough earlier. Actually it did finish when I raised
> memory to 8g.
>
> In 1.5 with Tungsten (which should be the same as 1.4 with your unsafe
> flags), the query took 40s with 4G of mem.
>
> In 1.4, it took 195s with 8G of mem.
>
> This is not a scientific benchmark and I only ran it once.
>
>
>
> On Thu, Aug 20, 2015 at 3:22 PM, Reynold Xin <r...@databricks.com> wrote:
>
>> How did you run this? I couldn't run your query with 4G of RAM in 1.4,
>> but in 1.5 it ran.
>>
>> Also I recommend just dumping the data to parquet on disk to evaluate,
>> rather than using the in-memory cache, which is super slow and we are
>> thinking of removing/replacing with something else.
>>
>>
>> val size = 100000000
>> val partitions = 10
>> val repetitions = 5
>> val data = sc.parallelize(1 to size, partitions).map(x =>
>> (util.Random.nextInt(size / repetitions),
>> util.Random.nextDouble)).toDF("key", "value")
>>
>> data.write.parquet("/scratch/rxin/tmp/alex")
>>
>>
>> val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
>> val t = System.nanoTime()
>> val res = df.groupBy("key").agg(sum("value"))
>> res.count()
>> println((System.nanoTime() - t) / 1e9)
>>
>>
>>
>> On Thu, Aug 20, 2015 at 2:57 PM, Ulanov, Alexander <
>> alexander.ula...@hp.com> wrote:
>>
>>> Dear Spark developers,
>>>
>>>
>>>
>>> I am trying to benchmark the new Dataframe aggregation implemented under
>>> the project Tungsten and released with Spark 1.4 (I am using the latest
>>> Spark from the repo, i.e. 1.5):
>>>
>>> https://github.com/apache/spark/pull/5725
>>>
>>> It tells that the aggregation should be faster due to using the unsafe
>>> to allocate memory and in-place update. It was also presented on Spark
>>> Summit this Summer:
>>>
>>>
>>> http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
>>>
>>> The following enables the new aggregation in spark-config:
>>>
>>> spark.sql.unsafe.enabled=true
>>>
>>> spark.unsafe.offHeap=true
>>>
>>>
>>>
>>> I wrote a simple code that does aggregation of values by keys. However,
>>> the time needed to execute the code does not depend if the new aggregation
>>> is on or off. Could you suggest how can I observe the improvement that the
>>> aggregation provides? Could you write a code snippet that takes advantage
>>> of the new aggregation?
>>>
>>>
>>>
>>> case class Counter(key: Int, value: Double)
>>>
>>> val size = 100000000
>>>
>>> val partitions = 5
>>>
>>> val repetitions = 5
>>>
>>> val data = sc.parallelize(1 to size, partitions).map(x =>
>>> Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
>>>
>>> val df = sqlContext.createDataFrame(data)
>>>
>>> df.persist()
>>>
>>> df.count()
>>>
>>> val t = System.nanoTime()
>>>
>>> val res = df.groupBy("key").agg(sum("value"))
>>>
>>> res.count()
>>>
>>> println((System.nanoTime() - t) / 1e9)
>>>
>>>
>>>
>>>
>>>
>>> Best regards, Alexander
>>>
>>
>>
>

Reply via email to