This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 091c7fa  [FLINK-15408][table-planner-blink] Interval join supports non-equal condition
091c7fa is described below

commit 091c7fafbc5e42ef76757b51144d7a0e273a7f29
Author: wangxlong <[email protected]>
AuthorDate: Tue Nov 3 16:32:02 2020 +0800

    [FLINK-15408][table-planner-blink] Interval join supports non-equal condition
---
 .../physical/stream/StreamExecIntervalJoin.scala   |  2 +-
 .../stream/StreamExecIntervalJoinRule.scala        |  4 +-
 .../plan/stream/sql/join/IntervalJoinTest.xml      | 56 ++++++++++++++++++++++
 .../plan/stream/sql/join/IntervalJoinTest.scala    | 22 +++++++++
 .../runtime/stream/sql/IntervalJoinITCase.scala    | 51 ++++++++++++++++++++
 5 files changed, 131 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIntervalJoin.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIntervalJoin.scala
index 57183df..7c88c96 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIntervalJoin.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIntervalJoin.scala
@@ -69,7 +69,7 @@ class StreamExecIntervalJoin(
   with StreamPhysicalRel
   with StreamExecNode[RowData] {
 
-  if (containsPythonCall(remainCondition.get)) {
+  if (remainCondition.isDefined && containsPythonCall(remainCondition.get)) {
     throw new TableException("Only inner join condition with equality 
predicates supports the " +
       "Python UDF taking the inputs from the left table and the right table at 
the same time, " +
       "e.g., ON T1.id = T2.id && pythonUdf(T1.a, T2.b)")
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecIntervalJoinRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecIntervalJoinRule.scala
index 881c24c..a52291d 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecIntervalJoinRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecIntervalJoinRule.scala
@@ -47,11 +47,9 @@ class StreamExecIntervalJoinRule
   override def matches(call: RelOptRuleCall): Boolean = {
     val join: FlinkLogicalJoin = call.rel(0)
     val joinRowType = join.getRowType
-    val joinInfo = join.analyzeCondition()
 
-    // joins require an equi-condition or a conjunctive predicate with at 
least one equi-condition
     // TODO support SEMI/ANTI join
-    if (!join.getJoinType.projectsRight || joinInfo.pairs().isEmpty) {
+    if (!join.getJoinType.projectsRight) {
       return false
     }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
index 1b963eb..fee20c3 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
@@ -194,6 +194,34 @@ Calc(select=[a, b])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testProcessingTimeInnerJoinWithoutEqualCondition">
+       <Resource name="sql">
+         <![CDATA[
+SELECT t1.a, t2.b FROM MyTable t1 JOIN MyTable2 t2 ON
+    t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + 
INTERVAL '1' HOUR
+      ]]>
+       </Resource>
+       <Resource name="planBefore">
+         <![CDATA[
+LogicalProject(a=[$0], b=[$6])
++- LogicalJoin(condition=[AND(>=($3, -($8, 3600000:INTERVAL HOUR)), <=($3, 
+($8, 3600000:INTERVAL HOUR)))], joinType=[inner])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+       </Resource>
+       <Resource name="planAfter">
+         <![CDATA[
+Calc(select=[a, b])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, 
leftLowerBound=-3600000, leftUpperBound=3600000, leftTimeIndex=1, 
rightTimeIndex=1], where=[AND(>=(proctime, -(proctime0, 3600000:INTERVAL 
HOUR)), <=(proctime, +(proctime0, 3600000:INTERVAL HOUR)))], select=[a, 
proctime, b, proctime0])
+   :- Exchange(distribution=[single])
+   :  +- Calc(select=[a, proctime])
+   :     +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[b, proctime])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a, b, c, proctime, rowtime])
+]]>
+       </Resource>
+  </TestCase>
   <TestCase name="testRowTimeInnerJoinAndWindowAggregationOnFirst">
     <Resource name="sql">
       <![CDATA[
@@ -475,6 +503,34 @@ Calc(select=[a, b])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testRowTimeInnerJoinWithoutEqualCondition">
+       <Resource name="sql">
+         <![CDATA[
+SELECT t1.a, t2.b FROM MyTable t1 JOIN MyTable2 t2 ON
+  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + 
INTERVAL '1' HOUR
+      ]]>
+       </Resource>
+       <Resource name="planBefore">
+         <![CDATA[
+LogicalProject(a=[$0], b=[$6])
++- LogicalJoin(condition=[AND(>=($4, -($9, 10000:INTERVAL SECOND)), <=($4, 
+($9, 3600000:INTERVAL HOUR)))], joinType=[inner])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+]]>
+       </Resource>
+       <Resource name="planAfter">
+         <![CDATA[
+Calc(select=[a, b])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=1, 
rightTimeIndex=1], where=[AND(>=(rowtime, -(rowtime0, 10000:INTERVAL SECOND)), 
<=(rowtime, +(rowtime0, 3600000:INTERVAL HOUR)))], select=[a, rowtime, b, 
rowtime0])
+   :- Exchange(distribution=[single])
+   :  +- Calc(select=[a, rowtime])
+   :     +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[b, rowtime])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a, b, c, proctime, rowtime])
+]]>
+       </Resource>
+  </TestCase>
   <TestCase name="testRowTimeLeftOuterJoin">
     <Resource name="sql">
       <![CDATA[
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
index bca2c37..329ef72 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
@@ -127,6 +127,17 @@ class IntervalJoinTest extends TableTestBase {
   }
 
   @Test
+  def testProcessingTimeInnerJoinWithoutEqualCondition(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b FROM MyTable t1 JOIN MyTable2 t2 ON
+        |    t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND 
t2.proctime + INTERVAL '1' HOUR
+      """.stripMargin
+
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
   def testRowTimeInnerJoinWithOnClause(): Unit = {
     val sqlQuery =
       """
@@ -139,6 +150,17 @@ class IntervalJoinTest extends TableTestBase {
   }
 
   @Test
+  def testRowTimeInnerJoinWithoutEqualCondition(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b FROM MyTable t1 JOIN MyTable2 t2 ON
+        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime 
+ INTERVAL '1' HOUR
+      """.stripMargin
+
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
   def testRowTimeInnerJoinWithWhereClause(): Unit = {
     val sqlQuery =
       """
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
index 249c63d..13ae51f 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
@@ -178,6 +178,57 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
     assertEquals(expected, sink.getAppendResults.sorted)
   }
 
+  /** test rowtime inner join without equal condition **/
+  @Test
+  def testRowTimeInnerJoinWithoutEqualCondition(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT t2.key, t2.id, t1.id
+        |FROM T1 as t1 join T2 as t2 ON
+        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
+        |    t2.rowtime + INTERVAL '6' SECOND
+        |""".stripMargin
+
+    val data1 = new mutable.MutableList[(String, String, Long)]
+    // for boundary test
+    data1.+=(("A", "LEFT0.999", 999L))
+    data1.+=(("A", "LEFT1", 1000L))
+    data1.+=(("A", "LEFT2", 2000L))
+    data1.+=(("A", "LEFT3", 3000L))
+    data1.+=(("B", "LEFT4", 4000L))
+    data1.+=(("A", "LEFT5", 5000L))
+    data1.+=(("A", "LEFT6", 6000L))
+    // test null key
+    data1.+=((null.asInstanceOf[String], "LEFT8", 8000L))
+
+    val data2 = new mutable.MutableList[(String, String, Long)]
+    data2.+=(("A", "RIGHT6", 6000L))
+    data2.+=(("B", "RIGHT7", 7000L))
+    // test null key
+    data2.+=((null.asInstanceOf[String], "RIGHT10", 10000L))
+
+    val t1 = env.fromCollection(data1)
+      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+      .toTable(tEnv, 'key, 'id, 'rowtime.rowtime)
+    val t2 = env.fromCollection(data2)
+      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+      .toTable(tEnv, 'key, 'id, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+    val sink = new TestingAppendSink
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(sink)
+    env.execute()
+    val expected = mutable.MutableList(
+      "A,RIGHT6,LEFT1", "A,RIGHT6,LEFT2", "A,RIGHT6,LEFT3", "A,RIGHT6,LEFT4",
+      "A,RIGHT6,LEFT5", "A,RIGHT6,LEFT6", "A,RIGHT6,LEFT8", "B,RIGHT7,LEFT2",
+      "B,RIGHT7,LEFT3", "B,RIGHT7,LEFT4", "B,RIGHT7,LEFT5", "B,RIGHT7,LEFT6",
+      "B,RIGHT7,LEFT8", "null,RIGHT10,LEFT5", "null,RIGHT10,LEFT6", 
"null,RIGHT10,LEFT8"
+    )
+    assertEquals(expected, sink.getAppendResults.sorted)
+  }
+
   @Test
   def testUnboundedAggAfterRowtimeInnerJoin(): Unit = {
     val innerSql=

Reply via email to