I've run a few experiments in different settings, based on the same code that 
you used.
1) Created two datasets in HDFS on a cluster of 5 worker nodes and copied them 
to the local fs:
val size = 100000000
val partitions = 10
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x => 
(util.Random.nextInt(size / repetitions), util.Random.nextDouble)).toDF("key", 
"value")
data.write.parquet("hdfs://alex")
data.write.parquet("/home/alex")
val sample = data.sample(true, 0.1)
sample.write.parquet("hdfs://alex-10m")
sample.write.parquet("/home/alex-10m")
2) Ran the following code in local mode (spark-shell --master local) and in 
cluster mode (5 nodes with 1 worker each):
val df = sqlContext.read.parquet("data")
val t = System.nanoTime()
df.groupBy("key").sum("value").queryExecution.toRdd.count()
println((System.nanoTime() - t) / 1e9)
3) Ran the same code in local and cluster mode, persisting the data in memory 
first:
val df = sqlContext.read.parquet("data")
df.persist
df.foreach { x => {} }
val t = System.nanoTime()
df.groupBy("key").sum("value").queryExecution.toRdd.count()
println((System.nanoTime() - t) / 1e9)

In both cases above, Tungsten was switched on or off with:
sqlContext.setConf("spark.sql.tungsten.enabled", "true")   (or "false")
Each experiment was run in a new shell. Below are the results (in seconds):

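Data size  Mode     Storage  Tungsten disabled (s)  Tungsten enabled (s)
---------  -------  -------  ---------------------  --------------------
10M        Cluster  Parquet                    9.6                   7.4
10M        Cluster  Persist                   10.9                   5.1
10M        Local    Parquet                   57.7                  35.8
10M        Local    Persist                   61.9                  31.4
100M       Cluster  Parquet                   25.4                  18.8
100M       Cluster  Persist                   48.6                  14.8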


Hardware: 6 nodes, each with 2x Xeon X5650 @ 2.67 GHz and 32 GB RAM; 1 master, 
5 workers. Local mode: one node.

With Tungsten enabled there is a nice improvement when the data is persisted 
in memory: about 2x on 10M rows and about 3x on 100M rows. The improvement is 
smaller for Parquet, roughly 1.3x to 1.6x. Interestingly, with Tungsten 
enabled, aggregation over in-memory data and over Parquet data performs 
similarly. Could anyone comment on this? It seems counterintuitive to me.
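
For reference, the speedups can be recomputed directly from the table. This is 
only a small sketch in plain Scala (no Spark needed); the names SpeedupTable 
and Run are made up for illustration, and the numbers are the timings listed 
above.

```scala
object SpeedupTable {
  // One row of the results table; times are in seconds.
  final case class Run(rows: String, mode: String, storage: String,
                       tungstenOff: Double, tungstenOn: Double) {
    // Speedup = time with Tungsten disabled / time with Tungsten enabled.
    def speedup: Double = tungstenOff / tungstenOn
  }

  // Timings copied from the results table in this message.
  val runs: Seq[Run] = Seq(
    Run("10M",  "Cluster", "Parquet",  9.6,  7.4),
    Run("10M",  "Cluster", "Persist", 10.9,  5.1),
    Run("10M",  "Local",   "Parquet", 57.7, 35.8),
    Run("10M",  "Local",   "Persist", 61.9, 31.4),
    Run("100M", "Cluster", "Parquet", 25.4, 18.8),
    Run("100M", "Cluster", "Persist", 48.6, 14.8)
  )

  def main(args: Array[String]): Unit =
    runs.foreach { r =>
      println(f"${r.rows}%-5s ${r.mode}%-8s ${r.storage}%-8s speedup ${r.speedup}%.2f")
    }
}
```

Calling SpeedupTable.main(Array.empty) in a Scala shell prints one line per 
configuration.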

Local performance was not as good as what Reynold saw: I get around 1.5x, while 
he got about 5x. However, local mode is not the interesting case.


From: Reynold Xin [mailto:r...@databricks.com]
Sent: Thursday, August 20, 2015 9:24 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

Not sure what's going on or how you measure the time, but the difference here 
is pretty big when I test on my laptop. Maybe you set the wrong config 
variables? (spark.sql.* are SQL variables that you set in sqlContext.setConf, 
and in 1.5 they are consolidated into a single flag: 
spark.sql.tungsten.enabled. See below.)


I ran with a 10m dataset (created by calling sample(true, 0.1) on the 100m 
dataset), since the 100m one takes too long when tungsten is off on my laptop 
so I didn't wait. (40s - 50s with Tungsten on)


val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex-10m")

val t = System.nanoTime()
df.groupBy("key").sum("value").queryExecution.toRdd.count()
println((System.nanoTime() - t) / 1e9)


On 1.5, with 8g driver memory and 8 cores:

5.48951

sqlContext.setConf("spark.sql.tungsten.enabled", "false")

Ran it again, and it took 25.127962.


On 1.4, with 8g driver memory and 8 cores: 25.583473


It's also possible that the benefit is smaller when you have an effectively 
unlimited amount of memory (relative to the tiny dataset size), so that GC 
happens less often.
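
One rough way to check the GC hypothesis is to read the JVM's own GC counters 
around the query. The sketch below is an assumption-laden illustration, not 
something from Spark: GcProbe is a hypothetical helper built on the standard 
java.lang.management API, and it only sees the JVM it runs in, so it is 
meaningful in local mode (in cluster mode the executors are separate JVMs).

```scala
import java.lang.management.ManagementFactory
import scala.collection.JavaConverters._

object GcProbe {
  // Total GC time in milliseconds accumulated so far in this JVM, summed over
  // all collectors. getCollectionTime returns -1 if a collector does not
  // report it, so such values are clamped to 0.
  def totalGcMillis(): Long =
    ManagementFactory.getGarbageCollectorMXBeans.asScala
      .map(bean => math.max(0L, bean.getCollectionTime))
      .sum

  // Runs `body` and returns its result together with the GC milliseconds
  // that were accumulated while it ran.
  def withGcTime[A](body: => A): (A, Long) = {
    val before = totalGcMillis()
    val result = body
    (result, totalGcMillis() - before)
  }
}
```

In a local-mode shell, the aggregation from this thread could be wrapped as 
GcProbe.withGcTime { df.groupBy("key").sum("value").queryExecution.toRdd.count() } 
and the second element compared with Tungsten on and off.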


On Thu, Aug 20, 2015 at 7:00 PM, Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:
Did git pull :)

Now I do get the difference in time between on/off Tungsten unsafe: it is 24-25 
seconds (unsafe on) vs 32-26 seconds (unsafe off) for the example below.

Why am I not getting the improvement advertised at Spark Summit (slide 23)?
http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen

My dataset is 100M rows; is it big enough to see the improvement? Am I using 
aggregate correctly?


case class Counter(key: Int, value: Double)
val size = 100000000
val partitions = 5
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x => 
Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
val df = sqlContext.createDataFrame(data)
df.persist()
df.foreach { x => {} }
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.foreach { x => {} }
println((System.nanoTime() - t) / 1e9)

Unsafe on:
spark.sql.codegen       true
spark.sql.unsafe.enabled        true
spark.unsafe.offHeap    true

Unsafe off:
spark.sql.codegen       false
spark.sql.unsafe.enabled        false
spark.unsafe.offHeap    false

From: Reynold Xin [mailto:r...@databricks.com]
Sent: Thursday, August 20, 2015 5:43 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

Please git pull :)


On Thu, Aug 20, 2015 at 5:35 PM, Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:
I am using Spark 1.5 cloned from master on June 12. (The aggregate unsafe 
feature was added to Spark on April 29.)

From: Reynold Xin [mailto:r...@databricks.com]
Sent: Thursday, August 20, 2015 5:26 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

Yes - DataFrame and SQL are the same thing.

Which version are you running? Spark 1.4 doesn't run Janino --- but you have a 
Janino exception?

On Thu, Aug 20, 2015 at 5:01 PM, Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:
When I add the following option:
spark.sql.codegen      true

Spark crashed on "df.count" with a java.util.concurrent.ExecutionException 
(below). Are you sure that I need to set this flag to get unsafe? It looks 
like a SQL flag, and I don't use SQL.


java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
         at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
         at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
         at org.spark-project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
         at org.spark-project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
         at org.spark-project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
         at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
         at org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
         at org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
         at org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
         at org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
         at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:286)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:283)
         at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:180)
         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:277)
         at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:276)
         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
         at org.apache.spark.scheduler.Task.run(Task.scala:70)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
         at java.lang.Thread.run(Thread.java:745)
Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
         at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6897)
         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5331)
         at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5207)
         at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:5188)
         at org.codehaus.janino.UnitCompiler.access$12600(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$16.visitReferenceType(UnitCompiler.java:5119)
         at org.codehaus.janino.Java$ReferenceType.accept(Java.java:2880)
         at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5159)
         at org.codehaus.janino.UnitCompiler.hasAnnotation(UnitCompiler.java:830)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:814)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
         at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
         at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
         at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
         at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
         at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
         at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
         at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
         at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
         at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
         at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
         at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
         at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
         at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
         at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
         at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
         at org.codehaus.janino.ClassBodyEvaluator.<init>(ClassBodyEvaluator.java:72)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:246)
         at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:64)
         at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:32)
         at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:273)
         at org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
         at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
         ... 28 more
Caused by: java.lang.ClassNotFoundException: Override
         at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:69)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
         at java.lang.Class.forName0(Native Method)
         at java.lang.Class.forName(Class.java:270)
         at org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:78)
         at org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:254)
         at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6893)
         ... 66 more
Caused by: java.lang.ClassNotFoundException: Override
         at java.lang.ClassLoader.findClass(ClassLoader.java:531)
         at org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.scala:26)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
         at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:34)
         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
         at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:30)
         at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:64)
         ... 73 more


From: Reynold Xin [mailto:r...@databricks.com]
Sent: Thursday, August 20, 2015 4:22 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

I think you might need to turn codegen on also in order for the unsafe stuff to 
work.


On Thu, Aug 20, 2015 at 4:09 PM, Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:
Hi Reynold,

Thank you for the suggestion. This code takes around 30 sec on my setup (5 
workers with 32GB). My issue is that I don't see any change in time if I unset 
the unsafe flags. Could you explain why that might happen?

On Aug 20, 2015, at 15:32, Reynold Xin <r...@databricks.com> wrote:

 I didn't wait long enough earlier. Actually it did finish when I raised memory 
to 8g.

In 1.5 with Tungsten (which should be the same as 1.4 with your unsafe flags), 
the query took 40s with 4G of mem.

In 1.4, it took 195s with 8G of mem.

This is not a scientific benchmark and I only ran it once.
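
To make such a comparison a bit less noisy, the measurement could be repeated 
and summarized instead of run once. The following is only a sketch under 
stated assumptions: Bench.timeMedian is a hypothetical helper written for this 
note, not a Spark API, and the warm-up and run counts are arbitrary.

```scala
object Bench {
  // Hypothetical helper (not part of Spark): evaluates `body` `warmup` times
  // without timing it, then `runs` times with timing, and returns the median
  // wall-clock time in seconds.
  def timeMedian[A](warmup: Int, runs: Int)(body: => A): Double = {
    require(runs > 0, "runs must be positive")
    (1 to warmup).foreach(_ => body)
    val seconds = (1 to runs).map { _ =>
      val t = System.nanoTime()
      body
      (System.nanoTime() - t) / 1e9
    }.sorted
    // Middle element; for an even number of runs this is the upper of the
    // two middle values.
    seconds(seconds.length / 2)
  }
}
```

With the DataFrame from this thread it might be used as 
Bench.timeMedian(warmup = 1, runs = 5) { df.groupBy("key").agg(sum("value")).count() }, 
once with Tungsten on and once with it off.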



On Thu, Aug 20, 2015 at 3:22 PM, Reynold Xin <r...@databricks.com> wrote:
How did you run this? I couldn't run your query with 4G of RAM in 1.4, but in 
1.5 it ran.

Also I recommend just dumping the data to parquet on disk to evaluate, rather 
than using the in-memory cache, which is super slow and we are thinking of 
removing/replacing with something else.


val size = 100000000
val partitions = 10
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x => 
(util.Random.nextInt(size / repetitions), util.Random.nextDouble)).toDF("key", 
"value")

data.write.parquet("/scratch/rxin/tmp/alex")


val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.count()
println((System.nanoTime() - t) / 1e9)
On Thu, Aug 20, 2015 at 2:57 PM, Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:
Dear Spark developers,

I am trying to benchmark the new DataFrame aggregation implemented under 
Project Tungsten and released with Spark 1.4 (I am using the latest Spark from 
the repo, i.e. 1.5):
https://github.com/apache/spark/pull/5725
The pull request says that the aggregation should be faster because it uses 
Unsafe to allocate memory and updates values in place. It was also presented 
at Spark Summit this summer:
http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
The following settings enable the new aggregation in the Spark config:
spark.sql.unsafe.enabled=true
spark.unsafe.offHeap=true

I wrote some simple code that aggregates values by key. However, the execution 
time does not depend on whether the new aggregation is on or off. Could you 
suggest how I can observe the improvement that the aggregation provides? Could 
you write a code snippet that takes advantage of the new aggregation?

case class Counter(key: Int, value: Double)
val size = 100000000
val partitions = 5
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x => 
Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
val df = sqlContext.createDataFrame(data)
df.persist()
df.count()
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.count()
println((System.nanoTime() - t) / 1e9)


Best regards, Alexander



