[ https://issues.apache.org/jira/browse/SPARK-14675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15243770#comment-15243770 ]
Reynold Xin commented on SPARK-14675:
-------------------------------------

cc [~cloud_fan]

ClassFormatError in codegen when using Aggregator
-------------------------------------------------

                 Key: SPARK-14675
                 URL: https://issues.apache.org/jira/browse/SPARK-14675
             Project: Spark
          Issue Type: Bug
          Components: SQL
         Environment: spark 2.0.0-SNAPSHOT
            Reporter: koert kuipers

Code:
{noformat}
val toList = new Aggregator[(String, Int), Seq[Int], Seq[Int]] {
  def bufferEncoder: Encoder[Seq[Int]] = implicitly[Encoder[Seq[Int]]]
  def finish(reduction: Seq[Int]): Seq[Int] = reduction
  def merge(b1: Seq[Int], b2: Seq[Int]): Seq[Int] = b1 ++ b2
  def outputEncoder: Encoder[Seq[Int]] = implicitly[Encoder[Seq[Int]]]
  def reduce(b: Seq[Int], a: (String, Int)): Seq[Int] = b :+ a._2
  def zero: Seq[Int] = Seq.empty[Int]
}

val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
val ds2 = ds1.groupByKey(_._1).agg(toList.toColumn)
ds2.show
{noformat}

This gives me:
{noformat}
16/04/15 18:31:22 WARN TaskSetManager: Lost task 1.0 in stage 3.0 (TID 7, localhost): java.lang.ClassFormatError: Duplicate field name&signature in class file org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificMutableProjection
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
        at org.codehaus.janino.ByteArrayClassLoader.findClass(ByteArrayClassLoader.java:66)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass.generate(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$create$2.apply(GenerateMutableProjection.scala:140)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$create$2.apply(GenerateMutableProjection.scala:139)
        at org.apache.spark.sql.execution.aggregate.AggregationIterator.generateProcessRow(AggregationIterator.scala:178)
        at org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:197)
        at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.<init>(SortBasedAggregationIterator.scala:39)
        at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1$$anonfun$3.apply(SortBasedAggregate.scala:80)
        at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1$$anonfun$3.apply(SortBasedAggregate.scala:71)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:768)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:768)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:72)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
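A possible workaround while this is triaged, assuming the goal is simply to collect the values per key: express the grouping with {{mapGroups}} instead of a custom {{Aggregator}}, so no mutable aggregation buffer (and no generated {{SpecificMutableProjection}}) is needed for the {{Seq[Int]}} buffer type. An untested sketch against the same {{ds1}}:

{noformat}
// Untested workaround sketch (same ds1 as above, spark.implicits._ in scope):
// collect the Ints per key with mapGroups instead of a custom Aggregator,
// sidestepping the aggregation-buffer codegen path entirely.
val ds3 = ds1.groupByKey(_._1).mapGroups { (key, rows) =>
  val values: Seq[Int] = rows.map(_._2).toList
  (key, values)
}
ds3.show
{noformat}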
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

When I run:
{noformat}
ds2.queryExecution.debug.codegen()
{noformat}
I get:
{noformat}
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
WholeStageCodegen
:  +- Sort [value#6 ASC], false, 0
:     +- INPUT
+- AppendColumns <function1>, newInstance(class scala.Tuple2), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String], true) AS value#6]
   +- LocalTableScan [_1#2,_2#3], [[0,1800000001,1,61],[0,1800000001,2,61],[0,1800000001,3,61]]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * Sort [value#6 ASC], false, 0
/* 007 */ +- INPUT
/* 008 */ */
/* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 010 */   private Object[] references;
/* 011 */   private boolean sort_needToSort;
/* 012 */   private org.apache.spark.sql.execution.Sort sort_plan;
/* 013 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 014 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 015 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
/* 016 */   private scala.collection.Iterator inputadapter_input;
/* 017 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
/* 018 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
/* 019 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
/* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
/* 021 */
/* 022 */   public GeneratedIterator(Object[] references) {
/* 023 */     this.references = references;
/* 024 */   }
/* 025 */
/* 026 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 027 */     partitionIndex = index;
/* 028 */     sort_needToSort = true;
/* 029 */     this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
/* 030 */     sort_sorter = sort_plan.createSorter();
/* 031 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 032 */
/* 033 */     inputadapter_input = inputs[0];
/* 034 */     this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
/* 035 */     sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
/* 036 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
/* 037 */     sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
/* 038 */   }
/* 039 */
/* 040 */   private void sort_addToSorter() throws java.io.IOException {
/* 041 */     /*** PRODUCE: INPUT */
/* 042 */
/* 043 */     while (inputadapter_input.hasNext()) {
/* 044 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 045 */       /*** CONSUME: Sort [value#6 ASC], false, 0 */
/* 046 */
/* 047 */       sort_sorter.insertRow((UnsafeRow)inputadapter_row);
/* 048 */       if (shouldStop()) return;
/* 049 */     }
/* 050 */
/* 051 */   }
/* 052 */
/* 053 */   protected void processNext() throws java.io.IOException {
/* 054 */     /*** PRODUCE: Sort [value#6 ASC], false, 0 */
/* 055 */     if (sort_needToSort) {
/* 056 */       sort_addToSorter();
/* 057 */       Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 058 */       sort_sortedIter = sort_sorter.sort();
/* 059 */       sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
/* 060 */       sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 061 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 062 */       sort_needToSort = false;
/* 063 */     }
/* 064 */
/* 065 */     while (sort_sortedIter.hasNext()) {
/* 066 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 067 */
/* 068 */       /*** CONSUME: WholeStageCodegen */
/* 069 */
/* 070 */       append(sort_outputRow);
/* 071 */
/* 072 */       if (shouldStop()) return;
/* 073 */     }
/* 074 */   }
/* 075 */ }

== Subtree 2 / 2 ==
WholeStageCodegen
:  +- Sort [value#6 ASC], false, 0
:     +- INPUT
+- Exchange hashpartitioning(value#6, 4), None
   +- SortBasedAggregate(key=[value#6], functions=[(anon$1(scala.Tuple2),mode=Partial,isDistinct=false)], output=[value#6,value#15])
      +- WholeStageCodegen
         :  +- Sort [value#6 ASC], false, 0
         :     +- INPUT
         +- AppendColumns <function1>, newInstance(class scala.Tuple2), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String], true) AS value#6]
            +- LocalTableScan [_1#2,_2#3], [[0,1800000001,1,61],[0,1800000001,2,61],[0,1800000001,3,61]]

Generated code: (identical, line for line, to the code shown for Subtree 1 above)
{noformat}
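Note that the failure itself comes from {{GenerateMutableProjection}} building the {{SpecificMutableProjection}} for the partial aggregate (see the stack trace above), not from the whole-stage code shown here, which compiles fine. For anyone triaging: a variant of the same {{Aggregator}} with a primitive {{Int}} buffer might help narrow whether the duplicate generated field is tied to the complex {{Seq[Int]}} buffer encoder. An untested sketch:

{noformat}
// Untested diagnostic sketch: the same Aggregator shape with a primitive
// Int buffer instead of Seq[Int]. If this runs while the Seq[Int] version
// throws the ClassFormatError, the duplicate field in the generated
// SpecificMutableProjection is likely tied to the complex buffer encoder.
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

val sumAgg = new Aggregator[(String, Int), Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: (String, Int)): Int = b + a._2
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(r: Int): Int = r
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

ds1.groupByKey(_._1).agg(sumAgg.toColumn).show
{noformat}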
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)