Sun Rui created SPARK-14803: ------------------------------- Summary: A bug in EliminateSerialization rule in Catalyst Optimizer Key: SPARK-14803 URL: https://issues.apache.org/jira/browse/SPARK-14803 Project: Spark Issue Type: Bug Components: Optimizer, SQL Reporter: Sun Rui
When I rebased my PR https://github.com/apache/spark/pull/12493 to master, I found a bug in the EliminateSerialization rule in the Catalyst Optimizer, which was introduced in the PR https://github.com/apache/spark/pull/12260. The related code is: {code} object EliminateSerialization extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case d @ DeserializeToObject(_, _, s: SerializeFromObject) if (d.outputObjectType == s.inputObjectType) => // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`. val objAttr = Alias(s.child.output.head, "obj")(exprId = d.output.head.exprId) Project(objAttr :: Nil, s.child) } } {code} In my PR, when there are multiple successive calls to dapply(), the SerializeFromObject and DeserializeToObject logical operators are eliminated and replaced with a Project operator. However, the object involved is a Row, and UnsafeRowWriter has no support for writing Row values. Detailed error message: {panel} 1. Error: dapply() on a DataFrame ---------------------------------------------- org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1156.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1156.0 (TID 9648, localhost): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 31, Column 29: No applicable constructor/method found for actual parameters "int, org.apache.spark.sql.Row"; candidates are: "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, byte[])", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, org.apache.spark.unsafe.types.UTF8String)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, org.apache.spark.sql.types.Decimal, int, int)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, 
double)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, byte[], int, int)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, org.apache.spark.unsafe.types.CalendarInterval)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, byte)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, boolean)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, short)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, int)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, long)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, float)" /* 001 */ /* 002 */ public java.lang.Object generate(Object[] references) { /* 003 */ return new SpecificUnsafeProjection(references); /* 004 */ } /* 005 */ /* 006 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection { /* 007 */ /* 008 */ private Object[] references; /* 009 */ private UnsafeRow result; /* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter; /* 012 */ /* 013 */ /* 014 */ public SpecificUnsafeProjection(Object[] references) { /* 015 */ this.references = references; /* 016 */ result = new UnsafeRow(1); /* 017 */ this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32); /* 018 */ this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1); /* 019 */ } /* 020 */ /* 021 */ // Scala.Function1 need this /* 022 */ public java.lang.Object apply(java.lang.Object row) { /* 023 */ return apply((InternalRow) row); /* 024 */ } /* 025 */ /* 026 */ public UnsafeRow apply(InternalRow i) 
{ /* 027 */ holder.reset(); /* 028 */ /* 029 */ /* input[0, org.apache.spark.sql.Row] */ /* 030 */ org.apache.spark.sql.Row value = (org.apache.spark.sql.Row)i.get(0, null); /* 031 */ rowWriter.write(0, value); /* 032 */ result.setTotalSize(holder.totalSize()); /* 033 */ return result; /* 034 */ } /* 035 */ } /* 036 */ at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306) at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293) at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135) at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:636) at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:395) at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:352) at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:151) at org.apache.spark.sql.execution.Project$$anonfun$8.apply(basicOperators.scala:67) at org.apache.spark.sql.execution.Project$$anonfun$8.apply(basicOperators.scala:66) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:771) at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:771) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318) at org.apache.spark.rdd.RDD.iterator(RDD.scala:282) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318) at org.apache.spark.rdd.RDD.iterator(RDD.scala:282) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318) at org.apache.spark.rdd.RDD.iterator(RDD.scala:282) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318) at org.apache.spark.rdd.RDD.iterator(RDD.scala:282) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:254) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {panel} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org For additional commands, e-mail: issues-help@spark.apache.org