This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f17078  [SPARK-26619][SQL] Prune the unused serializers from SerializeFromObject
8f17078 is described below

commit 8f170787d24912c3a94ce1510197820c87df472a
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Wed Jan 16 19:16:37 2019 +0000

    [SPARK-26619][SQL] Prune the unused serializers from SerializeFromObject
    
    ## What changes were proposed in this pull request?
    
    `SerializeFromObject` currently keeps all serializer expressions for the domain object even when only part of the output attributes are used by the top plan.

    We should be able to prune the unused serializers from `SerializeFromObject` in such cases.
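
    A minimal sketch of the scenario (hedged; the data and column names mirror the new DatasetSuite test below, and the comments describe expected rather than verbatim behavior):

    ```scala
    // Assumes spark.implicits._ is in scope (e.g. in spark-shell).
    // The map produces a two-field tuple, so SerializeFromObject starts with
    // two serializer expressions; select("_1") uses only the first output
    // attribute, so the serializer for _2 can be pruned.
    val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
      .map(t => (t._1, t._2 + 1))
      .select("_1")
    ds.explain(true)  // the optimized plan should keep a single serializer
    ```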
    
    ## How was this patch tested?
    
    Added tests.
    
    Closes #23562 from viirya/SPARK-26619.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: DB Tsai <d_t...@apple.com>
---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala   |  5 +++++
 .../spark/sql/catalyst/optimizer/ColumnPruningSuite.scala     |  9 +++++++++
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala    | 11 +++++++++++
 3 files changed, 25 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d92f7f8..20f1221 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -561,6 +561,11 @@ object ColumnPruning extends Rule[LogicalPlan] {
    case d @ DeserializeToObject(_, _, child) if !child.outputSet.subsetOf(d.references) =>
       d.copy(child = prunedChild(child, d.references))
 
+    case p @ Project(_, s: SerializeFromObject) if p.references != s.outputSet =>
+      val usedRefs = p.references
+      val prunedSerializer = s.serializer.filter(usedRefs.contains)
+      p.copy(child = SerializeFromObject(prunedSerializer, s.child))
+
    // Prunes the unused columns from child of Aggregate/Expand/Generate/ScriptTransformation
    case a @ Aggregate(_, _, child) if !child.outputSet.subsetOf(a.references) =>
       a.copy(child = prunedChild(child, a.references))
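
A hedged, self-contained model of the pruning step in the rule added above (plain Scala strings stand in for serializer expressions and attribute references; this is not the Catalyst API):

```scala
// Keep only the serializer expressions whose output attribute is still
// referenced by the parent Project, mirroring
// s.serializer.filter(usedRefs.contains) in the rule above.
val serializer = Seq("_1", "_2")       // outputs of SerializeFromObject
val projectReferences = Set("_1")      // attributes the Project actually uses
val prunedSerializer = serializer.filter(projectReferences.contains)
assert(prunedSerializer == Seq("_1"))  // the serializer for _2 is dropped
```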
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 0cd6e09..73112e3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -400,5 +400,14 @@ class ColumnPruningSuite extends PlanTest {
     comparePlans(optimized, expected)
   }
 
+  test("SPARK-26619: Prune the unused serializers from SerializeFromObject") {
+    val testRelation = LocalRelation('_1.int, '_2.int)
+    val serializerObject = CatalystSerde.serialize[(Int, Int)](
+      CatalystSerde.deserialize[(Int, Int)](testRelation))
+    val query = serializerObject.select('_1)
+    val optimized = Optimize.execute(query.analyze)
+    val expected = serializerObject.copy(serializer = Seq(serializerObject.serializer.head)).analyze
+    comparePlans(optimized, expected)
+  }
   // todo: add more tests for column pruning
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index c90b158..fb8239e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScroogeLikeExample
 import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
@@ -1667,6 +1668,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val exceptDF = inputDF.filter(col("a").isin("0") or col("b") > "c")
     checkAnswer(inputDF.except(exceptDF), Seq(Row("1", null)))
   }
+
+  test("SPARK-26619: Prune the unused serializers from SerializeFromObjec") {
+    val data = Seq(("a", 1), ("b", 2), ("c", 3))
+    val ds = data.toDS().map(t => (t._1, t._2 + 1)).select("_1")
+    val serializer = ds.queryExecution.optimizedPlan.collect {
+      case s: SerializeFromObject => s
+    }.head
+    assert(serializer.serializer.size == 1)
+    checkAnswer(ds, Seq(Row("a"), Row("b"), Row("c")))
+  }
 }
 
 case class TestDataUnion(x: Int, y: Int, z: Int)
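
For anyone verifying the change interactively, a minimal spark-shell sketch mirroring the new DatasetSuite test (assumes a build containing this commit and `spark.implicits._` in scope):

```scala
import org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject

val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
  .map(t => (t._1, t._2 + 1))
  .select("_1")

// With this patch, SerializeFromObject in the optimized plan should carry a
// single serializer expression instead of two.
val serializers = ds.queryExecution.optimizedPlan.collect {
  case s: SerializeFromObject => s.serializer
}.flatten
assert(serializers.size == 1)
```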

