Sun Rui created SPARK-14803:
-------------------------------

             Summary: A bug in EliminateSerialization rule in Catalyst Optimizer
                 Key: SPARK-14803
                 URL: https://issues.apache.org/jira/browse/SPARK-14803
             Project: Spark
          Issue Type: Bug
          Components: Optimizer, SQL
            Reporter: Sun Rui


When I rebased my PR https://github.com/apache/spark/pull/12493 onto master, I 
found a bug in the EliminateSerialization rule in the Catalyst Optimizer, which 
was introduced by https://github.com/apache/spark/pull/12260.

The related code is:
{code}
object EliminateSerialization extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case d @ DeserializeToObject(_, _, s: SerializeFromObject)
        if d.outputObjectType == s.inputObjectType =>
      // Adds an extra Project here, to preserve the output expr id of `DeserializeToObject`.
      val objAttr = Alias(s.child.output.head, "obj")(exprId = d.output.head.exprId)
      Project(objAttr :: Nil, s.child)
  }
}
{code}
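
For intuition, here is a minimal, self-contained sketch of what the rule does. The case classes below are toy stand-ins of mine for the Catalyst operators (the names mirror the real ones, but the shapes and the string-based object types are simplifications): a DeserializeToObject sitting directly on top of a SerializeFromObject of the same object type collapses into a single Project over the serializer's child.
{code}
// Toy model of the rewrite; these case classes are simplified stand-ins,
// NOT Catalyst's actual operator classes.
sealed trait Plan
case class Relation(name: String) extends Plan
case class SerializeFromObject(child: Plan, inputObjectType: String) extends Plan
case class DeserializeToObject(child: Plan, outputObjectType: String) extends Plan
case class Project(child: Plan) extends Plan

object ToyEliminateSerialization {
  // Collapse a back-to-back deserialize/serialize pair with matching object
  // types into a Project over the serializer's child. Note that nothing here
  // checks *what* the object type is.
  def apply(plan: Plan): Plan = plan match {
    case DeserializeToObject(SerializeFromObject(child, in), out) if in == out =>
      Project(child)
    case other => other
  }
}

// Example:
// ToyEliminateSerialization(
//   DeserializeToObject(SerializeFromObject(Relation("t"), "Row"), "Row"))
// returns Project(Relation("t"))
{code}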

In my PR, when there are multiple successive calls to dapply(), this rule 
eliminates the intermediate SerializeFromObject and DeserializeToObject 
operators and replaces them with a Project. However, the object type involved 
is org.apache.spark.sql.Row, and UnsafeRowWriter has no write() overload that 
accepts a Row, so generating the unsafe projection for that Project fails (see 
the sketch and the detailed error below).
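
To make the failure mode concrete: the rule fires even when the shared object type is the external org.apache.spark.sql.Row, yet the Project it leaves behind must be compiled into an UnsafeProjection, and UnsafeRowWriter only offers write() overloads for primitives, byte[], UTF8String, Decimal, and CalendarInterval (exactly the candidate list in the error below). Purely as an illustration in the toy model above, and not a proposed fix, a guard excluding the external Row type would stop the rewrite from producing an uncompilable Project:
{code}
// Hypothetical guard, expressed in the toy model from the previous sketch:
// skip the rewrite when the shared object type is the external Row class,
// which UnsafeRowWriter cannot serialize. Illustration only, not the real fix.
object ToyEliminateSerializationGuarded {
  def apply(plan: Plan): Plan = plan match {
    case DeserializeToObject(SerializeFromObject(child, in), out)
        if in == out && in != "org.apache.spark.sql.Row" =>
      Project(child)
    case other => other
  }
}
{code}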

Detailed error message:
{panel}
1. Error: dapply() on a DataFrame ----------------------------------------------
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1156.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1156.0 (TID 9648, localhost): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 31, Column 29: No applicable constructor/method found for actual parameters "int, org.apache.spark.sql.Row"; candidates are: "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, byte[])", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, org.apache.spark.unsafe.types.UTF8String)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, org.apache.spark.sql.types.Decimal, int, int)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, double)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, byte[], int, int)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, org.apache.spark.unsafe.types.CalendarInterval)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, byte)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, boolean)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, short)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, int)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, long)", "public void org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, float)"
/* 001 */ 
/* 002 */ public java.lang.Object generate(Object[] references) {
/* 003 */   return new SpecificUnsafeProjection(references);
/* 004 */ }
/* 005 */ 
/* 006 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 007 */   
/* 008 */   private Object[] references;
/* 009 */   private UnsafeRow result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
/* 012 */   
/* 013 */   
/* 014 */   public SpecificUnsafeProjection(Object[] references) {
/* 015 */     this.references = references;
/* 016 */     result = new UnsafeRow(1);
/* 017 */     this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
/* 018 */     this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
/* 019 */   }
/* 020 */   
/* 021 */   // Scala.Function1 need this
/* 022 */   public java.lang.Object apply(java.lang.Object row) {
/* 023 */     return apply((InternalRow) row);
/* 024 */   }
/* 025 */   
/* 026 */   public UnsafeRow apply(InternalRow i) {
/* 027 */     holder.reset();
/* 028 */     
/* 029 */     /* input[0, org.apache.spark.sql.Row] */
/* 030 */     org.apache.spark.sql.Row value = (org.apache.spark.sql.Row)i.get(0, null);
/* 031 */     rowWriter.write(0, value);
/* 032 */     result.setTotalSize(holder.totalSize());
/* 033 */     return result;
/* 034 */   }
/* 035 */ }
/* 036 */ 

        at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
        at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
        at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
        at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
        at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
        at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
        at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
        at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
        at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
        at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
        at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:636)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:395)
        at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:352)
        at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:151)
        at org.apache.spark.sql.execution.Project$$anonfun$8.apply(basicOperators.scala:67)
        at org.apache.spark.sql.execution.Project$$anonfun$8.apply(basicOperators.scala:66)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:771)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:771)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:254)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{panel}


