This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 16818da  [FLINK-12743][table-runtime-blink] Introduce unbounded streaming anti/semi join operator
16818da is described below

commit 16818dad7e04fac8f862771a23a702f72e605e72
Author: Jark Wu <imj...@gmail.com>
AuthorDate: Wed Jun 19 20:04:08 2019 +0800

    [FLINK-12743][table-runtime-blink] Introduce unbounded streaming anti/semi join operator
    
    This closes #8792
---
 .../nodes/physical/stream/StreamExecJoin.scala     |  44 +-
 .../runtime/stream/sql/Limit0RemoveITCase.scala    |   6 +-
 .../stream/sql/SemiAntiJoinStreamITCase.scala      | 541 +++++++++++++++++++++
 .../table/runtime/join/NullAwareJoinHelper.java    |   4 +-
 .../join/stream/AbstractStreamingJoinOperator.java | 259 ++++++++++
 .../runtime/join/stream/StreamingJoinOperator.java | 216 +-------
 .../join/stream/StreamingSemiAntiJoinOperator.java | 199 ++++++++
 7 files changed, 1035 insertions(+), 234 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala
index 015e0ea..a7fb264 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala
@@ -25,14 +25,14 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexNode
 import org.apache.flink.streaming.api.transformations.{StreamTransformation, 
TwoInputTransformation}
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.nodes.common.CommonPhysicalJoin
 import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
 import org.apache.flink.table.plan.util.{JoinUtil, KeySelectorUtil, 
RelExplainUtil}
 import org.apache.flink.table.runtime.join.FlinkJoinType
-import org.apache.flink.table.runtime.join.stream.StreamingJoinOperator
+import org.apache.flink.table.runtime.join.stream.{StreamingJoinOperator, 
StreamingSemiAntiJoinOperator}
 import org.apache.flink.table.runtime.join.stream.state.JoinInputSideSpec
 import org.apache.flink.table.typeutils.BaseRowTypeInfo
 
@@ -162,24 +162,32 @@ class StreamExecJoin(
       leftType.toRowType,
       rightType.toRowType)
 
-    if (joinType == JoinRelType.ANTI || joinType == JoinRelType.SEMI) {
-      throw new TableException("SEMI/ANTI Join is not supported yet.")
-    }
-
-    val leftIsOuter = joinType == JoinRelType.LEFT || joinType == 
JoinRelType.FULL
-    val rightIsOuter = joinType == JoinRelType.RIGHT || joinType == 
JoinRelType.FULL
     val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
 
-    val operator = new StreamingJoinOperator(
-      leftType,
-      rightType,
-      generatedCondition,
-      leftInputSpec,
-      rightInputSpec,
-      leftIsOuter,
-      rightIsOuter,
-      filterNulls,
-      minRetentionTime)
+    val operator = if (joinType == JoinRelType.ANTI || joinType == 
JoinRelType.SEMI) {
+      new StreamingSemiAntiJoinOperator(
+        joinType == JoinRelType.ANTI,
+        leftType,
+        rightType,
+        generatedCondition,
+        leftInputSpec,
+        rightInputSpec,
+        filterNulls,
+        minRetentionTime)
+    } else {
+      val leftIsOuter = joinType == JoinRelType.LEFT || joinType == 
JoinRelType.FULL
+      val rightIsOuter = joinType == JoinRelType.RIGHT || joinType == 
JoinRelType.FULL
+      new StreamingJoinOperator(
+        leftType,
+        rightType,
+        generatedCondition,
+        leftInputSpec,
+        rightInputSpec,
+        leftIsOuter,
+        rightIsOuter,
+        filterNulls,
+        minRetentionTime)
+    }
 
     val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
       leftTransform,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/Limit0RemoveITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/Limit0RemoveITCase.scala
index 6a402d5..6bb8aea 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/Limit0RemoveITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/Limit0RemoveITCase.scala
@@ -122,8 +122,7 @@ class Limit0RemoveITCase extends StreamingTestBase() {
     assertEquals(expected, sink.getAppendResults.sorted)
   }
 
