[ https://issues.apache.org/jira/browse/SPARK-15313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Armbrust updated SPARK-15313:
-------------------------------------
    Target Version/s: 2.0.0

> EmbedSerializerInFilter rule should keep exprIds of output of surrounded SerializeFromObject.
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-15313
>                 URL: https://issues.apache.org/jira/browse/SPARK-15313
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Takuya Ueshin
>
> The following code:
> {code}
> // Assumes a SparkSession in scope as `spark`, as in spark-shell.
> import spark.implicits._
> import org.apache.spark.sql.functions.expr
>
> val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
> ds.filter(_._1 == "b").select(expr("_1").as[String]).foreach(println(_))
> {code}
> throws the following exception instead of printing {{b}}:
> {noformat}
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: _1#420
>  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
>  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
>  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:265)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:265)
>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:68)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:264)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:254)
>  at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
>  at org.apache.spark.sql.execution.ProjectExec$$anonfun$4.apply(basicPhysicalOperators.scala:55)
>  at org.apache.spark.sql.execution.ProjectExec$$anonfun$4.apply(basicPhysicalOperators.scala:54)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.List.foreach(List.scala:381)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>  at scala.collection.immutable.List.map(List.scala:285)
>  at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:54)
>  at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
>  at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:79)
>  at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:194)
>  at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
>  at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:218)
>  at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:244)
>  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
>  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
>  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
>  at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:218)
>  at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:113)
>  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
>  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
>  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
>  at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:79)
>  at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:40)
>  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
>  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
>  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
>  at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:30)
>  at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:304)
>  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:343)
>  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>  at org.apache.spark.sql.execution.DeserializeToObject.doExecute(objects.scala:59)
>  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:85)
>  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:85)
>  at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2285)
>  at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2282)
>  at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply$mcV$sp(Dataset.scala:2036)
>  at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2036)
>  at org.apache.spark.sql.Dataset$$anonfun$foreach$1.apply(Dataset.scala:2036)
>  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2435)
>  at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2035)
> ...
>  Cause: java.lang.RuntimeException: Couldn't find _1#420 in [_1#416,_2#417]
>  at scala.sys.package$.error(package.scala:27)
>  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
>  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
>  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
>  ... (remaining frames are identical to the stack trace above)
> {noformat}
> This happens because the {{EmbedSerializerInFilter}} rule drops the exprIds of the output of the surrounded {{SerializeFromObject}}: the remaining {{Project}} still references {{_1#420}} from the serializer, but after the rule removes the serializer, the plan beneath it only produces {{_1#416}} and {{_2#417}}, so reference binding fails.
> The analyzed and optimized logical plans of the example above are as follows:
> {noformat}
> == Analyzed Logical Plan ==
> _1: string
> Project [_1#420]
> +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, scala.Tuple2]._1, true) AS _1#420,input[0, scala.Tuple2]._2 AS _2#421]
>    +- Filter <function1>.apply
>       +- DeserializeToObject newInstance(class scala.Tuple2), obj#419: scala.Tuple2
>          +- LocalRelation [_1#416,_2#417], [[0,1800000001,1,61],[0,1800000001,2,62],[0,1800000001,3,63]]
>
> == Optimized Logical Plan ==
> !Project [_1#420]
> +- Filter <function1>.apply
>    +- LocalRelation [_1#416,_2#417], [[0,1800000001,1,61],[0,1800000001,2,62],[0,1800000001,3,63]]
> {noformat}
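>
> A minimal sketch of the direction a fix could take (illustrative only, not the actual patch; the rule body and the name {{EmbedSerializerInFilterSketch}} are assumptions against the Spark 2.0 Catalyst internals seen in the trace): after pushing the deserializer into the {{Filter}}, re-alias the new plan's output to the serializer's original exprIds so that parent operators such as {{Project [_1#420]}} still bind:
> {code}
> import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
> import org.apache.spark.sql.catalyst.plans.logical._
> import org.apache.spark.sql.catalyst.rules.Rule
>
> object EmbedSerializerInFilterSketch extends Rule[LogicalPlan] {
>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>     case s @ SerializeFromObject(_, Filter(condition, d: DeserializeToObject)) =>
>       // Rewrite the condition to deserialize the input rows directly.
>       val newCondition = condition transform {
>         case a: Attribute if a == d.output.head => d.deserializer
>       }
>       val filter = Filter(newCondition, d.child)
>       // filter.output carries the child's exprIds (here _1#416, _2#417);
>       // re-alias them to the serializer's exprIds (_1#420, _2#421) so that
>       // references like Project [_1#420] still resolve.
>       val keepExprIds = filter.output.zip(s.output).map { case (out, orig) =>
>         Alias(out, out.name)(exprId = orig.exprId)
>       }
>       Project(keepExprIds, filter)
>   }
> }
> {code}
> The extra {{Project}} of aliases is cheap but keeps the plan's output attributes stable, so the {{!Project [_1#420]}} in the optimized plan above would resolve again.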


