Repository: spark
Updated Branches:
  refs/heads/master b857fb549 -> 7bbec0dce


[SPARK-24061][SS] Add TypedFilter support for continuous processing

## What changes were proposed in this pull request?

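Add TypedFilter support to continuous processing applications by including TypedFilter in the set of operators that UnsupportedOperationChecker accepts for continuous processing.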

## How was this patch tested?

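Unit tests: added a TypedFilter case to UnsupportedOperationsSuite that verifies the plan passes UnsupportedOperationChecker.checkForContinuous.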

Author: wangyanlin01 <wangyanli...@baidu.com>

Closes #21136 from yanlin-Lynn/SPARK-24061.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7bbec0dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7bbec0dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7bbec0dc

Branch: refs/heads/master
Commit: 7bbec0dced35aeed79c1a24b6f7a1e0a3508b0fb
Parents: b857fb5
Author: wangyanlin01 <wangyanli...@baidu.com>
Authored: Tue May 1 16:22:52 2018 +0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Tue May 1 16:22:52 2018 +0800

----------------------------------------------------------------------
 .../analysis/UnsupportedOperationChecker.scala  |  3 ++-
 .../analysis/UnsupportedOperationsSuite.scala   | 23 ++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7bbec0dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index ff9d6d7..d3d6c63 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -345,7 +345,8 @@ object UnsupportedOperationChecker {
     plan.foreachUp { implicit subPlan =>
       subPlan match {
         case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
-              _: DeserializeToObject | _: SerializeFromObject | _: 
SubqueryAlias) =>
+              _: DeserializeToObject | _: SerializeFromObject | _: 
SubqueryAlias |
+              _: TypedFilter) =>
         case node if node.nodeName == "StreamingRelationV2" =>
         case node =>
           throwError(s"Continuous processing does not support ${node.nodeName} 
operations.")

http://git-wip-us.apache.org/repos/asf/spark/blob/7bbec0dc/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 60d1351..cb487c8 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -621,6 +621,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     outputMode = Append,
     expectedMsgs = Seq("monotonically_increasing_id"))
 
+  assertSupportedForContinuousProcessing(
+    "TypedFilter", TypedFilter(
+      null,
+      null,
+      null,
+      null,
+      new TestStreamingRelationV2(attribute)), OutputMode.Append())
 
   /*
     
=======================================================================================
@@ -771,6 +778,16 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     }
   }
 
+  /** Assert that the logical plan is supported for continuous processing mode 
*/
+  def assertSupportedForContinuousProcessing(
+    name: String,
+    plan: LogicalPlan,
+    outputMode: OutputMode): Unit = {
+    test(s"continuous processing - $name: supported") {
+      UnsupportedOperationChecker.checkForContinuous(plan, outputMode)
+    }
+  }
+
   /**
    * Assert that the logical plan is not supported inside a streaming plan.
    *
@@ -840,4 +857,10 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     def this(attribute: Attribute) = this(Seq(attribute))
     override def isStreaming: Boolean = true
   }
+
+  case class TestStreamingRelationV2(output: Seq[Attribute]) extends LeafNode {
+    def this(attribute: Attribute) = this(Seq(attribute))
+    override def isStreaming: Boolean = true
+    override def nodeName: String = "StreamingRelationV2"
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to