-  @Test(expected = classOf[TableException])
-  // TODO remove exception after translateToPlanInternal is implemented in StreamExecJoin
+  @Test
   def testLimitRemoveWithExists(): Unit = {
     val ds1 = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
     val table1 = ds1.toTable(tEnv, 'a)
@@ -143,8 +142,7 @@ class Limit0RemoveITCase extends StreamingTestBase() {
     assertEquals(0, sink.getRawResults.size)
   }
 
-  @Test(expected = classOf[TableException])
-  // TODO remove exception after translateToPlanInternal is implemented in StreamExecJoin
+  @Test
   def testLimitRemoveWithNotExists(): Unit = {
     val ds1 = env.fromCollection(Seq(1, 2, 3, 4, 5, 6))
     val table1 = ds1.toTable(tEnv, 'a)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/SemiAntiJoinStreamITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/SemiAntiJoinStreamITCase.scala
new file mode 100644
index 0000000..f9407bb
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/SemiAntiJoinStreamITCase.scala
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.table.runtime.utils.{StreamingWithStateTestBase, 
TestData, TestingRetractSink}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.Seq
+
+/**
+ * Integration tests for the streaming SEMI/ANTI join, exercised through IN, EXISTS, NOT IN and
+ * NOT EXISTS subqueries. The state backend mode is injected by the [[Parameterized]] runner.
+ */
+@RunWith(classOf[Parameterized])
+class SemiAntiJoinStreamITCase(state: StateBackendMode)
+  extends StreamingWithStateTestBase(state) {
+
+  /** Registers the shared fixture tables A(a1, a2, a3) and B(b1, b2, b3, b4, b5). */
+  override def before(): Unit = {
+    super.before()
+    val tableA = failingDataSource(TestData.smallTupleData3)
+      .toTable(tEnv, 'a1, 'a2, 'a3)
+    val tableB = failingDataSource(TestData.tupleData5)
+      .toTable(tEnv, 'b1, 'b2, 'b3, 'b4, 'b5)
+    tEnv.registerTable("A", tableA)
+    tEnv.registerTable("B", tableB)
+  }
+
+  val data = List(
+    (1, 1L, 0, "Hallo", 1L),
+    (2, 2L, 1, "Hallo Welt", 2L),
+    (2, 3L, 2, "Hallo Welt wie", 1L),
+    (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
+    (3, 5L, 4, "ABC", 2L),
+    (3, 6L, 5, "BCD", 3L)
+  )
+
+  val data2 = List(
+    (1, 1L, "Hi"),
+    (2, 2L, "Hello"),
+    (3, 2L, "Hello world")
+  )
+
+  // NOTE(review): not referenced by any test in this class.
+  val dataCannotBeJoin = List(
+    (2, 3L, 2, "Hallo Welt wie", 1L),
+    (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
+    (3, 5L, 4, "ABC", 2L),
+    (3, 6L, 5, "BCD", 3L)
+  )
+
+  // Uncorrelated IN subquery with a filter on the subquery side.
+  @Test
+  def testGenericSemiJoin(): Unit = {
+    val ds1 = failingDataSource(data2).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = failingDataSource(data).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    tEnv.registerTable("ds1", ds1)
+    tEnv.registerTable("ds2", ds2)
+    val query = "SELECT a, b, c FROM ds1 WHERE a in (SELECT d from ds2 WHERE d < 3)"
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("1,1,Hi", "2,2,Hello")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // The IN subquery is a GROUP BY aggregate, so only the subquery input retracts.
+  @Test
+  def testSemiJoinWithOneSideRetraction(): Unit = {
+    val leftTable = List(
+      (1, "a"),
+      (2, "b"),
+      (10, "c"),
+      (6, "d"),
+      (8, "e")
+    )
+
+    val rightTable = List(
+      (0, "a"),
+      (1, "a"),
+      (1, "b"),
+      (1, "b"),
+      (1, "c"),
+      (2, "c"),
+      (3, "c"),
+      (4, "c"),
+      (1, "d"),
+      (2, "d"),
+      (3, "d"),
+      (4, "e"),
+      (4, "e")
+    )
+
+    val ds1 = failingDataSource(leftTable).toTable(tEnv, 'a, 'b)
+    val ds2 = failingDataSource(rightTable).toTable(tEnv, 'c, 'd)
+    tEnv.registerTable("ds1", ds1)
+    tEnv.registerTable("ds2", ds2)
+    val query = "SELECT a FROM ds1 WHERE a in (SELECT sum(c) from ds2 GROUP BY d)"
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+    val expected = Seq("1", "2", "10", "6", "8")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // Both the outer query (ds3) and the IN subquery are GROUP BY aggregates.
+  @Test
+  def testSemiJoinWithRetractTwoSidesRetraction(): Unit = {
+
+    val tableData = List(
+      (0, "a"),
+      (1, "a"),
+      (1, "b"),
+      (1, "b"),
+      (1, "c"),
+      (2, "c"),
+      (3, "c"),
+      (4, "c"),
+      (1, "d"),
+      (2, "d"),
+      (3, "d"),
+      (3, "e"),
+      (5, "e")
+    )
+    val ds1 = failingDataSource(tableData).toTable(tEnv, 'a, 'b)
+    val ds2 = failingDataSource(tableData).toTable(tEnv, 'c, 'd)
+    tEnv.registerTable("ds1", ds1)
+    tEnv.registerTable("ds2", ds2)
+    val ds3 = tEnv.sqlQuery("SELECT sum(a) as a FROM ds1 GROUP BY b")
+    tEnv.registerTable("ds3", ds3)
+    val query = "SELECT a FROM ds3 WHERE a in (SELECT sum(c) from ds2 GROUP BY d)"
+    val result = tEnv.sqlQuery(query)
+
+    val sink = new TestingRetractSink
+    result.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("1", "2", "10", "6", "8")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // Correlated NOT EXISTS on a single equality predicate (b = g).
+  @Test
+  def testGenericAntiJoin(): Unit = {
+    val ds1 = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+    val ds2 = failingDataSource(data2).toTable(tEnv, 'f, 'g, 'h)
+    tEnv.registerTable("ds1", ds1)
+    tEnv.registerTable("ds2", ds2)
+    val query = "SELECT c FROM ds1 WHERE NOT EXISTS (SELECT * from ds2 WHERE b = g)"
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+    val expected = Seq("2", "3", "4", "5")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // NOT EXISTS against a GROUP BY aggregate; only a = 11 has no matching per-group sum.
+  @Test
+  def testAntiJoinWithOneSideRetraction(): Unit = {
+    val leftTable = List(
+      (1, "a"),
+      (2, "b"),
+      (10, "c"),
+      (6, "d"),
+      (8, "e"),
+      (11, "f")
+    )
+
+    val rightTable = List(
+      (0, "a"),
+      (1, "a"),
+      (1, "b"),
+      (1, "b"),
+      (1, "c"),
+      (2, "c"),
+      (3, "c"),
+      (4, "c"),
+      (1, "d"),
+      (2, "d"),
+      (3, "d"),
+      (4, "e"),
+      (4, "e")
+    )
+
+    val ds1 = failingDataSource(leftTable).toTable(tEnv, 'a, 'b)
+    val ds2 = failingDataSource(rightTable).toTable(tEnv, 'c, 'd)
+    tEnv.registerTable("ds1", ds1)
+    tEnv.registerTable("ds2", ds2)
+    val ds3 = tEnv.sqlQuery("SELECT SUM(c) as c FROM ds2 GROUP BY d")
+    tEnv.registerTable("ds3", ds3)
+    val query = "SELECT * FROM ds1 WHERE NOT EXISTS (SELECT c from ds3 WHERE a = c)"
+    val result = tEnv.sqlQuery(query)
+
+    val sink = new TestingRetractSink
+    result.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+    val expected = Seq("11,f")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // Both the outer query (ds4) and the NOT EXISTS subquery (ds3) are GROUP BY aggregates.
+  @Test
+  def testAntiJoinWithTwoSidesRetraction(): Unit = {
+    val leftTable = List(
+      (0, "a"),
+      (5, "f"),
+      (-2, "a"),
+      (1, "b"),
+      (1, "b"),
+      (1, "c"),
+      (2, "c"),
+      (3, "c"),
+      (1, "f"),
+      (4, "c"),
+      (1, "d"),
+      (2, "d"),
+      (3, "d"),
+      (4, "e"),
+      (3, "a"),
+      (3, "e"),
+      (2, "f")
+    )
+
+    val rightTable = List(
+      (0, "a"),
+      (1, "a"),
+      (1, "b"),
+      (1, "b"),
+      (1, "c"),
+      (2, "c"),
+      (3, "c"),
+      (4, "c"),
+      (1, "d"),
+      (2, "d"),
+      (3, "d"),
+      (4, "e"),
+      (3, "e")
+    )
+
+    val ds1 = failingDataSource(leftTable).toTable(tEnv, 'a, 'b)
+    val ds2 = failingDataSource(rightTable).toTable(tEnv, 'c, 'd)
+    tEnv.registerTable("ds1", ds1)
+    tEnv.registerTable("ds2", ds2)
+    val ds3 = tEnv.sqlQuery("SELECT SUM(c) as c FROM ds2 GROUP BY d")
+    tEnv.registerTable("ds3", ds3)
+    val ds4 = tEnv.sqlQuery("SELECT SUM(a) as a, b FROM ds1 GROUP BY b")
+    tEnv.registerTable("ds4", ds4)
+    val query = "SELECT * FROM ds4 WHERE NOT EXISTS (SELECT c from ds3 WHERE a = c)"
+    val result = tEnv.sqlQuery(query)
+
+    val sink = new TestingRetractSink
+    result.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+    val expected = Seq("8,f")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // Plain IN subquery over the fixture tables A and B.
+  @Test
+  def testSemiJoin(): Unit = {
+    val query = "SELECT * FROM A WHERE a1 in (SELECT b1 from B)"
+    val result = tEnv.sqlQuery(query)
+
+    val sink = new TestingRetractSink
+    result.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // IN subquery with an additional correlated non-equi predicate (a2 < b2).
+  @Test
+  def testSemiJoinNonEqui(): Unit = {
+    val query = "SELECT * FROM A WHERE a1 in (SELECT b1 from B WHERE a2 < b2)"
+    val result = tEnv.sqlQuery(query)
+
+    val sink = new TestingRetractSink
+    result.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("2,2,Hello", "3,2,Hello world")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // Both inputs are grouped by their join key (a1 / b1), plus a non-equi predicate.
+  @Test
+  def testSemiJoinWithEqualPkNonEqui(): Unit = {
+    val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1"
+    val query2 = "SELECT SUM(b2) AS b2, b1 FROM B group by b1"
+    val query = s"SELECT * FROM ($query1) WHERE a1 in (SELECT b1 from ($query2) WHERE a2 < b2)"
+    val result = tEnv.sqlQuery(query)
+
+    val sink = new TestingRetractSink
+    result.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("2,3", "2,2")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // Left input grouped by its join key, right input not aggregated, plus a non-equi predicate.
+  @Test
+  def testSemiJoinWithRightNotPkNonEqui(): Unit = {
+    val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1"
+    val query = s"SELECT * FROM ($query1) WHERE a1 in (SELECT b1 from B WHERE a2 < b2)"
+    val result = tEnv.sqlQuery(query)
+
+    val sink = new TestingRetractSink
+    result.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("2,2", "2,3")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // Both inputs grouped (by a1 / b1), but the join key (a2 / b2) is not the grouping key.
+  @Test
+  def testSemiJoinWithPkNonEqui(): Unit = {
+    val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1"
+    val query2 = "SELECT SUM(b2) AS b2, b1 FROM B group by b1"
+    val query = s"SELECT * FROM ($query1) WHERE a2 in (SELECT b2 from ($query2) WHERE a1 > b1)"
+    val result = tEnv.sqlQuery(query)
+
+    val sink = new TestingRetractSink
+    result.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    assertEquals(0, sink.getRetractResults.size)
+  }
+
+  // Correlated NOT EXISTS on equality; every row of A has a match in B, so nothing is emitted.
+  @Test
+  def testAntiJoin(): Unit = {
+    val query = "SELECT * FROM A WHERE NOT EXISTS (SELECT b1 from B WHERE a1 = b1)"
+    val result = tEnv.sqlQuery(query)
+
+    val sink = new TestingRetractSink
+    result.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    assertEquals(0, sink.getRetractResults.size)
+  }
+
+  // NOT EXISTS with an equality plus a non-equi predicate (a2 < b2).
+  @Test
+  def testAntiJoinNonEqui(): Unit = {
+    val query = "SELECT * FROM A WHERE NOT EXISTS (SELECT b1 from B WHERE a1 = b1 AND a2 < b2)"
+    val result = tEnv.sqlQuery(query)
+
+    val sink = new TestingRetractSink
+    result.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("1,1,Hi")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // Anti join of two inputs that are both grouped by their join key (a1 / b1).
+  @Test
+  def testAntiJoinWithEqualPkNonEqui(): Unit = {
+    val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1"
+    val query2 = "SELECT SUM(b2) AS b2, b1 FROM B group by b1"
+    val query = s"SELECT * FROM ($query1) WHERE NOT EXISTS (SELECT b1 from ($query2) WHERE a1 = " +
+      s"b1 AND a2 < b2)"
+    val result = tEnv.sqlQuery(query)
+
+    val sink = new TestingRetractSink
+    result.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("1,1")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // Anti join where only the left input is grouped by its join key.
+  @Test
+  def testAntiJoinWithRightNotPkNonEqui(): Unit = {
+    val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1"
+    val query = s"SELECT * FROM ($query1) WHERE NOT EXISTS (SELECT b1 from B WHERE a1 = b1 AND a2" +
+      s" > b2)"
+    val result = tEnv.sqlQuery(query)
+
+    val sink = new TestingRetractSink
+    result.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("2,2", "1,1", "2,3")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // Anti join on a2 = b2 where neither input is grouped by that join key.
+  @Test
+  def testAntiJoinWithPkNonEqui(): Unit = {
+    val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1"
+    val query2 = "SELECT SUM(b2) AS b2, b1 FROM B group by b1"
+    val query = s"SELECT * FROM ($query1) WHERE NOT EXISTS (SELECT b2 from ($query2) WHERE a2 = " +
+      s"b2 AND a1 > b1)"
+    val result = tEnv.sqlQuery(query)
+
+    val sink = new TestingRetractSink
+    result.toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("1,1", "2,3", "2,2")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // Uncorrelated NOT IN: rows of ds1 whose pk is not among ds2's pk values greater than 3.
+  @Test
+  def testStreamNotInWithoutEqual(): Unit = {
+    val data1 = List(
+      (1, 1),
+      (1, 1),
+      (2, 2),
+      (2, 2),
+      (3, 3),
+      (3, 3),
+      (4, 4),
+      (4, 4),
+      (5, 5),
+      (5, 5))
+
+    val data2 = List(
+      (1, 1),
+      (2, 2),
+      (3, 3),
+      (4, 4),
+      (5, 5),
+      (6, 6),
+      (7, 7),
+      (8, 8),
+      (9, 9),
+      (10, 10))
+
+    val ds1 = failingDataSource(data1).toTable(tEnv, 'pk, 'a)
+    val ds2 = failingDataSource(data2).toTable(tEnv, 'pk, 'a)
+    tEnv.registerTable("ds1", ds1)
+    tEnv.registerTable("ds2", ds2)
+
+    // The subquery reads ds2 (registered above): its pk > 3 values are {4..10}.
+    val sql =
+      """
+        |SELECT pk FROM ds1 WHERE pk not in
+        |(SELECT pk FROM ds2 WHERE pk > 3)
+      """.stripMargin
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("1", "1",
+      "2", "2",
+      "3", "3")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // Two EXISTS subqueries; the first is correlated only through a non-equi predicate.
+  @Test
+  def testStreamExistsWithoutEqual(): Unit = {
+    val data1 = List(
+      (10, "ACCOUNTING", "NEW YORK"),
+      (20, "RESEARCH", "DALLAS"),
+      (30, "SALES", "CHICAGO"),
+      (40, "OPERATIONS", "BOSTON"))
+
+    val data2 = List(
+      (7369, "SMITH", 20),
+      (7499, "ALLEN", 30),
+      (7566, "JONES", 20),
+      (7654, "MARTIN", 30))
+
+    val ds1 = failingDataSource(data1).toTable(tEnv, 'deptno, 'dname, 'loc)
+    val ds2 = failingDataSource(data2).toTable(tEnv, 'empno, 'ename, 'deptno)
+    tEnv.registerTable("scott_dept", ds1)
+    tEnv.registerTable("scott_emp", ds2)
+
+    val sql =
+      """
+        |select *
+        |from scott_dept as d
+        |where exists (select 1 from scott_emp where empno > d.deptno)
+        |and exists (select 0 from scott_emp where deptno = d.deptno and ename = 'SMITH')
+      """.stripMargin
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("20,RESEARCH,DALLAS")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
+  // NOT EXISTS correlated only through a non-equi predicate (ds2.pk < ds1.pk).
+  @Test
+  def testStreamNotExistsWithoutEqual(): Unit = {
+    val data1 = List(
+      (1, 1),
+      (1, 1),
+      (2, 2),
+      (2, 2),
+      (3, 3),
+      (3, 3),
+      (4, 4),
+      (4, 4),
+      (5, 5),
+      (5, 5))
+
+    val data2 = List(
+      (5, 5),
+      (6, 6),
+      (7, 7),
+      (8, 8),
+      (9, 9),
+      (10, 10))
+
+    val ds1 = failingDataSource(data1).toTable(tEnv, 'pk, 'a)
+    val ds2 = failingDataSource(data2).toTable(tEnv, 'pk, 'a)
+    tEnv.registerTable("ds1", ds1)
+    tEnv.registerTable("ds2", ds2)
+
+    val sql =
+      """
+        |SELECT pk FROM ds1 WHERE NOT EXISTS
+        |(SELECT 1 FROM ds2 WHERE ds2.pk < ds1.pk)
+      """.stripMargin
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("1", "1",
+      "2", "2",
+      "3", "3",
+      "4", "4",
+      "5", "5")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+}
+
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/NullAwareJoinHelper.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/NullAwareJoinHelper.java
index c0dcb6f..45715de 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/NullAwareJoinHelper.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/NullAwareJoinHelper.java
@@ -25,7 +25,7 @@ import org.apache.commons.lang3.ArrayUtils;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Helper for null aware join.
@@ -33,7 +33,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 public class NullAwareJoinHelper {
 
        public static int[] getNullFilterKeys(boolean[] filterNulls) {
-               checkArgument(filterNulls.length > 0);
+               checkNotNull(filterNulls);
                List<Integer> nullFilterKeyList = new ArrayList<>();
                for (int i = 0; i < filterNulls.length; i++) {
                        if (filterNulls[i]) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/AbstractStreamingJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/AbstractStreamingJoinOperator.java
new file mode 100644
index 0000000..75ef1d7
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/AbstractStreamingJoinOperator.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.stream;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BinaryRow;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.generated.JoinCondition;
+import org.apache.flink.table.runtime.join.NullAwareJoinHelper;
+import org.apache.flink.table.runtime.join.stream.state.JoinInputSideSpec;
+import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateView;
+import 
org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateView;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.IterableIterator;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract base class of the unbounded streaming join operators. It defines the member fields
+ * that can be shared between the different implementations.
+ */
+public abstract class AbstractStreamingJoinOperator extends 
AbstractStreamOperator<BaseRow>
+       implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow> {
+
+       private static final long serialVersionUID = -376944622236540545L;
+
+       protected static final String LEFT_RECORDS_STATE_NAME = "left-records";
+       protected static final String RIGHT_RECORDS_STATE_NAME = 
"right-records";
+
+       private final GeneratedJoinCondition generatedJoinCondition;
+       protected final BaseRowTypeInfo leftType;
+       protected final BaseRowTypeInfo rightType;
+
+       protected final JoinInputSideSpec leftInputSideSpec;
+       protected final JoinInputSideSpec rightInputSideSpec;
+
+       /**
+        * Should filter null keys.
+        */
+       private final int[] nullFilterKeys;
+
+       /**
+        * No keys need to filter null.
+        */
+       private final boolean nullSafe;
+
+       /**
+        * Filter null to all keys.
+        */
+       private final boolean filterAllNulls;
+
+       protected final long minRetentionTime;
+       protected final boolean stateCleaningEnabled;
+
+       protected transient JoinCondition joinCondition;
+       protected transient TimestampedCollector<BaseRow> collector;
+
+       /**
+        * Stores the configuration shared by the streaming join operators.
+        *
+        * @param leftType type information of the left input rows
+        * @param rightType type information of the right input rows
+        * @param generatedJoinCondition generated join condition; it is instantiated in {@link #open()}
+        * @param leftInputSideSpec spec of the left input side
+        * @param rightInputSideSpec spec of the right input side
+        * @param filterNullKeys flags, per join key field, whether a null in that field must
+        *                       prevent a match
+        * @param minRetentionTime minimum idle state retention time; state cleaning is enabled
+        *                         only when it is greater than 1
+        */
+       public AbstractStreamingJoinOperator(
+                       BaseRowTypeInfo leftType,
+                       BaseRowTypeInfo rightType,
+                       GeneratedJoinCondition generatedJoinCondition,
+                       JoinInputSideSpec leftInputSideSpec,
+                       JoinInputSideSpec rightInputSideSpec,
+                       boolean[] filterNullKeys,
+                       long minRetentionTime) {
+               this.leftType = leftType;
+               this.rightType = rightType;
+               this.generatedJoinCondition = generatedJoinCondition;
+               this.leftInputSideSpec = leftInputSideSpec;
+               this.rightInputSideSpec = rightInputSideSpec;
+               this.minRetentionTime = minRetentionTime;
+               // NOTE(review): a retention time of exactly 1 leaves state cleaning disabled;
+               // confirm whether "> 0" was intended.
+               this.stateCleaningEnabled = minRetentionTime > 1;
+               // indices of the key fields whose nulls must be filtered
+               this.nullFilterKeys = NullAwareJoinHelper.getNullFilterKeys(filterNullKeys);
+               this.nullSafe = nullFilterKeys.length == 0;
+               this.filterAllNulls = nullFilterKeys.length == filterNullKeys.length;
+       }
+
+       /**
+        * Instantiates the generated join condition with the user-code class loader, wraps it with
+        * the null-key filter, and creates the timestamped output collector.
+        */
+       @Override
+       public void open() throws Exception {
+               super.open();
+
+               this.joinCondition = new JoinConditionWithNullFilters(
+                       generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader()));
+
+               this.collector = new TimestampedCollector<>(output);
+       }
+
+       // ----------------------------------------------------------------------------------------
+       // Utility Classes
+       // ----------------------------------------------------------------------------------------
+
+       /**
+        * Decorates the generated {@link JoinCondition}. When some join key fields are null-filtered,
+        * a pair is rejected as soon as the operator's current join key holds a null in one of those
+        * fields; otherwise the generated condition decides.
+        */
+       private class JoinConditionWithNullFilters implements JoinCondition {
+
+               final JoinCondition backingJoinCondition;
+
+               private JoinConditionWithNullFilters(JoinCondition backingJoinCondition) {
+                       this.backingJoinCondition = backingJoinCondition;
+               }
+
+               @Override
+               public boolean apply(BaseRow left, BaseRow right) {
+                       if (!nullSafe) { // is not null safe, return false if any null exists
+                               // key is always BinaryRow
+                               BinaryRow joinKey = (BinaryRow) getCurrentKey();
+                               if (filterAllNulls ? joinKey.anyNull() : joinKey.anyNull(nullFilterKeys)) {
+                                       // a null is present in a filtered key field, return false directly
+                                       return false;
+                               }
+                       }
+                       // test condition
+                       return backingJoinCondition.apply(left, right);
+               }
+       }
+
+       /**
+        * The {@link AssociatedRecords} is the records associated to the input 
row. It is a wrapper
+        * of {@code List<OuterRecord>} which provides two helpful methods 
{@link #getRecords()} and
+        * {@link #getOuterRecords()}. See the method Javadoc for more details.
+        */
+       protected static final class AssociatedRecords {
+               private final List<OuterRecord> records;
+
+               private AssociatedRecords(List<OuterRecord> records) {
+                       checkNotNull(records);
+                       this.records = records;
+               }
+
+               public boolean isEmpty() {
+                       return records.isEmpty();
+               }
+
+               public int size() {
+                       return records.size();
+               }
+
+               /**
+                * Gets the iterable of records. This is usually be called when 
the
+                * {@link AssociatedRecords} is from inner side.
+                */
+               public Iterable<BaseRow> getRecords() {
+                       return new RecordsIterable(records);
+               }
+
+               /**
+                * Gets the iterable of {@link OuterRecord} which composites 
record and numOfAssociations.
+                * This is usually be called when the {@link AssociatedRecords} 
is from outer side.
+                */
+               public Iterable<OuterRecord> getOuterRecords() {
+                       return records;
+               }
+
+               /**
+                * Creates an {@link AssociatedRecords} which represents the 
records associated to the
+                * input row.
+                */
+               public static AssociatedRecords of(
+                       BaseRow input,
+                       boolean inputIsLeft,
+                       JoinRecordStateView otherSideStateView,
+                       JoinCondition condition) throws Exception {
+                       List<OuterRecord> associations = new ArrayList<>();
+                       if (otherSideStateView instanceof 
OuterJoinRecordStateView) {
+                               OuterJoinRecordStateView outerStateView = 
(OuterJoinRecordStateView) otherSideStateView;
+                               Iterable<Tuple2<BaseRow, Integer>> records = 
outerStateView.getRecordsAndNumOfAssociations();
+                               for (Tuple2<BaseRow, Integer> record : records) 
{
+                                       boolean matched = inputIsLeft ? 
condition.apply(input, record.f0) : condition.apply(record.f0, input);
+                                       if (matched) {
+                                               associations.add(new 
OuterRecord(record.f0, record.f1));
+                                       }
+                               }
+                       } else {
+                               Iterable<BaseRow> records = 
otherSideStateView.getRecords();
+                               for (BaseRow record : records) {
+                                       boolean matched = inputIsLeft ? 
condition.apply(input, record) : condition.apply(record, input);
+                                       if (matched) {
+                                               // use -1 as the default number 
of associations
+                                               associations.add(new 
OuterRecord(record, -1));
+                                       }
+                               }
+                       }
+                       return new AssociatedRecords(associations);
+               }
+
+       }
+
+       /**
+        * A lazy Iterable which transform {@code List<OuterReocord>} to {@code 
Iterable<BaseRow>}.
+        */
+       private static final class RecordsIterable implements 
IterableIterator<BaseRow> {
+               private final List<OuterRecord> records;
+               private int index = 0;
+
+               private RecordsIterable(List<OuterRecord> records) {
+                       this.records = records;
+               }
+
+               @Override
+               public Iterator<BaseRow> iterator() {
+                       index = 0;
+                       return this;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       return index < records.size();
+               }
+
+               @Override
+               public BaseRow next() {
+                       BaseRow row = records.get(index).record;
+                       index++;
+                       return row;
+               }
+       }
+
+       /**
+        * An {@link OuterRecord} is a composite of record and {@code 
numOfAssociations}. The
+        * {@code numOfAssociations} represents the number of associated 
records in the other side.
+        * It is used when the record is from outer side (e.g. left side in 
LEFT OUTER JOIN).
+        * When the {@code numOfAssociations} is ZERO, we need to send a null 
padding row.
+        * This is useful to avoid recompute the associated numbers every time.
+        *
+        * <p>When the record is from inner side (e.g. right side in LEFT OUTER 
JOIN), the
+        * {@code numOfAssociations} will always be {@code -1}.
+        */
+       protected static final class OuterRecord {
+               public final BaseRow record;
+               public final int numOfAssociations;
+
+               private OuterRecord(BaseRow record, int numOfAssociations) {
+                       this.record = record;
+                       this.numOfAssociations = numOfAssociations;
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingJoinOperator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingJoinOperator.java
index 849eeec..2cc9896 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingJoinOperator.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingJoinOperator.java
@@ -18,74 +18,31 @@
 
 package org.apache.flink.table.runtime.join.stream;
 
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.dataformat.BaseRow;
-import org.apache.flink.table.dataformat.BinaryRow;
 import org.apache.flink.table.dataformat.GenericRow;
 import org.apache.flink.table.dataformat.JoinedRow;
 import org.apache.flink.table.dataformat.util.BaseRowUtil;
 import org.apache.flink.table.generated.GeneratedJoinCondition;
-import org.apache.flink.table.generated.JoinCondition;
-import org.apache.flink.table.runtime.join.NullAwareJoinHelper;
 import org.apache.flink.table.runtime.join.stream.state.JoinInputSideSpec;
 import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateView;
 import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateViews;
 import 
org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateView;
 import 
org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateViews;
 import org.apache.flink.table.typeutils.BaseRowTypeInfo;
-import org.apache.flink.util.IterableIterator;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Streaming unbounded Join operator which supports INNER/LEFT/RIGHT/FULL JOIN.
  */
-public class StreamingJoinOperator extends AbstractStreamOperator<BaseRow>
-       implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow> {
+public class StreamingJoinOperator extends AbstractStreamingJoinOperator {
 
        private static final long serialVersionUID = -376944622236540545L;
 
-       private final BaseRowTypeInfo leftType;
-       private final BaseRowTypeInfo rightType;
-       private final GeneratedJoinCondition generatedJoinCondition;
-
-       private final JoinInputSideSpec leftInputSideSpec;
-       private final JoinInputSideSpec rightInputSideSpec;
-
        // whether left side is outer side, e.g. left is outer but right is not 
when LEFT OUTER JOIN
        private final boolean leftIsOuter;
        // whether right side is outer side, e.g. right is outer but left is 
not when RIGHT OUTER JOIN
        private final boolean rightIsOuter;
 
-       /**
-        * Should filter null keys.
-        */
-       private final int[] nullFilterKeys;
-
-       /**
-        * No keys need to filter null.
-        */
-       private final boolean nullSafe;
-
-       /**
-        * Filter null to all keys.
-        */
-       private final boolean filterAllNulls;
-
-       private final long minRetentionTime;
-       private final boolean stateCleaningEnabled;
-
-       private transient JoinCondition joinCondition;
-       private transient TimestampedCollector<BaseRow> collector;
-
        private transient JoinedRow outRow;
        private transient BaseRow leftNullRow;
        private transient BaseRow rightNullRow;
@@ -105,28 +62,15 @@ public class StreamingJoinOperator extends 
AbstractStreamOperator<BaseRow>
                        boolean rightIsOuter,
                        boolean[] filterNullKeys,
                        long minRetentionTime) {
-               this.leftType = leftType;
-               this.rightType = rightType;
-               this.generatedJoinCondition = generatedJoinCondition;
-               this.leftInputSideSpec = leftInputSideSpec;
-               this.rightInputSideSpec = rightInputSideSpec;
-               this.minRetentionTime = minRetentionTime;
-               this.stateCleaningEnabled = minRetentionTime > 1;
+               super(leftType, rightType, generatedJoinCondition, 
leftInputSideSpec, rightInputSideSpec, filterNullKeys, minRetentionTime);
                this.leftIsOuter = leftIsOuter;
                this.rightIsOuter = rightIsOuter;
-               this.nullFilterKeys = 
NullAwareJoinHelper.getNullFilterKeys(filterNullKeys);
-               this.nullSafe = nullFilterKeys.length == 0;
-               this.filterAllNulls = nullFilterKeys.length == 
filterNullKeys.length;
        }
 
        @Override
        public void open() throws Exception {
                super.open();
 
-               this.joinCondition = new JoinConditionWithNullFilters(
-                       
generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader()));
-
-               this.collector = new TimestampedCollector<>(output);
                this.outRow = new JoinedRow();
                this.leftNullRow = new GenericRow(leftType.getArity());
                this.rightNullRow = new GenericRow(rightType.getArity());
@@ -183,9 +127,9 @@ public class StreamingJoinOperator extends 
AbstractStreamOperator<BaseRow>
         * Process an input element and output incremental joined records, 
retraction messages will
         * be sent in some scenarios.
         *
-        * <p>Following is the pseudocode to describe the core logic of this 
method. The logic of this
-        * method is too complex, so we provide the pseudocode to help 
understand the logic. We should
-        * keep sync the following pseudocode with the real logic of the method.
+        * <p>Following is the pseudo code to describe the core logic of this 
method. The logic of this
+        * method is too complex, so we provide the pseudo code to help 
understand the logic. We should
+        * keep sync the following pseudo code with the real logic of the 
method.
         *
         * <pre>
         * if input record is accumulate
@@ -302,6 +246,7 @@ public class StreamingJoinOperator extends 
AbstractStreamOperator<BaseRow>
                        }
                } else { // input record is retract
                        // state.retract(record)
+                       input.setHeader(BaseRowUtil.ACCUMULATE_MSG);
                        inputSideStateView.retractRecord(input);
                        if (associatedRecords.isEmpty()) { // there is no 
matched rows on the other side
                                if (inputIsOuter) { // input side is outer
@@ -352,153 +297,4 @@ public class StreamingJoinOperator extends 
AbstractStreamOperator<BaseRow>
                }
                collector.collect(outRow);
        }
-
-       // 
----------------------------------------------------------------------------------------
-       // Utility Classes
-       // 
----------------------------------------------------------------------------------------
-
-       private class JoinConditionWithNullFilters implements JoinCondition {
-
-               final JoinCondition backingJoinCondition;
-
-               private JoinConditionWithNullFilters(JoinCondition 
backingJoinCondition) {
-                       this.backingJoinCondition = backingJoinCondition;
-               }
-
-               @Override
-               public boolean apply(BaseRow left, BaseRow right) {
-                       if (!nullSafe) { // is not null safe, return false if 
any null exists
-                               // key is always BinaryRow
-                               BinaryRow joinKey = (BinaryRow) getCurrentKey();
-                               if (filterAllNulls ? joinKey.anyNull() : 
joinKey.anyNull(nullFilterKeys)) {
-                                       // find null present, return false 
directly
-                                       return false;
-                               }
-                       }
-                       // test condition
-                       return backingJoinCondition.apply(left, right);
-               }
-       }
-
-       /**
-        * The {@link AssociatedRecords} is the records associated to the input 
row. It is a wrapper
-        * of {@code List<OuterRecord>} which provides two helpful methods 
{@link #getRecords()} and
-        * {@link #getOuterRecords()}. See the method Javadoc for more details.
-        */
-       private static final class AssociatedRecords {
-               private final List<OuterRecord> records;
-
-               private AssociatedRecords(List<OuterRecord> records) {
-                       checkNotNull(records);
-                       this.records = records;
-               }
-
-               public boolean isEmpty() {
-                       return records.isEmpty();
-               }
-
-               public int size() {
-                       return records.size();
-               }
-
-               /**
-                * Gets the iterable of records. This is usually be called when 
the
-                * {@link AssociatedRecords} is from inner side.
-                */
-               public Iterable<BaseRow> getRecords() {
-                       return new RecordsIterable(records);
-               }
-
-               /**
-                * Gets the iterable of {@link OuterRecord} which composites 
record and numOfAssociations.
-                * This is usually be called when the {@link AssociatedRecords} 
is from outer side.
-                */
-               public Iterable<OuterRecord> getOuterRecords() {
-                       return records;
-               }
-
-               /**
-                * Creates an {@link AssociatedRecords} which represents the 
records associated to the
-                * input row.
-                */
-               public static AssociatedRecords of(
-                               BaseRow input,
-                               boolean inputIsLeft,
-                               JoinRecordStateView otherSideStateView,
-                               JoinCondition condition) throws Exception {
-                       List<OuterRecord> associations = new ArrayList<>();
-                       if (otherSideStateView instanceof 
OuterJoinRecordStateView) {
-                               OuterJoinRecordStateView outerStateView = 
(OuterJoinRecordStateView) otherSideStateView;
-                               Iterable<Tuple2<BaseRow, Integer>> records = 
outerStateView.getRecordsAndNumOfAssociations();
-                               for (Tuple2<BaseRow, Integer> record : records) 
{
-                                       boolean matched = inputIsLeft ? 
condition.apply(input, record.f0) : condition.apply(record.f0, input);
-                                       if (matched) {
-                                               associations.add(new 
OuterRecord(record.f0, record.f1));
-                                       }
-                               }
-                       } else {
-                               Iterable<BaseRow> records = 
otherSideStateView.getRecords();
-                               for (BaseRow record : records) {
-                                       boolean matched = inputIsLeft ? 
condition.apply(input, record) : condition.apply(record, input);
-                                       if (matched) {
-                                               // use -1 as the default number 
of associations
-                                               associations.add(new 
OuterRecord(record, -1));
-                                       }
-                               }
-                       }
-                       return new AssociatedRecords(associations);
-               }
-
-       }
-
-       /**
-        * A lazy Iterable which transform {@code List<OuterReocord>} to {@code 
Iterable<BaseRow>}.
-        */
-       private static final class RecordsIterable implements 
IterableIterator<BaseRow> {
-               private final List<OuterRecord> records;
-               private int index = 0;
-
-               private RecordsIterable(List<OuterRecord> records) {
-                       this.records = records;
-               }
-
-               @Override
-               public Iterator<BaseRow> iterator() {
-                       index = 0;
-                       return this;
-               }
-
-               @Override
-               public boolean hasNext() {
-                       return index < records.size();
-               }
-
-               @Override
-               public BaseRow next() {
-                       BaseRow row = records.get(index).record;
-                       index++;
-                       return row;
-               }
-       }
-
-       /**
-        * An {@link OuterRecord} is a composite of record and {@code 
numOfAssociations}. The
-        * {@code numOfAssociations} represents the number of associated 
records in the other side.
-        * It is used when the record is from outer side (e.g. left side in 
LEFT OUTER JOIN).
-        * When the {@code numOfAssociations} is ZERO, we need to send a null 
padding row.
-        * This is useful to avoid recompute the associated numbers every time.
-        *
-        * <p>When the record is from inner side (e.g. right side in LEFT OUTER 
JOIN), the
-        * {@code numOfAssociations} will always be {@code -1}.
-        */
-       private static final class OuterRecord {
-               private final BaseRow record;
-               private final int numOfAssociations;
-
-               private OuterRecord(BaseRow record, int numOfAssociations) {
-                       this.record = record;
-                       this.numOfAssociations = numOfAssociations;
-               }
-       }
-
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingSemiAntiJoinOperator.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingSemiAntiJoinOperator.java
new file mode 100644
index 0000000..6a8a278
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingSemiAntiJoinOperator.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.stream;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.join.stream.state.JoinInputSideSpec;
+import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateView;
+import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateViews;
+import 
org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateView;
+import 
org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateViews;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+/**
+ * Streaming unbounded Join operator which supports SEMI/ANTI JOIN.
+ */
+public class StreamingSemiAntiJoinOperator extends 
AbstractStreamingJoinOperator {
+
+       private static final long serialVersionUID = -3135772379944924519L;
+
+       // true if it is anti join, otherwise is semi joinp
+       private final boolean isAntiJoin;
+
+       // left join state
+       private transient OuterJoinRecordStateView leftRecordStateView;
+       // right join state
+       private transient JoinRecordStateView rightRecordStateView;
+
+       public StreamingSemiAntiJoinOperator(
+                       boolean isAntiJoin,
+                       BaseRowTypeInfo leftType,
+                       BaseRowTypeInfo rightType,
+                       GeneratedJoinCondition generatedJoinCondition,
+                       JoinInputSideSpec leftInputSideSpec,
+                       JoinInputSideSpec rightInputSideSpec,
+                       boolean[] filterNullKeys,
+                       long minRetentionTime) {
+               super(leftType, rightType, generatedJoinCondition, 
leftInputSideSpec, rightInputSideSpec, filterNullKeys, minRetentionTime);
+               this.isAntiJoin = isAntiJoin;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+
+               this.leftRecordStateView = OuterJoinRecordStateViews.create(
+                       getRuntimeContext(),
+                       LEFT_RECORDS_STATE_NAME,
+                       leftInputSideSpec,
+                       leftType,
+                       minRetentionTime,
+                       stateCleaningEnabled);
+
+               this.rightRecordStateView = JoinRecordStateViews.create(
+                       getRuntimeContext(),
+                       RIGHT_RECORDS_STATE_NAME,
+                       rightInputSideSpec,
+                       rightType,
+                       minRetentionTime,
+                       stateCleaningEnabled);
+       }
+
+       /**
+        * Process an input element and output incremental joined records, 
retraction messages will
+        * be sent in some scenarios.
+        *
+        * <p>Following is the pseudo code to describe the core logic of this 
method.
+        *
+        * <pre>
+        * if there is no matched rows on the other side
+        *   if anti join, send input record
+        * if there are matched rows on the other side
+        *   if semi join, send input record
+        * if the input record is accumulate, state.add(record, matched size)
+        * if the input record is retract, state.retract(record)
+        * </pre>
+        */
+       @Override
+       public void processElement1(StreamRecord<BaseRow> element) throws 
Exception {
+               BaseRow input = element.getValue();
+               AssociatedRecords associatedRecords = 
AssociatedRecords.of(input, true, rightRecordStateView, joinCondition);
+               if (associatedRecords.isEmpty()) {
+                       if (isAntiJoin) {
+                               collector.collect(input);
+                       }
+               } else { // there are matched rows on the other side
+                       if (!isAntiJoin) {
+                               collector.collect(input);
+                       }
+               }
+               if (BaseRowUtil.isAccumulateMsg(input)) {
+                       leftRecordStateView.addRecord(input, 
associatedRecords.size());
+               } else { // input is retract
+                       input.setHeader(BaseRowUtil.ACCUMULATE_MSG);
+                       leftRecordStateView.retractRecord(input);
+               }
+       }
+
+       /**
+        * Process an input element and output incremental joined records, 
retraction messages will
+        * be sent in some scenarios.
+        *
+        * <p>Following is the pseudo code to describe the core logic of this 
method.
+        *
+        * <pre>
+        * if input record is accumulate
+        * | state.add(record)
+        * | if there is no matched rows on the other side, skip
+        * | if there are matched rows on the other side
+        * | | if the matched num in the matched rows == 0
+        * | |   if anti join, send -[other]s
+        * | |   if semi join, send +[other]s
+        * | | if the matched num in the matched rows > 0, skip
+        * | | otherState.update(other, old+1)
+        * | endif
+        * endif
+        * if input record is retract
+        * | state.retract(record)
+        * | if there is no matched rows on the other side, skip
+        * | if there are matched rows on the other side
+        * | | if the matched num in the matched rows == 0, this should never 
happen!
+        * | | if the matched num in the matched rows == 1
+        * | |   if semi join, send -[other]
+        * | |   if anti join, send +[other]
+        * | | if the matched num in the matched rows > 1, skip
+        * | | otherState.update(other, old-1)
+        * | endif
+        * endif
+        * </pre>
+        */
+       @Override
+       public void processElement2(StreamRecord<BaseRow> element) throws 
Exception {
+               BaseRow input = element.getValue();
+               AssociatedRecords associatedRecords = 
AssociatedRecords.of(input, false, leftRecordStateView, joinCondition);
+               if (BaseRowUtil.isAccumulateMsg(input)) {
+                       rightRecordStateView.addRecord(input);
+                       if (!associatedRecords.isEmpty()) {
+                               // there are matched rows on the other side
+                               for (OuterRecord outerRecord : 
associatedRecords.getOuterRecords()) {
+                                       BaseRow other = outerRecord.record;
+                                       if (outerRecord.numOfAssociations == 0) 
{
+                                               if (isAntiJoin) {
+                                                       // send -[other]
+                                                       
other.setHeader(BaseRowUtil.RETRACT_MSG);
+                                                       
collector.collect(other);
+                                                       // set header back to 
ACCUMULATE_MSG, because we will update the other row to state
+                                                       
other.setHeader(BaseRowUtil.ACCUMULATE_MSG);
+                                               } else {
+                                                       // send +[other]
+                                                       // the header of other 
is ACCUMULATE_MSG
+                                                       
collector.collect(other);
+                                               }
+                                       } // ignore when number > 0
+                                       
leftRecordStateView.updateNumOfAssociations(other, 
outerRecord.numOfAssociations + 1);
+                               }
+                       } // ignore when associated number == 0
+               } else { // retract input
+                       input.setHeader(BaseRowUtil.ACCUMULATE_MSG);
+                       rightRecordStateView.retractRecord(input);
+                       if (!associatedRecords.isEmpty()) {
+                               // there are matched rows on the other side
+                               for (OuterRecord outerRecord : 
associatedRecords.getOuterRecords()) {
+                                       BaseRow other = outerRecord.record;
+                                       if (outerRecord.numOfAssociations == 1) 
{
+                                               if (!isAntiJoin) {
+                                                       // send -[other]
+                                                       
other.setHeader(BaseRowUtil.RETRACT_MSG);
+                                                       
collector.collect(other);
+                                                       // set header back to 
ACCUMULATE_MSG, because we will update the other row to state
+                                                       
other.setHeader(BaseRowUtil.ACCUMULATE_MSG);
+                                               } else {
+                                                       // send +[other]
+                                                       
collector.collect(other);
+                                               }
+                                       } // ignore when number > 0
+                                       
leftRecordStateView.updateNumOfAssociations(other, 
outerRecord.numOfAssociations - 1);
+                               }
+                       } // ignore when associated number == 0
+               }
+       }
+}

Reply via email to