Did git pull :) Now I do see a difference in time between Tungsten unsafe on/off: it is 24-25 seconds (unsafe on) vs 26-32 seconds (unsafe off) for the example below.
Why am I not getting the improvement advertised at Spark Summit (slide 23)? http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen My dataset is 100M rows — is that big enough to show the improvement? Am I using the aggregate correctly?

case class Counter(key: Int, value: Double)
val size = 100000000
val partitions = 5
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x =>
  Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
val df = sqlContext.createDataFrame(data)
df.persist()
df.foreach { x => {} }
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.foreach { x => {} }
println((System.nanoTime() - t) / 1e9)

Unsafe on:
spark.sql.codegen true
spark.sql.unsafe.enabled true
spark.unsafe.offHeap true

Unsafe off:
spark.sql.codegen false
spark.sql.unsafe.enabled false
spark.unsafe.offHeap false

From: Reynold Xin [mailto:r...@databricks.com]
Sent: Thursday, August 20, 2015 5:43 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

Please git pull :)

On Thu, Aug 20, 2015 at 5:35 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

I am using Spark 1.5 cloned from master on June 12. (The aggregate unsafe feature was added to Spark on April 29.)

From: Reynold Xin [mailto:r...@databricks.com]
Sent: Thursday, August 20, 2015 5:26 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

Yes - DataFrame and SQL are the same thing. Which version are you running? Spark 1.4 doesn't use Janino --- but you have a Janino exception?

On Thu, Aug 20, 2015 at 5:01 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

When I add the following option: spark.sql.codegen true
Spark crashed on the "df.count" with a java.util.concurrent.ExecutionException (below).
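[Editor's note, not part of the original thread: the 1-2 second spread in the timings above suggests comparing medians over several runs rather than a single System.nanoTime() measurement, since one-off JVM timings are skewed by JIT warm-up and GC pauses. A self-contained sketch of such a helper; the object and method names are hypothetical:]

```scala
// Hypothetical timing helper: run a block several times and report the
// median wall-clock seconds, which is less sensitive to JIT warm-up and
// GC pauses than a single measurement.
object MedianTimer {
  def medianSeconds(runs: Int)(block: => Unit): Double = {
    require(runs > 0, "need at least one run")
    val secs = (1 to runs).map { _ =>
      val t0 = System.nanoTime()
      block
      (System.nanoTime() - t0) / 1e9
    }.sorted
    // For an even number of runs this takes the upper median, which is
    // fine for a rough comparison.
    secs(secs.length / 2)
  }
}

// Usage against the query above, e.g. in spark-shell:
//   MedianTimer.medianSeconds(5) { res.foreach { x => {} } }
```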
Are you sure that I need to set this flag to get unsafe? It looks like a SQL flag, and I don't use SQL.

java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
        at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
        at org.spark-project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
        at org.spark-project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
        at org.spark-project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
        at org.spark-project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
        at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
        at org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
        at org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
        at org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
        at org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
        at org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:286)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:283)
        at org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:180)
        at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:277)
        at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:276)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
        at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6897)
        at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5331)
        at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5207)
        at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:5188)
        at org.codehaus.janino.UnitCompiler.access$12600(UnitCompiler.java:185)
        at org.codehaus.janino.UnitCompiler$16.visitReferenceType(UnitCompiler.java:5119)
        at org.codehaus.janino.Java$ReferenceType.accept(Java.java:2880)
        at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5159)
        at org.codehaus.janino.UnitCompiler.hasAnnotation(UnitCompiler.java:830)
        at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:814)
        at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
        at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
        at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
        at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
        at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
        at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
        at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
        at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
        at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
        at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
        at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
        at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
        at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
        at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
        at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
        at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
        at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
        at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
        at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
        at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
        at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
        at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
        at org.codehaus.janino.ClassBodyEvaluator.<init>(ClassBodyEvaluator.java:72)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:246)
        at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:64)
        at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:32)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:273)
        at org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
        at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
        ... 28 more
Caused by: java.lang.ClassNotFoundException: Override
        at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:69)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:270)
        at org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:78)
        at org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:254)
        at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6893)
        ... 66 more
Caused by: java.lang.ClassNotFoundException: Override
        at java.lang.ClassLoader.findClass(ClassLoader.java:531)
        at org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.scala:26)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:34)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:30)
        at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:64)
        ... 73 more

From: Reynold Xin [mailto:r...@databricks.com]
Sent: Thursday, August 20, 2015 4:22 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Dataframe aggregation with Tungsten unsafe

I think you might need to turn codegen on also in order for the unsafe stuff to work.

On Thu, Aug 20, 2015 at 4:09 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

Hi Reynold,

Thank you for the suggestion. This code takes around 30 sec on my setup (5 workers with 32GB). My issue is that I don't see any change in time if I unset the unsafe flags.
Could you explain why this might happen?

On Aug 20, 2015, at 15:32, Reynold Xin <r...@databricks.com> wrote:

I didn't wait long enough earlier. Actually it did finish when I raised memory to 8g. In 1.5 with Tungsten (which should be the same as 1.4 with your unsafe flags), the query took 40s with 4G of mem. In 1.4, it took 195s with 8G of mem. This is not a scientific benchmark and I only ran it once.

On Thu, Aug 20, 2015 at 3:22 PM, Reynold Xin <r...@databricks.com> wrote:

How did you run this? I couldn't run your query with 4G of RAM in 1.4, but in 1.5 it ran. Also, I recommend just dumping the data to parquet on disk to evaluate, rather than using the in-memory cache, which is super slow and we are thinking of removing/replacing with something else.

val size = 100000000
val partitions = 10
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x =>
  (util.Random.nextInt(size / repetitions), util.Random.nextDouble)).toDF("key", "value")
data.write.parquet("/scratch/rxin/tmp/alex")
val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.count()
println((System.nanoTime() - t) / 1e9)

On Thu, Aug 20, 2015 at 2:57 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

Dear Spark developers,

I am trying to benchmark the new DataFrame aggregation implemented under project Tungsten and released with Spark 1.4 (I am using the latest Spark from the repo, i.e. 1.5): https://github.com/apache/spark/pull/5725 It says that the aggregation should be faster because it uses unsafe memory allocation and in-place updates.
It was also presented at Spark Summit this summer: http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen

The following enables the new aggregation in the Spark config:
spark.sql.unsafe.enabled=true
spark.unsafe.offHeap=true

I wrote a simple piece of code that aggregates values by key. However, the execution time does not depend on whether the new aggregation is on or off. Could you suggest how I can observe the improvement that the new aggregation provides? Could you write a code snippet that takes advantage of it?

case class Counter(key: Int, value: Double)
val size = 100000000
val partitions = 5
val repetitions = 5
val data = sc.parallelize(1 to size, partitions).map(x =>
  Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
val df = sqlContext.createDataFrame(data)
df.persist()
df.count()
val t = System.nanoTime()
val res = df.groupBy("key").agg(sum("value"))
res.count()
println((System.nanoTime() - t) / 1e9)

Best regards,
Alexander
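[Editor's note, not part of the original thread: a sketch that combines the suggestions made above — enable codegen along with the unsafe flag, benchmark against parquet on disk instead of the in-memory cache, and do a warm-up run before timing. It assumes a running spark-shell (hence `sc` and `sqlContext`) on a 1.5-era build with the flag names used in this thread; the output path is illustrative:]

```scala
// Sketch only. Flag names are the ones discussed in this thread; they were
// consolidated in later Spark releases.
import org.apache.spark.sql.functions.sum
import sqlContext.implicits._

// SQL-level flags can be set on the SQLContext; spark.unsafe.offHeap is a
// core setting and must go into spark-defaults.conf before launch.
sqlContext.setConf("spark.sql.codegen", "true")
sqlContext.setConf("spark.sql.unsafe.enabled", "true")

val size = 100000000
val partitions = 10
val repetitions = 5

// Write once to parquet and read back, instead of caching in memory.
sc.parallelize(1 to size, partitions)
  .map(x => (util.Random.nextInt(size / repetitions), util.Random.nextDouble))
  .toDF("key", "value")
  .write.parquet("/tmp/tungsten-bench")

val df = sqlContext.read.parquet("/tmp/tungsten-bench")

// Warm-up run (JIT compilation, codegen cache), then the timed run.
df.groupBy("key").agg(sum("value")).count()
val t0 = System.nanoTime()
df.groupBy("key").agg(sum("value")).count()
println((System.nanoTime() - t0) / 1e9 + " s")
```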