This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 16818da [FLINK-12743][table-runtime-blink] Introduce unbounded streaming anti/semi join operator 16818da is described below commit 16818dad7e04fac8f862771a23a702f72e605e72 Author: Jark Wu <imj...@gmail.com> AuthorDate: Wed Jun 19 20:04:08 2019 +0800 [FLINK-12743][table-runtime-blink] Introduce unbounded streaming anti/semi join operator This closes #8792 --- .../nodes/physical/stream/StreamExecJoin.scala | 44 +- .../runtime/stream/sql/Limit0RemoveITCase.scala | 6 +- .../stream/sql/SemiAntiJoinStreamITCase.scala | 541 +++++++++++++++++++++ .../table/runtime/join/NullAwareJoinHelper.java | 4 +- .../join/stream/AbstractStreamingJoinOperator.java | 259 ++++++++++ .../runtime/join/stream/StreamingJoinOperator.java | 216 +------- .../join/stream/StreamingSemiAntiJoinOperator.java | 199 ++++++++ 7 files changed, 1035 insertions(+), 234 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala index 015e0ea..a7fb264 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoin.scala @@ -25,14 +25,14 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rex.RexNode import org.apache.flink.streaming.api.transformations.{StreamTransformation, TwoInputTransformation} -import org.apache.flink.table.api.{StreamTableEnvironment, TableException} +import org.apache.flink.table.api.StreamTableEnvironment import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.dataformat.BaseRow import 
org.apache.flink.table.plan.nodes.common.CommonPhysicalJoin import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode} import org.apache.flink.table.plan.util.{JoinUtil, KeySelectorUtil, RelExplainUtil} import org.apache.flink.table.runtime.join.FlinkJoinType -import org.apache.flink.table.runtime.join.stream.StreamingJoinOperator +import org.apache.flink.table.runtime.join.stream.{StreamingJoinOperator, StreamingSemiAntiJoinOperator} import org.apache.flink.table.runtime.join.stream.state.JoinInputSideSpec import org.apache.flink.table.typeutils.BaseRowTypeInfo @@ -162,24 +162,32 @@ class StreamExecJoin( leftType.toRowType, rightType.toRowType) - if (joinType == JoinRelType.ANTI || joinType == JoinRelType.SEMI) { - throw new TableException("SEMI/ANTI Join is not supported yet.") - } - - val leftIsOuter = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL - val rightIsOuter = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL val minRetentionTime = tableConfig.getMinIdleStateRetentionTime - val operator = new StreamingJoinOperator( - leftType, - rightType, - generatedCondition, - leftInputSpec, - rightInputSpec, - leftIsOuter, - rightIsOuter, - filterNulls, - minRetentionTime) + val operator = if (joinType == JoinRelType.ANTI || joinType == JoinRelType.SEMI) { + new StreamingSemiAntiJoinOperator( + joinType == JoinRelType.ANTI, + leftType, + rightType, + generatedCondition, + leftInputSpec, + rightInputSpec, + filterNulls, + minRetentionTime) + } else { + val leftIsOuter = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL + val rightIsOuter = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL + new StreamingJoinOperator( + leftType, + rightType, + generatedCondition, + leftInputSpec, + rightInputSpec, + leftIsOuter, + rightIsOuter, + filterNulls, + minRetentionTime) + } val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow]( leftTransform, diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/Limit0RemoveITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/Limit0RemoveITCase.scala index 6a402d5..6bb8aea 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/Limit0RemoveITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/Limit0RemoveITCase.scala @@ -122,8 +122,7 @@ class Limit0RemoveITCase extends StreamingTestBase() { assertEquals(expected, sink.getAppendResults.sorted) } - @Test(expected = classOf[TableException]) - // TODO remove exception after translateToPlanInternal is implemented in StreamExecJoin + @Test def testLimitRemoveWithExists(): Unit = { val ds1 = env.fromCollection(Seq(1, 2, 3, 4, 5, 6)) val table1 = ds1.toTable(tEnv, 'a) @@ -143,8 +142,7 @@ class Limit0RemoveITCase extends StreamingTestBase() { assertEquals(0, sink.getRawResults.size) } - @Test(expected = classOf[TableException]) - // TODO remove exception after translateToPlanInternal is implemented in StreamExecJoin + @Test def testLimitRemoveWithNotExists(): Unit = { val ds1 = env.fromCollection(Seq(1, 2, 3, 4, 5, 6)) val table1 = ds1.toTable(tEnv, 'a) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/SemiAntiJoinStreamITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/SemiAntiJoinStreamITCase.scala new file mode 100644 index 0000000..f9407bb --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/SemiAntiJoinStreamITCase.scala @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode +import org.apache.flink.table.runtime.utils.{StreamingWithStateTestBase, TestData, TestingRetractSink} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.Seq + +@RunWith(classOf[Parameterized]) +class SemiAntiJoinStreamITCase(state: StateBackendMode) + extends StreamingWithStateTestBase(state) { + + override def before(): Unit = { + super.before() + val tableA = failingDataSource(TestData.smallTupleData3) + .toTable(tEnv, 'a1, 'a2, 'a3) + val tableB = failingDataSource(TestData.tupleData5) + .toTable(tEnv, 'b1, 'b2, 'b3, 'b4, 'b5) + tEnv.registerTable("A", tableA) + tEnv.registerTable("B", tableB) + } + + val data = List( + (1, 1L, 0, "Hallo", 1L), + (2, 2L, 1, "Hallo Welt", 2L), + (2, 3L, 2, "Hallo Welt wie", 1L), + (3, 4L, 3, "Hallo Welt wie gehts?", 2L), + (3, 5L, 4, "ABC", 2L), + (3, 6L, 5, "BCD", 3L) + ) + + val data2 = List( + (1, 1L, "Hi"), + (2, 2L, "Hello"), + (3, 2L, "Hello world") + ) + + val dataCannotBeJoin = 
List( + (2, 3L, 2, "Hallo Welt wie", 1L), + (3, 4L, 3, "Hallo Welt wie gehts?", 2L), + (3, 5L, 4, "ABC", 2L), + (3, 6L, 5, "BCD", 3L) + ) + + @Test + def testGenericSemiJoin(): Unit = { + val ds1 = failingDataSource(data2).toTable(tEnv, 'a, 'b, 'c) + val ds2 = failingDataSource(data).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + tEnv.registerTable("ds1", ds1) + tEnv.registerTable("ds2", ds2) + val query = "SELECT a, b, c FROM ds1 WHERE a in (SELECT d from ds2 WHERE d < 3)" + + val sink = new TestingRetractSink + tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = Seq("1,1,Hi", "2,2,Hello") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + @Test + def testSemiJoinWithOneSideRetraction(): Unit = { + val leftTable = List( + (1, "a"), + (2, "b"), + (10, "c"), + (6, "d"), + (8, "e") + ) + + val rightTable = List( + (0, "a"), + (1, "a"), + (1, "b"), + (1, "b"), + (1, "c"), + (2, "c"), + (3, "c"), + (4, "c"), + (1, "d"), + (2, "d"), + (3, "d"), + (4, "e"), + (4, "e") + ) + + val ds1 = failingDataSource(leftTable).toTable(tEnv, 'a, 'b) + val ds2 = failingDataSource(rightTable).toTable(tEnv, 'c, 'd) + tEnv.registerTable("ds1", ds1) + tEnv.registerTable("ds2", ds2) + val query = "SELECT a FROM ds1 WHERE a in (SELECT sum(c) from ds2 GROUP BY d)" + + val sink = new TestingRetractSink + tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + val expected = Seq("1", "2", "10", "6", "8") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + @Test + def testSemiJoinWithRetractTwoSidesRetraction(): Unit = { + + val tableData = List( + (0, "a"), + (1, "a"), + (1, "b"), + (1, "b"), + (1, "c"), + (2, "c"), + (3, "c"), + (4, "c"), + (1, "d"), + (2, "d"), + (3, "d"), + (3, "e"), + (5, "e") + ) + val ds1 = failingDataSource(tableData).toTable(tEnv, 'a, 'b) + val ds2 = failingDataSource(tableData).toTable(tEnv, 'c, 'd) + tEnv.registerTable("ds1", ds1) + 
tEnv.registerTable("ds2", ds2) + val ds3 = tEnv.sqlQuery("SELECT sum(a) as a FROM ds1 GROUP BY b") + tEnv.registerTable("ds3", ds3) + val query = "SELECT a FROM ds3 WHERE a in (SELECT sum(c) from ds2 GROUP BY d)" + val result = tEnv.sqlQuery(query) + + val sink = new TestingRetractSink + result.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = Seq("1", "2", "10", "6", "8") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + @Test + def testGenericAntiJoin(): Unit = { + val ds1 = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'd, 'e) + val ds2 = failingDataSource(data2).toTable(tEnv, 'f, 'g, 'h) + tEnv.registerTable("ds1", ds1) + tEnv.registerTable("ds2", ds2) + val query = "SELECT c FROM ds1 WHERE NOT EXISTS (SELECT * from ds2 WHERE b = g)" + + val sink = new TestingRetractSink + tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + val expected = Seq("2", "3", "4", "5") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + @Test + def testAntiJoinWithOneSideRetraction(): Unit = { + val leftTable = List( + (1, "a"), + (2, "b"), + (10, "c"), + (6, "d"), + (8, "e"), + (11, "f") + ) + + val rightTable = List( + (0, "a"), + (1, "a"), + (1, "b"), + (1, "b"), + (1, "c"), + (2, "c"), + (3, "c"), + (4, "c"), + (1, "d"), + (2, "d"), + (3, "d"), + (4, "e"), + (4, "e") + ) + + val ds1 = failingDataSource(leftTable).toTable(tEnv, 'a, 'b) + val ds2 = failingDataSource(rightTable).toTable(tEnv, 'c, 'd) + tEnv.registerTable("ds1", ds1) + tEnv.registerTable("ds2", ds2) + val ds3 = tEnv.sqlQuery("SELECT SUM(c) as c FROM ds2 GROUP BY d") + tEnv.registerTable("ds3", ds3) + val query = "SELECT * FROM ds1 WHERE NOT EXISTS (SELECT c from ds3 WHERE a = c)" + val result = tEnv.sqlQuery(query) + + val sink = new TestingRetractSink + result.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + val expected = Seq("11,f") + assertEquals(expected.sorted, 
sink.getRetractResults.sorted) + } + + @Test + def testAntiJoinWithTwoSidesRetraction(): Unit = { + val leftTable = List( + (0, "a"), + (5, "f"), + (-2, "a"), + (1, "b"), + (1, "b"), + (1, "c"), + (2, "c"), + (3, "c"), + (1, "f"), + (4, "c"), + (1, "d"), + (2, "d"), + (3, "d"), + (4, "e"), + (3, "a"), + (3, "e"), + (2, "f") + ) + + val rightTable = List( + (0, "a"), + (1, "a"), + (1, "b"), + (1, "b"), + (1, "c"), + (2, "c"), + (3, "c"), + (4, "c"), + (1, "d"), + (2, "d"), + (3, "d"), + (4, "e"), + (3, "e") + ) + + val ds1 = failingDataSource(leftTable).toTable(tEnv, 'a, 'b) + val ds2 = failingDataSource(rightTable).toTable(tEnv, 'c, 'd) + tEnv.registerTable("ds1", ds1) + tEnv.registerTable("ds2", ds2) + val ds3 = tEnv.sqlQuery("SELECT SUM(c) as c FROM ds2 GROUP BY d") + tEnv.registerTable("ds3", ds3) + val ds4 = tEnv.sqlQuery("SELECT SUM(a) as a, b FROM ds1 GROUP BY b") + tEnv.registerTable("ds4", ds4) + val query = "SELECT * FROM ds4 WHERE NOT EXISTS (SELECT c from ds3 WHERE a = c)" + val result = tEnv.sqlQuery(query) + + val sink = new TestingRetractSink + result.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + val expected = Seq("8,f") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + @Test + def testSemiJoin(): Unit = { + val query = "SELECT * FROM A WHERE a1 in (SELECT b1 from B)" + val result = tEnv.sqlQuery(query) + + val sink = new TestingRetractSink + result.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + @Test + def testSemiJoinNonEqui(): Unit = { + val query = "SELECT * FROM A WHERE a1 in (SELECT b1 from B WHERE a2 < b2)" + val result = tEnv.sqlQuery(query) + + val sink = new TestingRetractSink + result.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = Seq("2,2,Hello", "3,2,Hello world") + assertEquals(expected.sorted, 
sink.getRetractResults.sorted) + } + + @Test + def testSemiJoinWithEqualPkNonEqui(): Unit = { + val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1" + val query2 = "SELECT SUM(b2) AS b2, b1 FROM B group by b1" + val query = s"SELECT * FROM ($query1) WHERE a1 in (SELECT b1 from ($query2) WHERE a2 < b2)" + val result = tEnv.sqlQuery(query) + + val sink = new TestingRetractSink + result.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = Seq("2,3", "2,2") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + @Test + def testSemiJoinWithRightNotPkNonEqui(): Unit = { + val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1" + val query = s"SELECT * FROM ($query1) WHERE a1 in (SELECT b1 from B WHERE a2 < b2)" + val result = tEnv.sqlQuery(query) + + val sink = new TestingRetractSink + result.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = Seq("2,2", "2,3") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + @Test + def testSemiJoinWithPkNonEqui(): Unit = { + val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1" + val query2 = "SELECT SUM(b2) AS b2, b1 FROM B group by b1" + val query = s"SELECT * FROM ($query1) WHERE a2 in (SELECT b2 from ($query2) WHERE a1 > b1)" + val result = tEnv.sqlQuery(query) + + val sink = new TestingRetractSink + result.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + assertEquals(0, sink.getRetractResults.size) + } + + @Test + def testAntiJoin(): Unit = { + val query = "SELECT * FROM A WHERE NOT EXISTS (SELECT b1 from B WHERE a1 = b1)" + val result = tEnv.sqlQuery(query) + + val sink = new TestingRetractSink + result.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + assertEquals(0, sink.getRetractResults.size) + } + + @Test + def testAntiJoinNonEqui(): Unit = { + val query = "SELECT * FROM A WHERE NOT EXISTS (SELECT b1 from B WHERE a1 = b1 AND a2 < b2)" + val result = 
tEnv.sqlQuery(query) + + val sink = new TestingRetractSink + result.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = Seq("1,1,Hi") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + @Test + def testAntiJoinWithEqualPkNonEqui(): Unit = { + val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1" + val query2 = "SELECT SUM(b2) AS b2, b1 FROM B group by b1" + val query = s"SELECT * FROM ($query1) WHERE NOT EXISTS (SELECT b1 from ($query2) WHERE a1 = " + + s"b1 AND a2 < b2)" + val result = tEnv.sqlQuery(query) + + val sink = new TestingRetractSink + result.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = Seq("1,1") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + @Test + def testAntiJoinWithRightNotPkNonEqui(): Unit = { + val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1" + val query = s"SELECT * FROM ($query1) WHERE NOT EXISTS (SELECT b1 from B WHERE a1 = b1 AND a2" + + s" > b2)" + val result = tEnv.sqlQuery(query) + + val sink = new TestingRetractSink + result.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = Seq("2,2", "1,1", "2,3") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + @Test + def testAntiJoinWithPkNonEqui(): Unit = { + val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1" + val query2 = "SELECT SUM(b2) AS b2, b1 FROM B group by b1" + val query = s"SELECT * FROM ($query1) WHERE NOT EXISTS (SELECT b2 from ($query2) WHERE a2 = " + + s"b2 AND a1 > b1)" + val result = tEnv.sqlQuery(query) + + val sink = new TestingRetractSink + result.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = Seq("1,1", "2,3", "2,2") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + @Test + def testStreamNotInWithoutEqual(): Unit = { + val data1 = List( + (1, 1), + (1, 1), + (2, 2), + (2, 2), + (3, 3), + (3, 3), + (4, 4), + (4, 4), + 
(5, 5), + (5, 5)) + + val data2 = List( + (1, 1), + (2, 2), + (3, 3), + (4, 4), + (5, 5), + (6, 6), + (7, 7), + (8, 8), + (9, 9), + (10, 10)) + + val ds1 = failingDataSource(data1).toTable(tEnv, 'pk, 'a) + val ds2 = failingDataSource(data2).toTable(tEnv, 'pk, 'a) + tEnv.registerTable("ds1", ds1) + tEnv.registerTable("ds2", ds2) + + val sql = + """ + |SELECT pk FROM ds1 WHERE pk not in + |(SELECT pk FROM ds1 WHERE pk > 3) + """.stripMargin + + val sink = new TestingRetractSink + tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = Seq("1", "1", + "2", "2", + "3", "3") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + @Test + def testStreamExistsWithoutEqual(): Unit = { + val data1 = List( + (10, "ACCOUNTING", "NEW YORK"), + (20, "RESEARCH", "DALLAS"), + (30, "SALES", "CHICAGO"), + (40, "OPERATIONS", "BOSTON")) + + val data2 = List( + (7369, "SMITH", 20), + (7499, "ALLEN", 30), + (7566, "JONES", 20), + (7654, "MARTIN", 30)) + + val ds1 = failingDataSource(data1).toTable(tEnv, 'deptno, 'dname, 'loc) + val ds2 = failingDataSource(data2).toTable(tEnv, 'empno, 'ename, 'deptno) + tEnv.registerTable("scott_dept", ds1) + tEnv.registerTable("scott_emp", ds2) + + val sql = + """ + |select * + |from scott_dept as d + |where exists (select 1 from scott_emp where empno > d.deptno) + |and exists (select 0 from scott_emp where deptno = d.deptno and ename = 'SMITH') + """.stripMargin + + val sink = new TestingRetractSink + tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = Seq("20,RESEARCH,DALLAS") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } + + @Test + def testStreamNotExistsWithoutEqual(): Unit = { + val data1 = List( + (1, 1), + (1, 1), + (2, 2), + (2, 2), + (3, 3), + (3, 3), + (4, 4), + (4, 4), + (5, 5), + (5, 5)) + + val data2 = List( + (5, 5), + (6, 6), + (7, 7), + (8, 8), + (9, 9), + (10, 10)) + + val ds1 = 
failingDataSource(data1).toTable(tEnv, 'pk, 'a) + val ds2 = failingDataSource(data2).toTable(tEnv, 'pk, 'a) + tEnv.registerTable("ds1", ds1) + tEnv.registerTable("ds2", ds2) + + val sql = + """ + |SELECT pk FROM ds1 WHERE NOT EXISTS + |(SELECT 1 FROM ds2 WHERE ds2.pk < ds1.pk) + """.stripMargin + + val sink = new TestingRetractSink + tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = Seq("1", "1", + "2", "2", + "3", "3", + "4", "4", + "5", "5") + assertEquals(expected.sorted, sink.getRetractResults.sorted) + } +} + diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/NullAwareJoinHelper.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/NullAwareJoinHelper.java index c0dcb6f..45715de 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/NullAwareJoinHelper.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/NullAwareJoinHelper.java @@ -25,7 +25,7 @@ import org.apache.commons.lang3.ArrayUtils; import java.util.ArrayList; import java.util.List; -import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Helper for null aware join. 
@@ -33,7 +33,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; public class NullAwareJoinHelper { public static int[] getNullFilterKeys(boolean[] filterNulls) { - checkArgument(filterNulls.length > 0); + checkNotNull(filterNulls); List<Integer> nullFilterKeyList = new ArrayList<>(); for (int i = 0; i < filterNulls.length; i++) { if (filterNulls[i]) { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/AbstractStreamingJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/AbstractStreamingJoinOperator.java new file mode 100644 index 0000000..75ef1d7 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/AbstractStreamingJoinOperator.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join.stream; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.generated.GeneratedJoinCondition; +import org.apache.flink.table.generated.JoinCondition; +import org.apache.flink.table.runtime.join.NullAwareJoinHelper; +import org.apache.flink.table.runtime.join.stream.state.JoinInputSideSpec; +import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateView; +import org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateView; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; +import org.apache.flink.util.IterableIterator; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Abstract implementation for streaming unbounded Join operator which defines some member fields + * can be shared between different implementations. + */ +public abstract class AbstractStreamingJoinOperator extends AbstractStreamOperator<BaseRow> + implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow> { + + private static final long serialVersionUID = -376944622236540545L; + + protected static final String LEFT_RECORDS_STATE_NAME = "left-records"; + protected static final String RIGHT_RECORDS_STATE_NAME = "right-records"; + + private final GeneratedJoinCondition generatedJoinCondition; + protected final BaseRowTypeInfo leftType; + protected final BaseRowTypeInfo rightType; + + protected final JoinInputSideSpec leftInputSideSpec; + protected final JoinInputSideSpec rightInputSideSpec; + + /** + * Should filter null keys. 
+ */ + private final int[] nullFilterKeys; + + /** + * No keys need to filter null. + */ + private final boolean nullSafe; + + /** + * Filter null to all keys. + */ + private final boolean filterAllNulls; + + protected final long minRetentionTime; + protected final boolean stateCleaningEnabled; + + protected transient JoinCondition joinCondition; + protected transient TimestampedCollector<BaseRow> collector; + + public AbstractStreamingJoinOperator( + BaseRowTypeInfo leftType, + BaseRowTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + JoinInputSideSpec leftInputSideSpec, + JoinInputSideSpec rightInputSideSpec, + boolean[] filterNullKeys, + long minRetentionTime) { + this.leftType = leftType; + this.rightType = rightType; + this.generatedJoinCondition = generatedJoinCondition; + this.leftInputSideSpec = leftInputSideSpec; + this.rightInputSideSpec = rightInputSideSpec; + this.minRetentionTime = minRetentionTime; + this.stateCleaningEnabled = minRetentionTime > 1; + this.nullFilterKeys = NullAwareJoinHelper.getNullFilterKeys(filterNullKeys); + this.nullSafe = nullFilterKeys.length == 0; + this.filterAllNulls = nullFilterKeys.length == filterNullKeys.length; + } + + @Override + public void open() throws Exception { + super.open(); + + this.joinCondition = new JoinConditionWithNullFilters( + generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader())); + + this.collector = new TimestampedCollector<>(output); + } + + // ---------------------------------------------------------------------------------------- + // Utility Classes + // ---------------------------------------------------------------------------------------- + + private class JoinConditionWithNullFilters implements JoinCondition { + + final JoinCondition backingJoinCondition; + + private JoinConditionWithNullFilters(JoinCondition backingJoinCondition) { + this.backingJoinCondition = backingJoinCondition; + } + + @Override + public boolean apply(BaseRow left, BaseRow 
right) { + if (!nullSafe) { // is not null safe, return false if any null exists + // key is always BinaryRow + BinaryRow joinKey = (BinaryRow) getCurrentKey(); + if (filterAllNulls ? joinKey.anyNull() : joinKey.anyNull(nullFilterKeys)) { + // find null present, return false directly + return false; + } + } + // test condition + return backingJoinCondition.apply(left, right); + } + } + + /** + * The {@link AssociatedRecords} is the records associated to the input row. It is a wrapper + * of {@code List<OuterRecord>} which provides two helpful methods {@link #getRecords()} and + * {@link #getOuterRecords()}. See the method Javadoc for more details. + */ + protected static final class AssociatedRecords { + private final List<OuterRecord> records; + + private AssociatedRecords(List<OuterRecord> records) { + checkNotNull(records); + this.records = records; + } + + public boolean isEmpty() { + return records.isEmpty(); + } + + public int size() { + return records.size(); + } + + /** + * Gets the iterable of records. This is usually be called when the + * {@link AssociatedRecords} is from inner side. + */ + public Iterable<BaseRow> getRecords() { + return new RecordsIterable(records); + } + + /** + * Gets the iterable of {@link OuterRecord} which composites record and numOfAssociations. + * This is usually be called when the {@link AssociatedRecords} is from outer side. + */ + public Iterable<OuterRecord> getOuterRecords() { + return records; + } + + /** + * Creates an {@link AssociatedRecords} which represents the records associated to the + * input row. 
+ */ + public static AssociatedRecords of( + BaseRow input, + boolean inputIsLeft, + JoinRecordStateView otherSideStateView, + JoinCondition condition) throws Exception { + List<OuterRecord> associations = new ArrayList<>(); + if (otherSideStateView instanceof OuterJoinRecordStateView) { + OuterJoinRecordStateView outerStateView = (OuterJoinRecordStateView) otherSideStateView; + Iterable<Tuple2<BaseRow, Integer>> records = outerStateView.getRecordsAndNumOfAssociations(); + for (Tuple2<BaseRow, Integer> record : records) { + boolean matched = inputIsLeft ? condition.apply(input, record.f0) : condition.apply(record.f0, input); + if (matched) { + associations.add(new OuterRecord(record.f0, record.f1)); + } + } + } else { + Iterable<BaseRow> records = otherSideStateView.getRecords(); + for (BaseRow record : records) { + boolean matched = inputIsLeft ? condition.apply(input, record) : condition.apply(record, input); + if (matched) { + // use -1 as the default number of associations + associations.add(new OuterRecord(record, -1)); + } + } + } + return new AssociatedRecords(associations); + } + + } + + /** + * A lazy Iterable which transform {@code List<OuterReocord>} to {@code Iterable<BaseRow>}. + */ + private static final class RecordsIterable implements IterableIterator<BaseRow> { + private final List<OuterRecord> records; + private int index = 0; + + private RecordsIterable(List<OuterRecord> records) { + this.records = records; + } + + @Override + public Iterator<BaseRow> iterator() { + index = 0; + return this; + } + + @Override + public boolean hasNext() { + return index < records.size(); + } + + @Override + public BaseRow next() { + BaseRow row = records.get(index).record; + index++; + return row; + } + } + + /** + * An {@link OuterRecord} is a composite of record and {@code numOfAssociations}. The + * {@code numOfAssociations} represents the number of associated records in the other side. + * It is used when the record is from outer side (e.g. 
left side in LEFT OUTER JOIN). + * When the {@code numOfAssociations} is ZERO, we need to send a null padding row. + * This is useful to avoid recompute the associated numbers every time. + * + * <p>When the record is from inner side (e.g. right side in LEFT OUTER JOIN), the + * {@code numOfAssociations} will always be {@code -1}. + */ + protected static final class OuterRecord { + public final BaseRow record; + public final int numOfAssociations; + + private OuterRecord(BaseRow record, int numOfAssociations) { + this.record = record; + this.numOfAssociations = numOfAssociations; + } + } +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingJoinOperator.java index 849eeec..2cc9896 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingJoinOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingJoinOperator.java @@ -18,74 +18,31 @@ package org.apache.flink.table.runtime.join.stream; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.TimestampedCollector; -import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.dataformat.BaseRow; -import org.apache.flink.table.dataformat.BinaryRow; import org.apache.flink.table.dataformat.GenericRow; import org.apache.flink.table.dataformat.JoinedRow; import org.apache.flink.table.dataformat.util.BaseRowUtil; import org.apache.flink.table.generated.GeneratedJoinCondition; -import org.apache.flink.table.generated.JoinCondition; -import 
org.apache.flink.table.runtime.join.NullAwareJoinHelper; import org.apache.flink.table.runtime.join.stream.state.JoinInputSideSpec; import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateView; import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateViews; import org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateView; import org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateViews; import org.apache.flink.table.typeutils.BaseRowTypeInfo; -import org.apache.flink.util.IterableIterator; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import static org.apache.flink.util.Preconditions.checkNotNull; /** * Streaming unbounded Join operator which supports INNER/LEFT/RIGHT/FULL JOIN. */ -public class StreamingJoinOperator extends AbstractStreamOperator<BaseRow> - implements TwoInputStreamOperator<BaseRow, BaseRow, BaseRow> { +public class StreamingJoinOperator extends AbstractStreamingJoinOperator { private static final long serialVersionUID = -376944622236540545L; - private final BaseRowTypeInfo leftType; - private final BaseRowTypeInfo rightType; - private final GeneratedJoinCondition generatedJoinCondition; - - private final JoinInputSideSpec leftInputSideSpec; - private final JoinInputSideSpec rightInputSideSpec; - // whether left side is outer side, e.g. left is outer but right is not when LEFT OUTER JOIN private final boolean leftIsOuter; // whether right side is outer side, e.g. right is outer but left is not when RIGHT OUTER JOIN private final boolean rightIsOuter; - /** - * Should filter null keys. - */ - private final int[] nullFilterKeys; - - /** - * No keys need to filter null. - */ - private final boolean nullSafe; - - /** - * Filter null to all keys. 
- */ - private final boolean filterAllNulls; - - private final long minRetentionTime; - private final boolean stateCleaningEnabled; - - private transient JoinCondition joinCondition; - private transient TimestampedCollector<BaseRow> collector; - private transient JoinedRow outRow; private transient BaseRow leftNullRow; private transient BaseRow rightNullRow; @@ -105,28 +62,15 @@ public class StreamingJoinOperator extends AbstractStreamOperator<BaseRow> boolean rightIsOuter, boolean[] filterNullKeys, long minRetentionTime) { - this.leftType = leftType; - this.rightType = rightType; - this.generatedJoinCondition = generatedJoinCondition; - this.leftInputSideSpec = leftInputSideSpec; - this.rightInputSideSpec = rightInputSideSpec; - this.minRetentionTime = minRetentionTime; - this.stateCleaningEnabled = minRetentionTime > 1; + super(leftType, rightType, generatedJoinCondition, leftInputSideSpec, rightInputSideSpec, filterNullKeys, minRetentionTime); this.leftIsOuter = leftIsOuter; this.rightIsOuter = rightIsOuter; - this.nullFilterKeys = NullAwareJoinHelper.getNullFilterKeys(filterNullKeys); - this.nullSafe = nullFilterKeys.length == 0; - this.filterAllNulls = nullFilterKeys.length == filterNullKeys.length; } @Override public void open() throws Exception { super.open(); - this.joinCondition = new JoinConditionWithNullFilters( - generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader())); - - this.collector = new TimestampedCollector<>(output); this.outRow = new JoinedRow(); this.leftNullRow = new GenericRow(leftType.getArity()); this.rightNullRow = new GenericRow(rightType.getArity()); @@ -183,9 +127,9 @@ public class StreamingJoinOperator extends AbstractStreamOperator<BaseRow> * Process an input element and output incremental joined records, retraction messages will * be sent in some scenarios. * - * <p>Following is the pseudocode to describe the core logic of this method. 
The logic of this - * method is too complex, so we provide the pseudocode to help understand the logic. We should - * keep sync the following pseudocode with the real logic of the method. + * <p>Following is the pseudo code to describe the core logic of this method. The logic of this + * method is too complex, so we provide the pseudo code to help understand the logic. We should + * keep sync the following pseudo code with the real logic of the method. * * <pre> * if input record is accumulate @@ -302,6 +246,7 @@ public class StreamingJoinOperator extends AbstractStreamOperator<BaseRow> } } else { // input record is retract // state.retract(record) + input.setHeader(BaseRowUtil.ACCUMULATE_MSG); inputSideStateView.retractRecord(input); if (associatedRecords.isEmpty()) { // there is no matched rows on the other side if (inputIsOuter) { // input side is outer @@ -352,153 +297,4 @@ public class StreamingJoinOperator extends AbstractStreamOperator<BaseRow> } collector.collect(outRow); } - - // ---------------------------------------------------------------------------------------- - // Utility Classes - // ---------------------------------------------------------------------------------------- - - private class JoinConditionWithNullFilters implements JoinCondition { - - final JoinCondition backingJoinCondition; - - private JoinConditionWithNullFilters(JoinCondition backingJoinCondition) { - this.backingJoinCondition = backingJoinCondition; - } - - @Override - public boolean apply(BaseRow left, BaseRow right) { - if (!nullSafe) { // is not null safe, return false if any null exists - // key is always BinaryRow - BinaryRow joinKey = (BinaryRow) getCurrentKey(); - if (filterAllNulls ? joinKey.anyNull() : joinKey.anyNull(nullFilterKeys)) { - // find null present, return false directly - return false; - } - } - // test condition - return backingJoinCondition.apply(left, right); - } - } - - /** - * The {@link AssociatedRecords} is the records associated to the input row. 
It is a wrapper - * of {@code List<OuterRecord>} which provides two helpful methods {@link #getRecords()} and - * {@link #getOuterRecords()}. See the method Javadoc for more details. - */ - private static final class AssociatedRecords { - private final List<OuterRecord> records; - - private AssociatedRecords(List<OuterRecord> records) { - checkNotNull(records); - this.records = records; - } - - public boolean isEmpty() { - return records.isEmpty(); - } - - public int size() { - return records.size(); - } - - /** - * Gets the iterable of records. This is usually be called when the - * {@link AssociatedRecords} is from inner side. - */ - public Iterable<BaseRow> getRecords() { - return new RecordsIterable(records); - } - - /** - * Gets the iterable of {@link OuterRecord} which composites record and numOfAssociations. - * This is usually be called when the {@link AssociatedRecords} is from outer side. - */ - public Iterable<OuterRecord> getOuterRecords() { - return records; - } - - /** - * Creates an {@link AssociatedRecords} which represents the records associated to the - * input row. - */ - public static AssociatedRecords of( - BaseRow input, - boolean inputIsLeft, - JoinRecordStateView otherSideStateView, - JoinCondition condition) throws Exception { - List<OuterRecord> associations = new ArrayList<>(); - if (otherSideStateView instanceof OuterJoinRecordStateView) { - OuterJoinRecordStateView outerStateView = (OuterJoinRecordStateView) otherSideStateView; - Iterable<Tuple2<BaseRow, Integer>> records = outerStateView.getRecordsAndNumOfAssociations(); - for (Tuple2<BaseRow, Integer> record : records) { - boolean matched = inputIsLeft ? condition.apply(input, record.f0) : condition.apply(record.f0, input); - if (matched) { - associations.add(new OuterRecord(record.f0, record.f1)); - } - } - } else { - Iterable<BaseRow> records = otherSideStateView.getRecords(); - for (BaseRow record : records) { - boolean matched = inputIsLeft ? 
condition.apply(input, record) : condition.apply(record, input); - if (matched) { - // use -1 as the default number of associations - associations.add(new OuterRecord(record, -1)); - } - } - } - return new AssociatedRecords(associations); - } - - } - - /** - * A lazy Iterable which transform {@code List<OuterReocord>} to {@code Iterable<BaseRow>}. - */ - private static final class RecordsIterable implements IterableIterator<BaseRow> { - private final List<OuterRecord> records; - private int index = 0; - - private RecordsIterable(List<OuterRecord> records) { - this.records = records; - } - - @Override - public Iterator<BaseRow> iterator() { - index = 0; - return this; - } - - @Override - public boolean hasNext() { - return index < records.size(); - } - - @Override - public BaseRow next() { - BaseRow row = records.get(index).record; - index++; - return row; - } - } - - /** - * An {@link OuterRecord} is a composite of record and {@code numOfAssociations}. The - * {@code numOfAssociations} represents the number of associated records in the other side. - * It is used when the record is from outer side (e.g. left side in LEFT OUTER JOIN). - * When the {@code numOfAssociations} is ZERO, we need to send a null padding row. - * This is useful to avoid recompute the associated numbers every time. - * - * <p>When the record is from inner side (e.g. right side in LEFT OUTER JOIN), the - * {@code numOfAssociations} will always be {@code -1}. 
- */ - private static final class OuterRecord { - private final BaseRow record; - private final int numOfAssociations; - - private OuterRecord(BaseRow record, int numOfAssociations) { - this.record = record; - this.numOfAssociations = numOfAssociations; - } - } - } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingSemiAntiJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingSemiAntiJoinOperator.java new file mode 100644 index 0000000..6a8a278 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/stream/StreamingSemiAntiJoinOperator.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.join.stream; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.dataformat.util.BaseRowUtil; +import org.apache.flink.table.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.join.stream.state.JoinInputSideSpec; +import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateView; +import org.apache.flink.table.runtime.join.stream.state.JoinRecordStateViews; +import org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateView; +import org.apache.flink.table.runtime.join.stream.state.OuterJoinRecordStateViews; +import org.apache.flink.table.typeutils.BaseRowTypeInfo; + +/** + * Streaming unbounded Join operator which supports SEMI/ANTI JOIN. + */ +public class StreamingSemiAntiJoinOperator extends AbstractStreamingJoinOperator { + + private static final long serialVersionUID = -3135772379944924519L; + + // true if it is anti join, otherwise it is semi join + private final boolean isAntiJoin; + + // left join state + private transient OuterJoinRecordStateView leftRecordStateView; + // right join state + private transient JoinRecordStateView rightRecordStateView; + + public StreamingSemiAntiJoinOperator( + boolean isAntiJoin, + BaseRowTypeInfo leftType, + BaseRowTypeInfo rightType, + GeneratedJoinCondition generatedJoinCondition, + JoinInputSideSpec leftInputSideSpec, + JoinInputSideSpec rightInputSideSpec, + boolean[] filterNullKeys, + long minRetentionTime) { + super(leftType, rightType, generatedJoinCondition, leftInputSideSpec, rightInputSideSpec, filterNullKeys, minRetentionTime); + this.isAntiJoin = isAntiJoin; + } + + @Override + public void open() throws Exception { + super.open(); + + this.leftRecordStateView = OuterJoinRecordStateViews.create( + getRuntimeContext(), + LEFT_RECORDS_STATE_NAME, + leftInputSideSpec, + leftType, + minRetentionTime, + stateCleaningEnabled); + 
+ this.rightRecordStateView = JoinRecordStateViews.create( + getRuntimeContext(), + RIGHT_RECORDS_STATE_NAME, + rightInputSideSpec, + rightType, + minRetentionTime, + stateCleaningEnabled); + } + + /** + * Process an input element and output incremental joined records, retraction messages will + * be sent in some scenarios. + * + * <p>Following is the pseudo code to describe the core logic of this method. + * + * <pre> + * if there is no matched rows on the other side + * if anti join, send input record + * if there are matched rows on the other side + * if semi join, send input record + * if the input record is accumulate, state.add(record, matched size) + * if the input record is retract, state.retract(record) + * </pre> + */ + @Override + public void processElement1(StreamRecord<BaseRow> element) throws Exception { + BaseRow input = element.getValue(); + AssociatedRecords associatedRecords = AssociatedRecords.of(input, true, rightRecordStateView, joinCondition); + if (associatedRecords.isEmpty()) { + if (isAntiJoin) { + collector.collect(input); + } + } else { // there are matched rows on the other side + if (!isAntiJoin) { + collector.collect(input); + } + } + if (BaseRowUtil.isAccumulateMsg(input)) { + leftRecordStateView.addRecord(input, associatedRecords.size()); + } else { // input is retract + input.setHeader(BaseRowUtil.ACCUMULATE_MSG); + leftRecordStateView.retractRecord(input); + } + } + + /** + * Process an input element and output incremental joined records, retraction messages will + * be sent in some scenarios. + * + * <p>Following is the pseudo code to describe the core logic of this method. 
+ * + * <pre> + * if input record is accumulate + * | state.add(record) + * | if there is no matched rows on the other side, skip + * | if there are matched rows on the other side + * | | if the matched num in the matched rows == 0 + * | | if anti join, send -[other]s + * | | if semi join, send +[other]s + * | | if the matched num in the matched rows > 0, skip + * | | otherState.update(other, old+1) + * | endif + * endif + * if input record is retract + * | state.retract(record) + * | if there is no matched rows on the other side, skip + * | if there are matched rows on the other side + * | | if the matched num in the matched rows == 0, this should never happen! + * | | if the matched num in the matched rows == 1 + * | | if semi join, send -[other] + * | | if anti join, send +[other] + * | | if the matched num in the matched rows > 1, skip + * | | otherState.update(other, old-1) + * | endif + * endif + * </pre> + */ + @Override + public void processElement2(StreamRecord<BaseRow> element) throws Exception { + BaseRow input = element.getValue(); + AssociatedRecords associatedRecords = AssociatedRecords.of(input, false, leftRecordStateView, joinCondition); + if (BaseRowUtil.isAccumulateMsg(input)) { + rightRecordStateView.addRecord(input); + if (!associatedRecords.isEmpty()) { + // there are matched rows on the other side + for (OuterRecord outerRecord : associatedRecords.getOuterRecords()) { + BaseRow other = outerRecord.record; + if (outerRecord.numOfAssociations == 0) { + if (isAntiJoin) { + // send -[other] + other.setHeader(BaseRowUtil.RETRACT_MSG); + collector.collect(other); + // set header back to ACCUMULATE_MSG, because we will update the other row to state + other.setHeader(BaseRowUtil.ACCUMULATE_MSG); + } else { + // send +[other] + // the header of other is ACCUMULATE_MSG + collector.collect(other); + } + } // ignore when number > 0 + leftRecordStateView.updateNumOfAssociations(other, outerRecord.numOfAssociations + 1); + } + } // ignore when associated 
number == 0 + } else { // retract input + input.setHeader(BaseRowUtil.ACCUMULATE_MSG); + rightRecordStateView.retractRecord(input); + if (!associatedRecords.isEmpty()) { + // there are matched rows on the other side + for (OuterRecord outerRecord : associatedRecords.getOuterRecords()) { + BaseRow other = outerRecord.record; + if (outerRecord.numOfAssociations == 1) { + if (!isAntiJoin) { + // send -[other] + other.setHeader(BaseRowUtil.RETRACT_MSG); + collector.collect(other); + // set header back to ACCUMULATE_MSG, because we will update the other row to state + other.setHeader(BaseRowUtil.ACCUMULATE_MSG); + } else { + // send +[other] + collector.collect(other); + } + } // ignore when number > 1 + leftRecordStateView.updateNumOfAssociations(other, outerRecord.numOfAssociations - 1); + } + } // ignore when associated number == 0 + } + } +}