Please git pull :)

On Thu, Aug 20, 2015 at 5:35 PM, Ulanov, Alexander <alexander.ula...@hp.com>
wrote:

> I am using Spark 1.5 cloned from master on June 12. (The aggregate unsafe
> feature was added to Spark on April 29.)
>
>
>
> *From:* Reynold Xin [mailto:r...@databricks.com]
> *Sent:* Thursday, August 20, 2015 5:26 PM
>
> *To:* Ulanov, Alexander
> *Cc:* dev@spark.apache.org
> *Subject:* Re: Dataframe aggregation with Tungsten unsafe
>
>
>
> Yes - DataFrame and SQL are the same thing.
>
>
>
> Which version are you running? Spark 1.4 doesn't run Janino --- but you
> have a Janino exception?
>
>
>
> On Thu, Aug 20, 2015 at 5:01 PM, Ulanov, Alexander <
> alexander.ula...@hp.com> wrote:
>
> When I add the following option:
>
> spark.sql.codegen      true
>
>
>
> Spark crashed on “df.count” with a java.util.concurrent.ExecutionException
> (below). Are you sure that I need to set this flag to get unsafe? It looks
> like a SQL flag, and I don’t use SQL.
>
>
>
>
>
> java.util.concurrent.ExecutionException:
> org.codehaus.commons.compiler.CompileException: Line 14, Column 10: Override
>
>          at
> org.spark-project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
>
>          at
> org.spark-project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
>
>          at
> org.spark-project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>
>          at
> org.spark-project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
>
>          at
> org.spark-project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
>
>          at
> org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
>
>          at
> org.spark-project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>
>          at
> org.spark-project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>
>          at
> org.spark-project.guava.cache.LocalCache.get(LocalCache.java:4000)
>
>          at
> org.spark-project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>
>          at
> org.spark-project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>
>          at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:286)
>
>          at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:283)
>
>          at
> org.apache.spark.sql.execution.SparkPlan.newPredicate(SparkPlan.scala:180)
>
>          at
> org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:277)
>
>          at
> org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$8.apply(InMemoryColumnarTableScan.scala:276)
>
>          at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>
>          at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>
>          at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>
>          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>
>          at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>
>          at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>
>          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>
>          at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>
>          at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>
>          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>
>          at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>
>          at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>
>          at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
>          at org.apache.spark.scheduler.Task.run(Task.scala:70)
>
>          at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>
>          at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>          at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
>          at java.lang.Thread.run(Thread.java:745)
>
> Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column
> 10: Override
>
>          at
> org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6897)
>
>          at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5331)
>
>          at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:5207)
>
>          at
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:5188)
>
>          at
> org.codehaus.janino.UnitCompiler.access$12600(UnitCompiler.java:185)
>
>          at
> org.codehaus.janino.UnitCompiler$16.visitReferenceType(UnitCompiler.java:5119)
>
>          at org.codehaus.janino.Java$ReferenceType.accept(Java.java:2880)
>
>          at
> org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5159)
>
>          at
> org.codehaus.janino.UnitCompiler.hasAnnotation(UnitCompiler.java:830)
>
>          at
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:814)
>
>          at
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
>
>          at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
>
>          at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
>
>          at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
>
>          at
> org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
>
>          at
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
>
>          at
> org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
>
>          at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
>
>          at
> org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
>
>          at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
>
>          at
> org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
>
>          at
> org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
>
>          at
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
>
>          at
> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
>
>          at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
>
>          at
> org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
>
>          at
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
>
>          at
> org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
>
>          at
> org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
>
>          at
> org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
>
>          at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
>
>          at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
>
>          at
> org.codehaus.janino.ClassBodyEvaluator.<init>(ClassBodyEvaluator.java:72)
>
>          at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:246)
>
>          at
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:64)
>
>          at
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:32)
>
>          at
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:273)
>
>          at
> org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>
>          at
> org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>
>          ... 28 more
>
> Caused by: java.lang.ClassNotFoundException: Override
>
>          at
> org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:69)
>
>          at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>
>          at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>
>          at java.lang.Class.forName0(Native Method)
>
>          at java.lang.Class.forName(Class.java:270)
>
>          at
> org.codehaus.janino.ClassLoaderIClassLoader.findIClass(ClassLoaderIClassLoader.java:78)
>
>          at
> org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:254)
>
>          at
> org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:6893)
>
>          ... 66 more
>
> Caused by: java.lang.ClassNotFoundException: Override
>
>          at java.lang.ClassLoader.findClass(ClassLoader.java:531)
>
>          at
> org.apache.spark.util.ParentClassLoader.findClass(ParentClassLoader.scala:26)
>
>          at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>
>          at
> org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:34)
>
>          at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>
>          at
> org.apache.spark.util.ParentClassLoader.loadClass(ParentClassLoader.scala:30)
>
>          at
> org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:64)
>
>          ... 73 more
>
>
>
>
>
> *From:* Reynold Xin [mailto:r...@databricks.com]
> *Sent:* Thursday, August 20, 2015 4:22 PM
> *To:* Ulanov, Alexander
> *Cc:* dev@spark.apache.org
> *Subject:* Re: Dataframe aggregation with Tungsten unsafe
>
>
>
> I think you might need to turn codegen on also in order for the unsafe
> stuff to work.
>
>
>
>
>
> On Thu, Aug 20, 2015 at 4:09 PM, Ulanov, Alexander <
> alexander.ula...@hp.com> wrote:
>
> Hi Reynold,
>
> Thank you for the suggestion. This code takes around 30 sec on my setup (5
> workers with 32GB). My issue is that I don't see any change in time when I
> unset the unsafe flags. Could you explain why that might happen?
>
> On Aug 20, 2015, at 15:32, Reynold Xin <r...@databricks.com<mailto:
> r...@databricks.com>> wrote:
>
>  I didn't wait long enough earlier. Actually it did finish when I raised
> memory to 8g.
>
> In 1.5 with Tungsten (which should be the same as 1.4 with your unsafe
> flags), the query took 40s with 4G of mem.
>
> In 1.4, it took 195s with 8G of mem.
>
> This is not a scientific benchmark and I only ran it once.
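
Since the numbers above come from a single run, one way to make such timings a little more robust is to repeat the measurement and report the median. The following is only a minimal sketch in plain Scala; the names TimingSketch, timeRuns and median are hypothetical and are not part of Spark.

```scala
object TimingSketch {
  /** Runs `body` `warmups` times untimed, then `runs` times timed.
    * Returns the wall-clock seconds of each timed run. */
  def timeRuns[A](warmups: Int, runs: Int)(body: => A): Seq[Double] = {
    (1 to warmups).foreach(_ => body)
    (1 to runs).map { _ =>
      val t = System.nanoTime()
      body
      (System.nanoTime() - t) / 1e9
    }
  }

  /** Median of a non-empty sequence of measurements. */
  def median(xs: Seq[Double]): Double = {
    val s = xs.sorted
    val n = s.length
    if (n % 2 == 1) s(n / 2) else (s(n / 2 - 1) + s(n / 2)) / 2.0
  }
}
```

In the shell, the timed body would be the aggregation itself, for example res.count(), and the median of the returned values would replace the single println.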
>
>
>
> On Thu, Aug 20, 2015 at 3:22 PM, Reynold Xin <r...@databricks.com<mailto:
> r...@databricks.com>> wrote:
> How did you run this? I couldn't run your query with 4G of RAM in 1.4, but
> in 1.5 it ran.
>
> Also I recommend just dumping the data to parquet on disk to evaluate,
> rather than using the in-memory cache, which is super slow and we are
> thinking of removing/replacing with something else.
>
>
> val size = 100000000
> val partitions = 10
> val repetitions = 5
> val data = sc.parallelize(1 to size, partitions).map(x =>
> (util.Random.nextInt(size / repetitions),
> util.Random.nextDouble)).toDF("key", "value")
>
> data.write.parquet("/scratch/rxin/tmp/alex")
>
>
> val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
> val t = System.nanoTime()
> val res = df.groupBy("key").agg(sum("value"))
> res.count()
> println((System.nanoTime() - t) / 1e9)
>
> On Thu, Aug 20, 2015 at 2:57 PM, Ulanov, Alexander <
> alexander.ula...@hp.com<mailto:alexander.ula...@hp.com>> wrote:
> Dear Spark developers,
>
> I am trying to benchmark the new DataFrame aggregation implemented under
> Project Tungsten and released with Spark 1.4 (I am using the latest
> Spark from the repo, i.e. 1.5):
> https://github.com/apache/spark/pull/5725
> The pull request says that the aggregation should be faster because it uses
> Unsafe to allocate memory and updates values in place. It was also presented
> at Spark Summit this summer:
>
> http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
> The following settings enable the new aggregation in the Spark configuration:
> spark.sql.unsafe.enabled=true
> spark.unsafe.offHeap=true
>
> I wrote simple code that aggregates values by key. However, the execution
> time does not depend on whether the new aggregation is on or off. Could you
> suggest how I can observe the improvement that the aggregation provides?
> Could you write a code snippet that takes advantage of the new aggregation?
>
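> // sum is defined in org.apache.spark.sql.functions
> import org.apache.spark.sql.functions.sum
>
> case class Counter(key: Int, value: Double)
> val size = 100000000
> val partitions = 5
> val repetitions = 5
> // Keys are drawn from [0, size / repetitions), so each repeats about `repetitions` times
> val data = sc.parallelize(1 to size, partitions).map(x =>
> Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
> val df = sqlContext.createDataFrame(data)
> // Cache and materialize the data so the timed section excludes data generation
> df.persist()
> df.count()
> // Time only the aggregation
> val t = System.nanoTime()
> val res = df.groupBy("key").agg(sum("value"))
> res.count()
> println((System.nanoTime() - t) / 1e9)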
>
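
For reference, what the query above computes can be sketched on plain Scala collections at a much smaller scale. This is an illustration only, not how Spark executes the aggregation; LocalAggregationSketch, generate and sumByKey are hypothetical names, and the fixed seed is an assumption added to make the data reproducible.

```scala
import scala.util.Random

object LocalAggregationSketch {
  /** Generates `size` (key, value) pairs with keys drawn from [0, size / repetitions). */
  def generate(size: Int, repetitions: Int, seed: Long): Seq[(Int, Double)] = {
    val rnd = new Random(seed)
    Seq.fill(size)((rnd.nextInt(size / repetitions), rnd.nextDouble()))
  }

  /** Plain-collections counterpart of df.groupBy("key").agg(sum("value")). */
  def sumByKey(data: Seq[(Int, Double)]): Map[Int, Double] =
    data.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
}
```

With size = 1000 and repetitions = 5 there are at most 200 distinct keys, so the result has at most 200 entries, which mirrors what res.count() reports in the DataFrame version.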
>
> Best regards, Alexander
>
>
>
>
>
