Not sure what's going on or how you measured the time, but the difference
here is pretty big when I test on my laptop. Maybe you set the wrong config
variables? (spark.sql.* settings are SQL conf variables that you set via
sqlContext.setConf -- and in 1.5, they are consolidated into a single
flag, spark.sql.tungsten.enabled. See below.)
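
For example, in the shell -- roughly speaking, since spark.sql.* settings go
through sqlContext.setConf rather than SparkConf:

sqlContext.setConf("spark.sql.tungsten.enabled", "true")   // default is true in 1.5
// ... run the query, then compare with it turned off:
sqlContext.setConf("spark.sql.tungsten.enabled", "false")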


I ran with a 10m-row dataset (created by calling sample(true, 0.1) on the
100m dataset), since the 100m one takes too long on my laptop with Tungsten
off, so I didn't wait for it to finish. (It takes 40s - 50s with Tungsten on.)
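
Roughly, the 10m file was produced like this (a sketch, using the parquet
path from the snippet further down in this thread):

val df100m = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
df100m.sample(true, 0.1).write.parquet("/scratch/rxin/tmp/alex-10m")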


val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex-10m")

// Time the aggregation. Counting queryExecution.toRdd forces the whole
// query to run while skipping the conversion back to external Row objects.
val t = System.nanoTime()
df.groupBy("key").sum("value").queryExecution.toRdd.count()
println((System.nanoTime() - t) / 1e9)


On 1.5, with 8g driver memory and 8 cores:

5.48951

sqlContext.setConf("spark.sql.tungsten.enabled", "false")

Ran it again, and it took 25.127962.


On 1.4, with 8g driver memory and 8 cores: 25.583473


It's also possible that the benefit is smaller when you have a virtually
unlimited amount of memory (relative to the tiny dataset size) and, as a
result, GC happens less often.


On Thu, Aug 20, 2015 at 7:00 PM, Ulanov, Alexander <alexander.ula...@hp.com>
wrote:

> Did git pull :)
>
>
>
> Now I do get a difference in time with Tungsten unsafe on/off: it is
> 24-25 seconds (unsafe on) vs 32-26 seconds (unsafe off) for the example
> below.
>
>
>
> Why am I not getting the improvement advertised at Spark Summit (slide
> 23)?
>
>
> http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
>
>
>
> My dataset is 100M rows; is it big enough to see the improvement? Am I
> using aggregate correctly?
>
>
>
>
>
> case class Counter(key: Int, value: Double)
>
> val size = 100000000
>
> val partitions = 5
>
> val repetitions = 5
>
> val data = sc.parallelize(1 to size, partitions).map(x =>
> Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
>
> val df = sqlContext.createDataFrame(data)
>
> df.persist()
>
> df.foreach { x => {} }
>
> val t = System.nanoTime()
>
> val res = df.groupBy("key").agg(sum("value"))
>
> res.foreach { x => {} }
>
> println((System.nanoTime() - t) / 1e9)
>
>
>
> Unsafe on:
>
> spark.sql.codegen       true
>
> spark.sql.unsafe.enabled        true
>
> spark.unsafe.offHeap    true
>
>
>
> Unsafe off:
>
> spark.sql.codegen       false
>
> spark.sql.unsafe.enabled        false
>
> spark.unsafe.offHeap    false
>
>
>
> *From:* Reynold Xin [mailto:r...@databricks.com]
> *Sent:* Thursday, August 20, 2015 5:43 PM
>
> *To:* Ulanov, Alexander
> *Cc:* dev@spark.apache.org
> *Subject:* Re: Dataframe aggregation with Tungsten unsafe
>
>
>
> Please git pull :)
>
>
>
>
>
> On Thu, Aug 20, 2015 at 5:35 PM, Ulanov, Alexander <
> alexander.ula...@hp.com> wrote:
>
> I am using Spark 1.5 cloned from master on June 12. (The aggregate unsafe
> feature was added to Spark on April 29.)
>
>
>
> *From:* Reynold Xin [mailto:r...@databricks.com]
> *Sent:* Thursday, August 20, 2015 5:26 PM
>
>
> *To:* Ulanov, Alexander
> *Cc:* dev@spark.apache.org
> *Subject:* Re: Dataframe aggregation with Tungsten unsafe
>
>
>
> Yes - DataFrame and SQL are the same thing.
>
>
>
> Which version are you running? Spark 1.4 doesn't use Janino --- but you
> have a Janino exception?
>
>
>
> On Thu, Aug 20, 2015 at 5:01 PM, Ulanov, Alexander <
> alexander.ula...@hp.com> wrote:
>
> When I add the following option:
>
> spark.sql.codegen      true
>
>
>
> Spark crashed on the “df.count” with a java.util.concurrent.ExecutionException
> (below). Are you sure that I need to set this flag to get unsafe? It looks
> like a SQL flag, and I don’t use SQL.
>
>
>
>
>
> java.util.concurrent.ExecutionException:
> org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
>
>          at
> org.spark-project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>
>          at
> org.spark-project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>
>          at
> org.spark-project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>
>          at
> org.spark-project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>
>          at
> org.spark-project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
>
>          at
> org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
>
>          at
> org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>
>          at
> org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>
>          at
> org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
>
>          at
> org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>
>          at
> org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>
>          at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:286)
>
>          at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:283)
>
>          at
> org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:180)
>
>          at
> org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:277)
>
>          at
> org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:276)
>
>          at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>
>          at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>
>          at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>
>          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>
>          at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>
>          at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>
>          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>
>          at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>
>          at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>
>          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>
>          at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>
>          at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>
>          at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
>          at org.apache.spark.scheduler.Task.run(Task.scala:70)
>
>          at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>
>          at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>          at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
>          at java.lang.Thread.run(Thread.java:745)
>
> Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column
> 10: Override
>
>          at
> org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6897)
>
>          at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5331)
>
>          at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5207)
>
>          at
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:5188)
>
>          at
> org.codehaus.janino.UnitCompiler.access$12600(UnitCompiler.java:185)
>
>          at
> org.codehaus.janino.UnitCompiler$16.visitReferenceType(UnitCompiler.java:5119)
>
>          at org.codehaus.janino.Java$ReferenceType.accept(Java.java:2880)
>
>          at
> org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5159)
>
>          at
> org.codehaus.janino.UnitCompiler.hasAnnotation(UnitCompiler.java:830)
>
>          at
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:814)
>
>          at
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
>
>          at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
>
>          at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
>
>          at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
>
>          at
> org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
>
>          at
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
>
>          at
> org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
>
>          at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
>
>          at
> org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
>
>          at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
>
>          at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
>
>          at
> org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
>
>          at
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
>
>          at
> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
>
>          at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
>
>          at
> org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
>
>          at
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
>
>          at
> org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
>
>          at
> org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
>
>          at
> org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
>
>          at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
>
>          at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
>
>          at
> org.codehaus.janino.ClassBodyEvaluator.<init>(ClassBodyEvaluator.java:72)
>
>          at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:246)
>
>          at
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:64)
>
>          at
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:32)
>
>          at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:273)
>
>          at
> org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>
>          at
> org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>
>          ... 28 more
>
> Caused by: java.lang.ClassNotFoundException: Override
>
>          at
> org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:69)
>
>          at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>
>          at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>
>          at java.lang.Class.forName0(Native Method)
>
>          at java.lang.Class.forName(Class.java:270)
>
>          at
> org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:78)
>
>          at
> org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:254)
>
>          at
> org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6893)
>
>          ... 66 more
>
> Caused by: java.lang.ClassNotFoundException: Override
>
>          at java.lang.ClassLoader.findClass(ClassLoader.java:531)
>
>          at
> org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.scala:26)
>
>          at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>
>          at
> org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:34)
>
>          at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>
>          at
> org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:30)
>
>          at
> org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:64)
>
>          ... 73 more
>
>
>
>
>
> *From:* Reynold Xin [mailto:r...@databricks.com]
> *Sent:* Thursday, August 20, 2015 4:22 PM
> *To:* Ulanov, Alexander
> *Cc:* dev@spark.apache.org
> *Subject:* Re: Dataframe aggregation with Tungsten unsafe
>
>
>
> I think you might need to turn codegen on also in order for the unsafe
> stuff to work.
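>
> For instance, roughly (the spark.sql.* flags can be set from the shell via
> sqlContext.setConf, while spark.unsafe.offHeap is a core Spark conf that
> needs to go into SparkConf / spark-defaults.conf before the context starts):
>
> sqlContext.setConf("spark.sql.codegen", "true")
> sqlContext.setConf("spark.sql.unsafe.enabled", "true")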
>
>
>
>
>
> On Thu, Aug 20, 2015 at 4:09 PM, Ulanov, Alexander <
> alexander.ula...@hp.com> wrote:
>
> Hi Reynold,
>
> Thank you for the suggestion. This code takes around 30 sec on my setup (5
> workers with 32GB). My issue is that I don't see any change in time if I
> unset the unsafe flags. Could you explain why that might happen?
>
> On Aug 20, 2015, at 15:32, Reynold Xin <r...@databricks.com> wrote:
>
>  I didn't wait long enough earlier. Actually it did finish when I raised
> memory to 8g.
>
> In 1.5 with Tungsten (which should be the same as 1.4 with your unsafe
> flags), the query took 40s with 4G of mem.
>
> In 1.4, it took 195s with 8G of mem.
>
> This is not a scientific benchmark and I only ran it once.
>
>
>
> On Thu, Aug 20, 2015 at 3:22 PM, Reynold Xin <r...@databricks.com> wrote:
> How did you run this? I couldn't run your query with 4G of RAM in 1.4, but
> in 1.5 it ran.
>
> Also I recommend just dumping the data to parquet on disk to evaluate,
> rather than using the in-memory cache, which is super slow and we are
> thinking of removing/replacing with something else.
>
>
> val size = 100000000
> val partitions = 10
> val repetitions = 5
> val data = sc.parallelize(1 to size, partitions).map(x =>
> (util.Random.nextInt(size / repetitions),
> util.Random.nextDouble)).toDF("key", "value")
>
> data.write.parquet("/scratch/rxin/tmp/alex")
>
>
> val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
> val t = System.nanoTime()
> val res = df.groupBy("key").agg(sum("value"))
> res.count()
> println((System.nanoTime() - t) / 1e9)
>
> On Thu, Aug 20, 2015 at 2:57 PM, Ulanov, Alexander <alexander.ula...@hp.com>
> wrote:
> Dear Spark developers,
>
> I am trying to benchmark the new DataFrame aggregation implemented under
> project Tungsten and released with Spark 1.4 (I am using the latest Spark
> from the repo, i.e. 1.5):
> https://github.com/apache/spark/pull/5725
> It says that the aggregation should be faster due to unsafe memory
> allocation and in-place updates. It was also presented at Spark Summit this
> summer:
>
> http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
> The following enables the new aggregation in spark-config:
> spark.sql.unsafe.enabled=true
> spark.unsafe.offHeap=true
>
> I wrote simple code that aggregates values by key. However, the time needed
> to execute the code does not depend on whether the new aggregation is on or
> off. Could you suggest how I can observe the improvement that the new
> aggregation provides? Could you write a code snippet that takes advantage
> of it?
>
> case class Counter(key: Int, value: Double)
> val size = 100000000
> val partitions = 5
> val repetitions = 5
> val data = sc.parallelize(1 to size, partitions).map(x =>
> Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
> val df = sqlContext.createDataFrame(data)
> df.persist()
> df.count()
> val t = System.nanoTime()
> val res = df.groupBy("key").agg(sum("value"))
> res.count()
> println((System.nanoTime() - t) / 1e9)
>
>
> Best regards, Alexander
>
>
>
>
>
>
>
