[ https://issues.apache.org/jira/browse/SPARK-14675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15243770#comment-15243770 ]

Reynold Xin commented on SPARK-14675:
-------------------------------------

cc [~cloud_fan]

> ClassFormatError in codegen when using Aggregator
> -------------------------------------------------
>
>                 Key: SPARK-14675
>                 URL: https://issues.apache.org/jira/browse/SPARK-14675
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>         Environment: spark 2.0.0-SNAPSHOT
>            Reporter: koert kuipers
>
> Code:
> {noformat}
>   val toList = new Aggregator[(String, Int), Seq[Int], Seq[Int]] {
>     def bufferEncoder: Encoder[Seq[Int]] = implicitly[Encoder[Seq[Int]]]
>     def finish(reduction: Seq[Int]): Seq[Int] = reduction
>     def merge(b1: Seq[Int],b2: Seq[Int]): Seq[Int] = b1 ++ b2
>     def outputEncoder: Encoder[Seq[Int]] = implicitly[Encoder[Seq[Int]]]
>     def reduce(b: Seq[Int],a: (String, Int)): Seq[Int] = b :+ a._2
>     def zero: Seq[Int] = Seq.empty[Int]
>   }
>   val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
>   val ds2 = ds1.groupByKey(_._1).agg(toList.toColumn)
>   ds2.show
> {noformat}
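> For anyone trying to reproduce this outside the shell, the same snippet as a self-contained sketch; the imports and the local SparkSession setup are assumptions about the surrounding environment (spark-shell provides them implicitly) and are not part of the snippet above:
> {noformat}
> // Assumed setup (not in the snippet above): imports and a local SparkSession.
> import org.apache.spark.sql.{Encoder, SparkSession}
> import org.apache.spark.sql.expressions.Aggregator
>
> val spark = SparkSession.builder().master("local[*]").appName("SPARK-14675").getOrCreate()
> import spark.implicits._
>
> // Aggregator that collects the Int half of each (String, Int) record into a Seq[Int] per key.
> val toList = new Aggregator[(String, Int), Seq[Int], Seq[Int]] {
>   def zero: Seq[Int] = Seq.empty[Int]
>   def reduce(b: Seq[Int], a: (String, Int)): Seq[Int] = b :+ a._2
>   def merge(b1: Seq[Int], b2: Seq[Int]): Seq[Int] = b1 ++ b2
>   def finish(reduction: Seq[Int]): Seq[Int] = reduction
>   def bufferEncoder: Encoder[Seq[Int]] = implicitly[Encoder[Seq[Int]]]
>   def outputEncoder: Encoder[Seq[Int]] = implicitly[Encoder[Seq[Int]]]
> }
>
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS()
> val ds2 = ds1.groupByKey(_._1).agg(toList.toColumn)
> ds2.show()  // fails with the ClassFormatError below
> {noformat}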
> This gives me:
> {noformat}
> 6/04/15 18:31:22 WARN TaskSetManager: Lost task 1.0 in stage 3.0 (TID 7, localhost): java.lang.ClassFormatError: Duplicate field name&signature in class file org/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificMutableProjection
>       at java.lang.ClassLoader.defineClass1(Native Method)
>       at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
>       at org.codehaus.janino.ByteArrayClassLoader.findClass(ByteArrayClassLoader.java:66)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass.generate(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$create$2.apply(GenerateMutableProjection.scala:140)
>       at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$create$2.apply(GenerateMutableProjection.scala:139)
>       at org.apache.spark.sql.execution.aggregate.AggregationIterator.generateProcessRow(AggregationIterator.scala:178)
>       at org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:197)
>       at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.<init>(SortBasedAggregationIterator.scala:39)
>       at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1$$anonfun$3.apply(SortBasedAggregate.scala:80)
>       at org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1$$anonfun$3.apply(SortBasedAggregate.scala:71)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:768)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:768)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:72)
>       at org.apache.spark.scheduler.Task.run(Task.scala:86)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> When I do:
> {noformat}
>  ds2.queryExecution.debug.codegen()
> {noformat}
> I get:
> {noformat}
> Found 2 WholeStageCodegen subtrees.
> == Subtree 1 / 2 ==
> WholeStageCodegen
> :  +- Sort [value#6 ASC], false, 0
> :     +- INPUT
> +- AppendColumns <function1>, newInstance(class scala.Tuple2), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String], true) AS value#6]
>    +- LocalTableScan [_1#2,_2#3], [[0,1800000001,1,61],[0,1800000001,2,61],[0,1800000001,3,61]]
> Generated code:
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */   return new GeneratedIterator(references);
> /* 003 */ }
> /* 004 */ 
> /* 005 */ /** Codegened pipeline for:
> /* 006 */ * Sort [value#6 ASC], false, 0
> /* 007 */ +- INPUT
> /* 008 */ */
> /* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
> /* 010 */   private Object[] references;
> /* 011 */   private boolean sort_needToSort;
> /* 012 */   private org.apache.spark.sql.execution.Sort sort_plan;
> /* 013 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
> /* 014 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
> /* 015 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
> /* 016 */   private scala.collection.Iterator inputadapter_input;
> /* 017 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
> /* 018 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
> /* 019 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
> /* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
> /* 021 */   
> /* 022 */   public GeneratedIterator(Object[] references) {
> /* 023 */     this.references = references;
> /* 024 */   }
> /* 025 */   
> /* 026 */   public void init(int index, scala.collection.Iterator inputs[]) {
> /* 027 */     partitionIndex = index;
> /* 028 */     sort_needToSort = true;
> /* 029 */     this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
> /* 030 */     sort_sorter = sort_plan.createSorter();
> /* 031 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
> /* 032 */     
> /* 033 */     inputadapter_input = inputs[0];
> /* 034 */     this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
> /* 035 */     sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
> /* 036 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
> /* 037 */     sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
> /* 038 */   }
> /* 039 */   
> /* 040 */   private void sort_addToSorter() throws java.io.IOException {
> /* 041 */     /*** PRODUCE: INPUT */
> /* 042 */     
> /* 043 */     while (inputadapter_input.hasNext()) {
> /* 044 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
> /* 045 */       /*** CONSUME: Sort [value#6 ASC], false, 0 */
> /* 046 */       
> /* 047 */       sort_sorter.insertRow((UnsafeRow)inputadapter_row);
> /* 048 */       if (shouldStop()) return;
> /* 049 */     }
> /* 050 */     
> /* 051 */   }
> /* 052 */   
> /* 053 */   protected void processNext() throws java.io.IOException {
> /* 054 */     /*** PRODUCE: Sort [value#6 ASC], false, 0 */
> /* 055 */     if (sort_needToSort) {
> /* 056 */       sort_addToSorter();
> /* 057 */       Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
> /* 058 */       sort_sortedIter = sort_sorter.sort();
> /* 059 */       sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
> /* 060 */       sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
> /* 061 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
> /* 062 */       sort_needToSort = false;
> /* 063 */     }
> /* 064 */     
> /* 065 */     while (sort_sortedIter.hasNext()) {
> /* 066 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
> /* 067 */       
> /* 068 */       /*** CONSUME: WholeStageCodegen */
> /* 069 */       
> /* 070 */       append(sort_outputRow);
> /* 071 */       
> /* 072 */       if (shouldStop()) return;
> /* 073 */     }
> /* 074 */   }
> /* 075 */ }
> == Subtree 2 / 2 ==
> WholeStageCodegen
> :  +- Sort [value#6 ASC], false, 0
> :     +- INPUT
> +- Exchange hashpartitioning(value#6, 4), None
>    +- SortBasedAggregate(key=[value#6], functions=[(anon$1(scala.Tuple2),mode=Partial,isDistinct=false)], output=[value#6,value#15])
>       +- WholeStageCodegen
>          :  +- Sort [value#6 ASC], false, 0
>          :     +- INPUT
>          +- AppendColumns <function1>, newInstance(class scala.Tuple2), [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String], true) AS value#6]
>             +- LocalTableScan [_1#2,_2#3], [[0,1800000001,1,61],[0,1800000001,2,61],[0,1800000001,3,61]]
> Generated code:
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */   return new GeneratedIterator(references);
> /* 003 */ }
> /* 004 */ 
> /* 005 */ /** Codegened pipeline for:
> /* 006 */ * Sort [value#6 ASC], false, 0
> /* 007 */ +- INPUT
> /* 008 */ */
> /* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
> /* 010 */   private Object[] references;
> /* 011 */   private boolean sort_needToSort;
> /* 012 */   private org.apache.spark.sql.execution.Sort sort_plan;
> /* 013 */   private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
> /* 014 */   private org.apache.spark.executor.TaskMetrics sort_metrics;
> /* 015 */   private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
> /* 016 */   private scala.collection.Iterator inputadapter_input;
> /* 017 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize;
> /* 018 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue;
> /* 019 */   private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize;
> /* 020 */   private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1;
> /* 021 */   
> /* 022 */   public GeneratedIterator(Object[] references) {
> /* 023 */     this.references = references;
> /* 024 */   }
> /* 025 */   
> /* 026 */   public void init(int index, scala.collection.Iterator inputs[]) {
> /* 027 */     partitionIndex = index;
> /* 028 */     sort_needToSort = true;
> /* 029 */     this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0];
> /* 030 */     sort_sorter = sort_plan.createSorter();
> /* 031 */     sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
> /* 032 */     
> /* 033 */     inputadapter_input = inputs[0];
> /* 034 */     this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1];
> /* 035 */     sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue();
> /* 036 */     this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2];
> /* 037 */     sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue();
> /* 038 */   }
> /* 039 */   
> /* 040 */   private void sort_addToSorter() throws java.io.IOException {
> /* 041 */     /*** PRODUCE: INPUT */
> /* 042 */     
> /* 043 */     while (inputadapter_input.hasNext()) {
> /* 044 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
> /* 045 */       /*** CONSUME: Sort [value#6 ASC], false, 0 */
> /* 046 */       
> /* 047 */       sort_sorter.insertRow((UnsafeRow)inputadapter_row);
> /* 048 */       if (shouldStop()) return;
> /* 049 */     }
> /* 050 */     
> /* 051 */   }
> /* 052 */   
> /* 053 */   protected void processNext() throws java.io.IOException {
> /* 054 */     /*** PRODUCE: Sort [value#6 ASC], false, 0 */
> /* 055 */     if (sort_needToSort) {
> /* 056 */       sort_addToSorter();
> /* 057 */       Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
> /* 058 */       sort_sortedIter = sort_sorter.sort();
> /* 059 */       sort_metricValue.add(sort_sorter.getPeakMemoryUsage());
> /* 060 */       sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
> /* 061 */       sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
> /* 062 */       sort_needToSort = false;
> /* 063 */     }
> /* 064 */     
> /* 065 */     while (sort_sortedIter.hasNext()) {
> /* 066 */       UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
> /* 067 */       
> /* 068 */       /*** CONSUME: WholeStageCodegen */
> /* 069 */       
> /* 070 */       append(sort_outputRow);
> /* 071 */       
> /* 072 */       if (shouldStop()) return;
> /* 073 */     }
> /* 074 */   }
> /* 075 */ }
> {noformat}
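> Note that the two subtrees above only cover the whole-stage-codegen'd Sort pipelines; the class the JVM rejects (GeneratedClass$SpecificMutableProjection) is produced by GenerateMutableProjection inside the SortBasedAggregate (see the stack trace), which is not whole-stage generated, so its source does not appear in this dump. A possible way to capture that source, assuming the codegen classes log the generated code at DEBUG before compiling it (Spark 2.0 still ships log4j 1.x):
> {noformat}
> // Assumption: the org.apache.spark.sql.catalyst.expressions.codegen generators
> // log their generated source at DEBUG; raise the log level, then re-run the action.
> import org.apache.log4j.{Level, Logger}
> Logger.getLogger("org.apache.spark.sql.catalyst.expressions.codegen").setLevel(Level.DEBUG)
> ds2.show()  // then search the driver/executor logs for "SpecificMutableProjection"
> {noformat}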


