This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8f17078  [SPARK-26619][SQL] Prune the unused serializers from SerializeFromObject
8f17078 is described below

commit 8f170787d24912c3a94ce1510197820c87df472a
Author:     Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Wed Jan 16 19:16:37 2019 +0000

    [SPARK-26619][SQL] Prune the unused serializers from SerializeFromObject

    ## What changes were proposed in this pull request?

    `SerializeFromObject` now keeps all serializer expressions for the domain object even when only part of the output attributes are used by the top plan. We should be able to prune the unused serializers from `SerializeFromObject` in such a case.

    ## How was this patch tested?

    Added tests.

    Closes #23562 from viirya/SPARK-26619.

    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: DB Tsai <d_t...@apple.com>
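For context, a rough standalone sketch of the kind of query this rule targets, mirroring the new DatasetSuite test in the diff below; the object name, local session, and println are illustrative assumptions, not part of this patch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject

// Sketch only: inspects Catalyst internals the same way the new DatasetSuite test does.
object PruneSerializersSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-26619 illustration")
      .getOrCreate()
    import spark.implicits._

    // The typed map serializes both tuple fields, but the select only needs `_1`.
    val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
      .map(t => (t._1, t._2 + 1))
      .select("_1")

    // With the new rule, the SerializeFromObject in the optimized plan should keep
    // a single serializer expression instead of two.
    val serializerCounts = ds.queryExecution.optimizedPlan.collect {
      case s: SerializeFromObject => s.serializer.size
    }
    println(serializerCounts) // expected to contain 1

    spark.stop()
  }
}
```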
---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala  |  5 +++++
 .../spark/sql/catalyst/optimizer/ColumnPruningSuite.scala    |  9 +++++++++
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala   | 11 +++++++++++
 3 files changed, 25 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d92f7f8..20f1221 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -561,6 +561,11 @@ object ColumnPruning extends Rule[LogicalPlan] {
     case d @ DeserializeToObject(_, _, child) if !child.outputSet.subsetOf(d.references) =>
       d.copy(child = prunedChild(child, d.references))
 
+    case p @ Project(_, s: SerializeFromObject) if p.references != s.outputSet =>
+      val usedRefs = p.references
+      val prunedSerializer = s.serializer.filter(usedRefs.contains)
+      p.copy(child = SerializeFromObject(prunedSerializer, s.child))
+
     // Prunes the unused columns from child of Aggregate/Expand/Generate/ScriptTransformation
     case a @ Aggregate(_, _, child) if !child.outputSet.subsetOf(a.references) =>
       a.copy(child = prunedChild(child, a.references))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 0cd6e09..73112e3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -400,5 +400,14 @@ class ColumnPruningSuite extends PlanTest {
     comparePlans(optimized, expected)
   }
 
+  test("SPARK-26619: Prune the unused serializers from SerializeFromObject") {
+    val testRelation = LocalRelation('_1.int, '_2.int)
+    val serializerObject = CatalystSerde.serialize[(Int, Int)](
+      CatalystSerde.deserialize[(Int, Int)](testRelation))
+    val query = serializerObject.select('_1)
+    val optimized = Optimize.execute(query.analyze)
+    val expected = serializerObject.copy(serializer = Seq(serializerObject.serializer.head)).analyze
+    comparePlans(optimized, expected)
+  }
   // todo: add more tests for column pruning
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index c90b158..fb8239e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScroogeLikeExample
 import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
@@ -1667,6 +1668,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val exceptDF = inputDF.filter(col("a").isin("0") or col("b") > "c")
     checkAnswer(inputDF.except(exceptDF), Seq(Row("1", null)))
   }
+
+  test("SPARK-26619: Prune the unused serializers from SerializeFromObject") {
+    val data = Seq(("a", 1), ("b", 2), ("c", 3))
+    val ds = data.toDS().map(t => (t._1, t._2 + 1)).select("_1")
+    val serializer = ds.queryExecution.optimizedPlan.collect {
+      case s: SerializeFromObject => s
+    }.head
+    assert(serializer.serializer.size == 1)
+    checkAnswer(ds, Seq(Row("a"), Row("b"), Row("c")))
+  }
 }
 
 case class TestDataUnion(x: Int, y: Int, z: Int)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org