[ https://issues.apache.org/jira/browse/SPARK-14803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15251862#comment-15251862 ]
Sun Rui commented on SPARK-14803: --------------------------------- cc [~cloud_fan] I will submit a PR for this, but I am not sure whether it is the correct fix. Please help review it. > A bug in EliminateSerialization rule in Catalyst Optimizer > ----------------------------------------------------------- > > Key: SPARK-14803 > URL: https://issues.apache.org/jira/browse/SPARK-14803 > Project: Spark > Issue Type: Bug > Components: Optimizer, SQL > Reporter: Sun Rui > > When I rebased my PR https://github.com/apache/spark/pull/12493 onto master, I > found a bug in the EliminateSerialization rule in the Catalyst Optimizer, which was > introduced in the PR https://github.com/apache/spark/pull/12260. > The related code is: > {code} > object EliminateSerialization extends Rule[LogicalPlan] { > def apply(plan: LogicalPlan): LogicalPlan = plan transform { > case d @ DeserializeToObject(_, _, s: SerializeFromObject) > if (d.outputObjectType == s.inputObjectType) => > // Adds an extra Project here, to preserve the output expr id of > `DeserializeToObject`. > val objAttr = Alias(s.child.output.head, "obj")(exprId = > d.output.head.exprId) > Project(objAttr :: Nil, s.child) > {code} > In my PR, when there are multiple successive calls to dapply(), the > SerializeFromObject and DeserializeToObject logical operators will be > eliminated and replaced with a Project operator. However, the object involved > is a Row, and UnsafeRowWriter has no write method that accepts a Row, so the code generated for that Project fails to compile. > Detailed error message: > {panel} > 1. 
Error: dapply() on a DataFrame > ---------------------------------------------- > org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in > stage 1156.0 failed 1 times, most recent failure: Lost task 0.0 in stage > 1156.0 (TID 9648, localhost): java.util.concurrent.ExecutionException: > java.lang.Exception: failed to compile: > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 31, Column 29: No applicable constructor/method found for actual parameters > "int, org.apache.spark.sql.Row"; candidates are: "public void > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, > byte[])", "public void > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, > org.apache.spark.unsafe.types.UTF8String)", "public void > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, > org.apache.spark.sql.types.Decimal, int, int)", "public void > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, > double)", "public void > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, > byte[], int, int)", "public void > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, > org.apache.spark.unsafe.types.CalendarInterval)", "public void > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, > byte)", "public void > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, > boolean)", "public void > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, > short)", "public void > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, > int)", "public void > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, > long)", "public void > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(int, > float)" > /* 001 */ > /* 002 */ public java.lang.Object generate(Object[] references) { > 
/* 003 */ return new SpecificUnsafeProjection(references); > /* 004 */ } > /* 005 */ > /* 006 */ class SpecificUnsafeProjection extends > org.apache.spark.sql.catalyst.expressions.UnsafeProjection { > /* 007 */ > /* 008 */ private Object[] references; > /* 009 */ private UnsafeRow result; > /* 010 */ private > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder; > /* 011 */ private > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter; > /* 012 */ > /* 013 */ > /* 014 */ public SpecificUnsafeProjection(Object[] references) { > /* 015 */ this.references = references; > /* 016 */ result = new UnsafeRow(1); > /* 017 */ this.holder = new > org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32); > /* 018 */ this.rowWriter = new > org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1); > /* 019 */ } > /* 020 */ > /* 021 */ // Scala.Function1 need this > /* 022 */ public java.lang.Object apply(java.lang.Object row) { > /* 023 */ return apply((InternalRow) row); > /* 024 */ } > /* 025 */ > /* 026 */ public UnsafeRow apply(InternalRow i) { > /* 027 */ holder.reset(); > /* 028 */ > /* 029 */ /* input[0, org.apache.spark.sql.Row] */ > /* 030 */ org.apache.spark.sql.Row value = > (org.apache.spark.sql.Row)i.get(0, null); > /* 031 */ rowWriter.write(0, value); > /* 032 */ result.setTotalSize(holder.totalSize()); > /* 033 */ return result; > /* 034 */ } > /* 035 */ } > /* 036 */ > at > org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306) > at > org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293) > at > org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) > at > org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135) > at > org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410) > at > 
org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380) > at > org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) > at > org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) > at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) > at > org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) > at > org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:636) > at > org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:395) > at > org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:352) > at > org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:151) > at > org.apache.spark.sql.execution.Project$$anonfun$8.apply(basicOperators.scala:67) > at > org.apache.spark.sql.execution.Project$$anonfun$8.apply(basicOperators.scala:66) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:771) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:771) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:282) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:282) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:282) > at > 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:282) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:254) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {panel} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org