When I add the following option:
spark.sql.codegen true
Spark crashes on “df.count” with a java.util.concurrent.ExecutionException (full trace below). Are you sure that I need to set this flag to get the unsafe path? It looks like a SQL flag, and I don’t use SQL.
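(For context, a minimal sketch of what triggers it on my side, assuming spark-shell with the flag above in spark-config; the sizes here are placeholders, the full benchmark is further down the thread:)

case class Counter(key: Int, value: Double)
val data = sc.parallelize(1 to 1000000).map(x =>
  Counter(util.Random.nextInt(1000), util.Random.nextDouble))
val df = sqlContext.createDataFrame(data)
df.persist()
df.count() // crashes here with the exception below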
java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
at org.spark-project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at org.spark-project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
at org.spark-project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
at org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
at org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
at org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:286)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:283)
at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:180)
at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:277)
at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:276)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6897)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5331)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5207)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:5188)
at org.codehaus.janino.UnitCompiler.access$12600(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$16.visitReferenceType(UnitCompiler.java:5119)
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:2880)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5159)
at org.codehaus.janino.UnitCompiler.hasAnnotation(UnitCompiler.java:830)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:814)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
at org.codehaus.janino.ClassBodyEvaluator.<init>(ClassBodyEvaluator.java:72)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:246)
at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:64)
at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:32)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:273)
at org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 28 more
Caused by: java.lang.ClassNotFoundException: Override
at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:69)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:78)
at org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:254)
at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6893)
... 66 more
Caused by: java.lang.ClassNotFoundException: Override
at java.lang.ClassLoader.findClass(ClassLoader.java:531)
at org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.scala:26)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:34)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:30)
at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:64)
... 73 more
From: Reynold Xin <[email protected]>
Sent: Thursday, August 20, 2015 4:22 PM
To: Ulanov, Alexander
Cc: [email protected]
Subject: Re: Dataframe aggregation with Tungsten unsafe
I think you might also need to turn codegen on in order for the unsafe stuff to work.
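E.g., something like this at runtime (a sketch, assuming the SQLContext config API; spark.unsafe.offHeap is a core setting and presumably still has to be set before the context starts):

sqlContext.setConf("spark.sql.codegen", "true")
sqlContext.setConf("spark.sql.unsafe.enabled", "true")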
On Thu, Aug 20, 2015 at 4:09 PM, Ulanov, Alexander
<[email protected]> wrote:
Hi Reynold,
Thank you for the suggestion. This code takes around 30 seconds on my setup (5 workers with 32GB). My issue is that I don't see any change in the running time when I unset the unsafe flags. Could you explain why that might happen?
On Aug 20, 2015, at 3:32 PM, Reynold Xin
<[email protected]> wrote:
I didn't wait long enough earlier. Actually, it did finish when I raised the memory to 8G.
In 1.5 with Tungsten (which should be the same as 1.4 with your unsafe flags), the query took 40s with 4G of memory.
In 1.4, it took 195s with 8G of memory.
This is not a scientific benchmark, and I only ran it once.
On Thu, Aug 20, 2015 at 3:22 PM, Reynold Xin
<[email protected]> wrote:
How did you run this? I couldn't run your query with 4G of RAM in 1.4, but in 1.5 it ran.
Also, I recommend just dumping the data to Parquet on disk for evaluation, rather than using the in-memory cache, which is super slow and which we are thinking of removing or replacing with something else.
// Assumes spark-shell, where sqlContext.implicits._ and
// org.apache.spark.sql.functions._ are available.
val size = 100000000
val partitions = 10
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x =>
  (util.Random.nextInt(size / repetitions), util.Random.nextDouble)).toDF("key", "value")
data.write.parquet("/scratch/rxin/tmp/alex")

// Read back from Parquet and time only the aggregation.
val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.count()
println((System.nanoTime() - t) / 1e9)
On Thu, Aug 20, 2015 at 2:57 PM, Ulanov, Alexander
<[email protected]> wrote:
Dear Spark developers,
I am trying to benchmark the new DataFrame aggregation implemented under project Tungsten and released with Spark 1.4 (I am using the latest Spark from the repo, i.e. 1.5):
https://github.com/apache/spark/pull/5725
It says that the aggregation should be faster because it uses Unsafe to allocate memory and performs in-place updates. It was also presented at Spark Summit this summer:
http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
The following enables the new aggregation in spark-config:
spark.sql.unsafe.enabled=true
spark.unsafe.offHeap=true
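(Equivalently, a sketch of the same flags set programmatically, assuming they are read at context startup:)

import org.apache.spark.SparkConf
val conf = new SparkConf()
  .set("spark.sql.unsafe.enabled", "true")
  .set("spark.unsafe.offHeap", "true")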
I wrote simple code that aggregates values by key. However, the execution time does not depend on whether the new aggregation is on or off. Could you suggest how I can observe the improvement the new aggregation provides? Could you write a code snippet that takes advantage of it?
case class Counter(key: Int, value: Double)

val size = 100000000
val partitions = 5
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x =>
  Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
val df = sqlContext.createDataFrame(data)

// Materialize the cache before timing.
df.persist()
df.count()

val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.count()
println((System.nanoTime() - t) / 1e9)
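(Since a single run is noisy, here is a small hypothetical helper I could use to repeat the measurement:)

def time[A](body: => A): Double = {
  val start = System.nanoTime()
  body // force evaluation of the measured block
  (System.nanoTime() - start) / 1e9
}
// e.g.: println(time { df.groupBy("key").agg(sum("value")).count() })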
Best regards, Alexander