This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 94b2388  [FLINK-18769][table-planner-blink] Fix MiniBatch doesn't work with FLIP-95 source (#13038)
94b2388 is described below

commit 94b23885ca34927e37334fce51b930933cfd79dd
Author: Jark Wu <j...@apache.org>
AuthorDate: Wed Aug 5 12:30:38 2020 +0800

    [FLINK-18769][table-planner-blink] Fix MiniBatch doesn't work with FLIP-95 source (#13038)
    
    This closes #13038
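
    Background: MiniBatch is driven purely by configuration, which is why the
    bug was silent: with a FLIP-95 source the planner produced a plan without
    any MiniBatchAssigner and the options simply had no effect. A minimal
    sketch of the configuration involved (the option keys are the ones the
    updated IT case sets; the surrounding setup is hypothetical):

        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;

        public class MiniBatchConfigSketch {
            public static void main(String[] args) {
                // Blink planner in streaming mode, as used by the table-planner-blink tests.
                TableEnvironment tEnv = TableEnvironment.create(
                        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
                Configuration conf = tEnv.getConfig().getConfiguration();
                conf.setString("table.exec.mini-batch.enabled", "true");
                conf.setString("table.exec.mini-batch.allow-latency", "1s");
                conf.setString("table.exec.mini-batch.size", "5000");
                // Before this commit, these options were silently ignored for FLIP-95 sources.
            }
        }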
---
 .../kafka/table/KafkaChangelogTableITCase.java     |  63 ++--
 .../stream/MiniBatchIntervalInferRule.scala        |   5 +-
 .../factories/TestValuesRuntimeFunctions.java      |  48 +--
 .../plan/stream/sql/MiniBatchIntervalInferTest.xml | 374 +++++++++++----------
 .../stream/sql/MiniBatchIntervalInferTest.scala    | 117 +++++--
 5 files changed, 352 insertions(+), 255 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
index 7b9b069..0ac07ae 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.table;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
@@ -28,9 +29,9 @@ import org.apache.flink.streaming.connectors.kafka.KafkaTestBaseWithFlink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -40,12 +41,13 @@ import java.io.IOException;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestBase.isCausedByJobFinished;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
 
 /**
  * IT cases for Kafka with changelog format for Table API & SQL.
@@ -57,6 +59,7 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
 
        @Before
        public void setup() {
+               TestValuesTableFactory.clearAllData();
                env = StreamExecutionEnvironment.getExecutionEnvironment();
                tEnv = StreamTableEnvironment.create(
                        env,
@@ -77,6 +80,13 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
                final String topic = "changelog_topic";
                createTestTopic(topic, 1, 1);
 
+               // enables MiniBatch processing to verify MiniBatch + FLIP-95, see FLINK-18769
+               Configuration tableConf = tEnv.getConfig().getConfiguration();
+               tableConf.setString("table.exec.mini-batch.enabled", "true");
+               tableConf.setString("table.exec.mini-batch.allow-latency", "1s");
+               tableConf.setString("table.exec.mini-batch.size", "5000");
+               tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
+
                // ---------- Write the Debezium json into Kafka -------------------
                List<String> lines = readLines("debezium-data-schema-exclude.txt");
                DataStreamSource<String> stream = env.fromCollection(lines);
@@ -116,24 +126,12 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
                        " PRIMARY KEY (name) NOT ENFORCED" +
                        ") WITH (" +
                        " 'connector' = 'values'," +
-                       " 'sink-insert-only' = 'false'," +
-                       " 'sink-expected-messages-num' = '20'" +
+                       " 'sink-insert-only' = 'false'" +
                        ")";
                tEnv.executeSql(sourceDDL);
                tEnv.executeSql(sinkDDL);
-
-               try {
-                       TableEnvUtil.execInsertSqlAndWaitResult(
-                               tEnv,
-                               "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
-               } catch (Throwable t) {
-                       // we have to use a specific exception to indicate the job is finished,
-                       // because the registered Kafka source is infinite.
-                       if (!isCausedByJobFinished(t)) {
-                               // re-throw
-                               throw t;
-                       }
-               }
+               TableResult tableResult = tEnv.executeSql(
+                       "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
 
                // Debezium captures change data on the `products` table:
                //
@@ -179,15 +177,15 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
                // | 110 | jacket             | new water resistent white wind breaker                  |    0.5 |
                // +-----+--------------------+---------------------------------------------------------+--------+
 
-               String[] expected = new String[]{
+               List<String> expected = Arrays.asList(
                        "scooter,3.140", "car battery,8.100", "12-pack drill 
bits,0.800",
-                       "hammer,2.625", "rocks,5.100", "jacket,0.600", "spare 
tire,22.200"};
+                       "hammer,2.625", "rocks,5.100", "jacket,0.600", "spare 
tire,22.200");
 
-               List<String> actual = TestValuesTableFactory.getResults("sink");
-               assertThat(actual, containsInAnyOrder(expected));
+               waitingExpectedResults("sink", expected, Duration.ofSeconds(10));
 
                // ------------- cleanup -------------------
 
+               tableResult.getJobClient().get().cancel().get(); // stop the job
                deleteTestTopic(topic);
        }
 
@@ -201,4 +199,23 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink {
                Path path = new File(url.getFile()).toPath();
                return Files.readAllLines(path);
        }
+
+       private static void waitingExpectedResults(String sinkName, List<String> expected, Duration timeout) throws InterruptedException {
+               long now = System.currentTimeMillis();
+               long stop = now + timeout.toMillis();
+               Collections.sort(expected);
+               while (System.currentTimeMillis() < stop) {
+                       List<String> actual = TestValuesTableFactory.getResults(sinkName);
+                       Collections.sort(actual);
+                       if (expected.equals(actual)) {
+                               return;
+                       }
+                       Thread.sleep(100);
+               }
+
+               // timeout, assert again
+               List<String> actual = TestValuesTableFactory.getResults(sinkName);
+               Collections.sort(actual);
+               assertEquals(expected, actual);
+       }
 }
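
Note on the rewritten test above: executeSql("INSERT ...") submits the job and
returns a TableResult immediately, so the test no longer waits for a "job
finished" exception from the infinite Kafka source. Instead it polls the sink
until the expected rows arrive and then cancels the still-running job through
the JobClient. Condensed, the pattern is (names exactly as in the diff):

    TableResult tableResult = tEnv.executeSql(
            "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
    waitingExpectedResults("sink", expected, Duration.ofSeconds(10));
    tableResult.getJobClient().get().cancel().get(); // stop the infinite job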
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
index 3971744..714b249 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
 
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode}
-import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecDataStreamScan, StreamExecGroupWindowAggregate, StreamExecMiniBatchAssigner, StreamExecLegacyTableSourceScan, StreamExecWatermarkAssigner, StreamPhysicalRel}
+import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecDataStreamScan, StreamExecGroupWindowAggregate, StreamExecLegacyTableSourceScan, StreamExecMiniBatchAssigner, StreamExecTableSourceScan, StreamExecWatermarkAssigner, StreamPhysicalRel}
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
 import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.hep.HepRelVertex
@@ -122,7 +122,8 @@ class MiniBatchIntervalInferRule extends RelOptRule(
       .getMiniBatchInterval
       .mode
     node match {
-      case _: StreamExecDataStreamScan | _: StreamExecLegacyTableSourceScan =>
+      case _: StreamExecDataStreamScan | _: StreamExecLegacyTableSourceScan |
+           _: StreamExecTableSourceScan =>
        // append minibatch node if the mode is not NONE and reach a source leaf node
         mode == MiniBatchMode.RowTime || mode == MiniBatchMode.ProcTime
       case _: StreamExecWatermarkAssigner  =>
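
The one-line match extension above is the core of the fix: the rule appends a
MiniBatchAssigner only when it reaches a source leaf node, and the match listed
only StreamExecDataStreamScan and StreamExecLegacyTableSourceScan. FLIP-95
sources are planned as StreamExecTableSourceScan, so the traversal fell through
and no assigner was ever inserted. One way to verify the behavior from a test
(a sketch; the table name is hypothetical, and explainSql is the FLIP-84 API
available since Flink 1.11):

    String plan = tEnv.explainSql("SELECT b, COUNT(a) FROM src GROUP BY b");
    // With mini-batch enabled, the physical plan over a FLIP-95 source should
    // now contain a MiniBatchAssigner above the TableSourceScan.
    org.junit.Assert.assertTrue(plan.contains("MiniBatchAssigner"));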
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index b2cc256..e4ede00 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -65,40 +65,42 @@ final class TestValuesRuntimeFunctions {
 
        static List<String> getRawResults(String tableName) {
                List<String> result = new ArrayList<>();
-               if (globalRawResult.containsKey(tableName)) {
-                       globalRawResult.get(tableName)
-                               .values()
-                               .forEach(result::addAll);
-               } else {
-                       throw new IllegalArgumentException("Can't find result for the table '" + tableName + "'.");
+               synchronized (TestValuesTableFactory.class) {
+                       if (globalRawResult.containsKey(tableName)) {
+                               globalRawResult.get(tableName)
+                                       .values()
+                                       .forEach(result::addAll);
+                       }
                }
                return result;
        }
 
        static List<String> getResults(String tableName) {
                List<String> result = new ArrayList<>();
-               if (globalUpsertResult.containsKey(tableName)) {
-                       globalUpsertResult.get(tableName)
-                               .values()
-                               .forEach(map -> result.addAll(map.values()));
-               } else if (globalRetractResult.containsKey(tableName)) {
-                       globalRetractResult.get(tableName)
-                               .values()
-                               .forEach(result::addAll);
-               } else if (globalRawResult.containsKey(tableName)) {
-                       getRawResults(tableName).stream()
-                               .map(s -> s.substring(3, s.length() - 1)) // removes the +I(...) wrapper
-                               .forEach(result::add);
-               } else {
-                       throw new IllegalArgumentException("Can't find result for the table '" + tableName + "'.");
+               synchronized (TestValuesTableFactory.class) {
+                       if (globalUpsertResult.containsKey(tableName)) {
+                               globalUpsertResult.get(tableName)
+                                       .values()
+                                       .forEach(map -> result.addAll(map.values()));
+                       } else if (globalRetractResult.containsKey(tableName)) {
+                               globalRetractResult.get(tableName)
+                                       .values()
+                                       .forEach(result::addAll);
+                       } else if (globalRawResult.containsKey(tableName)) {
+                               getRawResults(tableName).stream()
+                                       .map(s -> s.substring(3, s.length() - 1)) // removes the +I(...) wrapper
+                                       .forEach(result::add);
+                       }
                }
                return result;
        }
 
        static void clearResults() {
-               globalRawResult.clear();
-               globalUpsertResult.clear();
-               globalRetractResult.clear();
+               synchronized (TestValuesTableFactory.class) {
+                       globalRawResult.clear();
+                       globalUpsertResult.clear();
+                       globalRetractResult.clear();
+               }
        }
 
        // ------------------------------------------------------------------------------------------
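
The getters above now synchronize on TestValuesTableFactory.class and return an
empty list for an unknown table instead of throwing: with the polling approach,
the test thread may call getResults() before the sink task has registered its
first row, and it reads the result maps while sink tasks are still writing to
them. A sketch of the writer side this guards against (a hypothetical
simplification of the sink function; field names as in the test factory):

    // Sink task thread: must hold the same lock as the reading test thread.
    synchronized (TestValuesTableFactory.class) {
        globalRawResult
                .computeIfAbsent(tableName, k -> new HashMap<>())
                .computeIfAbsent(taskId, k -> new ArrayList<>())
                .add(rowString);
    }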
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index 124fad3..8456d01 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -16,6 +16,56 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testIntervalJoinWithMiniBatch">
+    <Resource name="sql">
+      <![CDATA[
+ SELECT b, COUNT(a)
+ FROM (
+   SELECT t1.a as a, t1.b as b
+   FROM
+     wmTable1 as t1 JOIN wmTable2 as t2
+   ON
+     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
+     t2.rowtime + INTERVAL '10' SECOND
+ )
+ GROUP BY b
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
++- LogicalProject(b=[$1], a=[$0])
+      +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner])
+      :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+      :  +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
+      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+            +- LogicalTableScan(table=[[default_catalog, default_database, wmTable2]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1])
++- Exchange(distribution=[hash[b]])
+   +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(a) AS count$0])
+      +- Calc(select=[b, a])
+         +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
+            :- Exchange(distribution=[hash[a]])
+            :  +- Calc(select=[a, b, rowtime])
+            :     +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
+            :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+            :           +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+            :              +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[a, rowtime])
+                  +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
+                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+                        +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                           +- TableSourceScan(table=[[default_catalog, default_database, wmTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testMiniBatchOnDataStreamWithRowTime">
     <Resource name="sql">
       <![CDATA[
@@ -60,9 +110,30 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXP
 GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
 +- Exchange(distribution=[hash[b]])
    +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
+      +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+         +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, project=[b, a, c]]], fields=[b, a, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMiniBatchOnlyForDataStream">
+    <Resource name="sql">
+      <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyDataStream1 GROUP BY b]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)])
++- LogicalProject(b=[$1], a=[$0], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyDataStream1]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
++- Exchange(distribution=[hash[b]])
+   +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
       +- Calc(select=[b, a, c])
          +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
-            +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+            +- DataStreamScan(table=[[default_catalog, default_database, MyDataStream1]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -76,7 +147,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1
  FROM (
    SELECT t1.a as a, t1.b as b, t1.rowtime as rt
    FROM
-     LeftT as t1 JOIN RightT as t2
+     wmTable1 as t1 JOIN wmTable2 as t2
    ON
      t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
      t2.rowtime + INTERVAL '10' SECOND
@@ -90,10 +161,12 @@ LogicalProject(b=[$0], EXPR$1=[$2], EXPR$2=[TUMBLE_START($1)], EXPR$3=[TUMBLE_EN
 +- LogicalAggregate(group=[{0, 1}], EXPR$1=[COUNT($2)])
    +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0])
      +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner])
-         :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-         :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
-         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-            +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+         :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+         :  +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+         :     +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
+         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+               +- LogicalTableScan(table=[[default_catalog, default_database, wmTable2]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -105,12 +178,14 @@ Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3])
         +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
            :- Exchange(distribution=[hash[a]])
            :  +- Calc(select=[a, b, rowtime])
-            :     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-            :        +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+            :     +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+            :        +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+            :           +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
            +- Exchange(distribution=[hash[a]])
               +- Calc(select=[a, rowtime])
-                  +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                     +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+                  +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+                     +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                        +- TableSourceScan(table=[[default_catalog, default_database, wmTable2]], fields=[a, b, c, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -126,7 +201,7 @@ Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3])
     SELECT b,
      COUNT(a) as a,
      TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt
-    FROM LeftT
+    FROM wmTable1
     GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)
   ) as t1
   JOIN
@@ -134,7 +209,7 @@ Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3])
     SELECT b,
      COUNT(a) as a,
      HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
-    FROM RightT
+    FROM wmTable2
     GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
   ) as t2
   ON
@@ -150,13 +225,15 @@ LogicalProject(b=[$0], EXPR$1=[COUNT($1) OVER (PARTITION BY $0 ORDER BY $2 NULLS
    :- LogicalProject(b=[$0], a=[$2], rt=[TUMBLE_ROWTIME($1)])
    :  +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
   :     +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0])
-   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-   :           +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+   :              +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
   +- LogicalProject(b=[$0], a=[$2], rt=[HOP_ROWTIME($1)])
      +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
         +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
-            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-               +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+                  +- LogicalTableScan(table=[[default_catalog, default_database, wmTable2]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -171,15 +248,17 @@ Calc(select=[b, w0$o0 AS $1])
            :     +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
            :        +- Exchange(distribution=[hash[b]])
            :           +- Calc(select=[b, rowtime, a])
-            :              +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-            :                 +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+            :              +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+            :                 +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+            :                    +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
            +- Exchange(distribution=[hash[a]])
               +- Calc(select=[b, a, w$rowtime AS rt])
                  +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
                     +- Exchange(distribution=[hash[b]])
                        +- Calc(select=[b, rowtime, a])
-                           +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                              +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+                           +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+                              +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                                 +- TableSourceScan(table=[[default_catalog, default_database, wmTable2]], fields=[a, b, c, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -195,7 +274,7 @@ Calc(select=[b, w0$o0 AS $1])
     FROM (
       SELECT b, COUNT(a) as a,
          HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
-      FROM RightT
+      FROM wmTable1
       GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
     )
     GROUP BY b
@@ -215,8 +294,9 @@ LogicalProject(a=[$0], b=[$1])
       +- LogicalProject(b=[$0], a=[$2])
          +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
            +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
-               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-                  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+                     +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -228,9 +308,8 @@ Calc(select=[a, b])
    :     +- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) AS b])
    :        +- Exchange(distribution=[hash[a]])
   :           +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(b) AS count$0])
-   :              +- Calc(select=[a, b])
-   :                 +- MiniBatchAssigner(interval=[6000ms], mode=[ProcTime])
-   :                    +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+   :              +- MiniBatchAssigner(interval=[6000ms], mode=[ProcTime])
+   :                 +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, project=[a, b]]], fields=[a, b])
    +- Exchange(distribution=[hash[a]])
       +- GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count1$0) AS a])
          +- Exchange(distribution=[hash[b]])
@@ -238,8 +317,9 @@ Calc(select=[a, b])
               +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], select=[b, COUNT(a) AS a])
                  +- Exchange(distribution=[hash[b]])
                     +- Calc(select=[b, rowtime, a])
-                        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                           +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+                        +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+                           +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                              +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -254,9 +334,9 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fie
             +- LogicalProject($f0=[$TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
                +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                   +- LogicalJoin(condition=[true], joinType=[inner])
-                     :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+                     :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
                      :  +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
-                     +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+                     +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
                         +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
 
 LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b])
@@ -265,9 +345,9 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fie
       +- LogicalProject($f0=[$TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'-'])
          +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
             +- LogicalJoin(condition=[true], joinType=[inner])
-               :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+               :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
                :  +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
-               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
                   +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
 
 LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a, b])
@@ -277,19 +357,19 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fie
          +- LogicalProject($f0=[$TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
             +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
-                  :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+                  :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
                   :  +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
-                  +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)])
+                  +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1])
                      +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
 
 == Optimized Logical Plan ==
 IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods], reuse_id=[1])
 :- Exchange(distribution=[hash[id1]])
-:  +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-:     +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text])
+:  +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+:     +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text])
 +- Exchange(distribution=[hash[id2]])
-   +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-      +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
+   +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+      +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])
 
 Exchange(distribution=[hash[id1]], reuse_id=[2])
 +- Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3])
@@ -317,45 +397,49 @@ LegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a,
 
 == Physical Execution Plan ==
  : Data Source
-	content : Source: Collection Source
+	content : Source: TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text])
+
+	 : Operator
+		content : WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+		ship_strategy : FORWARD

  : Data Source
-	content : Source: Collection Source
+	content : Source: TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods])

 	 : Operator
-		content : SourceConversion(table=[default_catalog.default_database.T1], fields=[id1, rowtime, text])
+		content : WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
 		ship_strategy : FORWARD

 		 : Operator
-			content : WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 0:INTERVAL MILLISECOND)])
-			ship_strategy : FORWARD
+			content : IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[((id1 = id2) AND (CAST(rowtime) > (CAST(rowtime0) - 300000:INTERVAL MINUTE)) AND (CAST(rowtime) < (CAST(rowtime0) + 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods])
+			ship_strategy : HASH

 			 : Operator
-				content : SourceConversion(table=[default_catalog.default_database.T2], fields=[id2, rowtime, cnt, name, goods])
+				content : Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3])
 				ship_strategy : FORWARD

 				 : Operator
-					content : WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 0:INTERVAL MILLISECOND)])
-					ship_strategy : FORWARD
+					content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+					ship_strategy : HASH

 					 : Operator
-						content : IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[((id1 = id2) AND (CAST(rowtime) > (CAST(rowtime0) - 300000:INTERVAL MINUTE)) AND (CAST(rowtime) < (CAST(rowtime0) + 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods])
-						ship_strategy : HASH
+						content : Calc(select=[w$rowtime AS $f0, id1, text, _UTF-16LE'*' AS $f3])
+						ship_strategy : FORWARD

 						 : Operator
-							content : Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3])
-							ship_strategy : FORWARD
+							content : GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, $f0, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+							ship_strategy : HASH

 							 : Operator
-								content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
-								ship_strategy : HASH
+								content : SinkConversionToRow
+								ship_strategy : FORWARD

 								 : Operator
-									content : Calc(select=[w$rowtime AS $f0, id1, text, _UTF-16LE'*' AS $f3])
+									content : Calc(select=[rowtime, id1, text, _UTF-16LE'-' AS $f3])
 									ship_strategy : FORWARD

 									 : Operator
-										content : GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, $f0, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+										content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
 										ship_strategy : HASH

 										 : Operator
@@ -363,49 +447,37 @@ LegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a,
 											ship_strategy : FORWARD

 											 : Operator
-												content : Calc(select=[rowtime, id1, text, _UTF-16LE'-' AS $f3])
-												ship_strategy : FORWARD
+												content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], select=[id1, LISTAGG(text, $f3) AS text])
+												ship_strategy : HASH

 												 : Operator
-													content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
-													ship_strategy : HASH
+													content : LocalGroupAggregate(groupBy=[id1], select=[id1, COUNT(text) AS count$0])
+													ship_strategy : FORWARD

 													 : Operator
-														content : SinkConversionToRow
-														ship_strategy : FORWARD
+														content : GlobalGroupAggregate(groupBy=[id1], select=[id1, COUNT(count$0) AS EXPR$1])
+														ship_strategy : HASH

 														 : Operator
-															content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], select=[id1, LISTAGG(text, $f3) AS text])
-															ship_strategy : HASH
+															content : SinkConversionToTuple2
+															ship_strategy : FORWARD

 															 : Operator
-																content : LocalGroupAggregate(groupBy=[id1], select=[id1, COUNT(text) AS count$0])
+																content : Map
 																ship_strategy : FORWARD

-																 : Operator
-																	content : GlobalGroupAggregate(groupBy=[id1], select=[id1, COUNT(count$0) AS EXPR$1])
-																	ship_strategy : HASH
+																 : Data Sink
+																	content : Sink: TestingAppendTableSink
+																	ship_strategy : FORWARD

-																	 : Operator
-																		content : SinkConversionToTuple2
+																	 : Data Sink
+																		content : Sink: TestingAppendTableSink
 																		ship_strategy : FORWARD

-																		 : Operator
-																			content : Map
+																		 : Data Sink
+																			content : Sink: TestingRetractTableSink
 																			ship_strategy : FORWARD

-																			 : Data Sink
-																				content : Sink: TestingAppendTableSink
-																				ship_strategy : FORWARD
-
-																				 : Data Sink
-																					content : Sink: TestingAppendTableSink
-																					ship_strategy : FORWARD
-
-																					 : Data Sink
-																						content : Sink: TestingRetractTableSink
-																						ship_strategy : FORWARD
-
]]>
     </Resource>
   </TestCase>
@@ -442,30 +514,6 @@ GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT(count$0) AS EXPR$1])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRedundantWatermarkDefinition">
-    <Resource name="sql">
-      <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable3 GROUP BY b]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)])
-+- LogicalProject(b=[$1], a=[$0], c=[$2])
-   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
-+- Exchange(distribution=[hash[b]])
-   +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
-      +- Calc(select=[b, a, c])
-         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
-            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-               +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
-]]>
-    </Resource>
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testRowtimeRowsOverWithMiniBatch">
     <Resource name="sql">
       <![CDATA[
@@ -473,7 +521,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1
  FROM (
    SELECT c, COUNT(a)
   OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt
-   FROM MyTable3
+   FROM wmTable1
  )
  GROUP BY cnt
       ]]>
@@ -482,8 +530,9 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1
       <![CDATA[
 LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
 +- LogicalProject(cnt=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS BETWEEN 5 PRECEDING AND CURRENT ROW)], c=[$2])
-   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+         +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -496,8 +545,35 @@ GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT(count$0) AS EXPR$1])
             +- Exchange(distribution=[hash[c]])
                +- Calc(select=[a, c, rowtime])
                   +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
-                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                        +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+                        +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                           +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRedundantWatermarkDefinition">
+    <Resource name="sql">
+      <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM wmTable1 GROUP BY b]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)])
++- LogicalProject(b=[$1], a=[$0], c=[$2])
+   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+         +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
++- Exchange(distribution=[hash[b]])
+   +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
+      +- Calc(select=[b, a, c])
+         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+               +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                  +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -521,7 +597,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
    +- LogicalFilter(condition=[=($1, $6)])
      +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}])
         :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-         :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+         :  +- LogicalTableScan(table=[[default_catalog, default_database, MyDataStream1]])
         +- LogicalTableFunctionScan(invocation=[Rates($cor0.rowtime)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(PROCTIME) proctime, TIME ATTRIBUTE(ROWTIME) rowtime)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
@@ -536,58 +612,12 @@ GlobalGroupAggregate(groupBy=[r_a], select=[r_a, COUNT(count$0) AS EXPR$1])
             :  +- Calc(select=[a, b, rowtime])
             :     +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
            :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-            :           +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+            :           +- DataStreamScan(table=[[default_catalog, default_database, MyDataStream1]], fields=[a, b, c, proctime, rowtime])
             +- Exchange(distribution=[hash[b]])
                +- Calc(select=[a, b, rowtime])
                   +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
                      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                        +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testIntervalJoinWithMiniBatch">
-    <Resource name="sql">
-      <![CDATA[
- SELECT b, COUNT(a)
- FROM (
-   SELECT t1.a as a, t1.b as b
-   FROM
-     LeftT as t1 JOIN RightT as t2
-   ON
-     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
-     t2.rowtime + INTERVAL '10' SECOND
- )
- GROUP BY b
-      ]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
-+- LogicalProject(b=[$1], a=[$0])
-   +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner])
-      :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-      :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
-      +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-         +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1])
-+- Exchange(distribution=[hash[b]])
-   +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(a) AS count$0])
-      +- Calc(select=[b, a])
-         +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0])
-            :- Exchange(distribution=[hash[a]])
-            :  +- Calc(select=[a, b, rowtime])
-            :     +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
-            :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-            :           +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
-            +- Exchange(distribution=[hash[a]])
-               +- Calc(select=[a, rowtime])
-                  +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime])
-                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                        +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+                        +- DataStreamScan(table=[[default_catalog, default_database, MyDataStream2]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -600,7 +630,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1])
    SELECT b,
      COUNT(a) as cnt,
      TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt
-   FROM MyTable3
+   FROM wmTable1
    GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND)
  )
  GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND)
@@ -613,8 +643,9 @@ LogicalProject(b=[$0], EXPR$1=[$2])
   +- LogicalProject(b=[$0], $f1=[$TUMBLE(TUMBLE_ROWTIME($1), 5000:INTERVAL SECOND)], cnt=[$2])
      +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
         +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 10000:INTERVAL SECOND)], a=[$0])
-            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-               +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+                  +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -625,8 +656,9 @@ GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, $f1, 5000)],
      +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
         +- Exchange(distribution=[hash[b]])
            +- Calc(select=[b, rowtime, a])
-               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                  +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+                  +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                     +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -639,7 +671,7 @@ GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, $f1, 5000)],
      COUNT(a) as cnt,
      HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start,
      HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end
-   FROM MyTable3
+   FROM wmTable1
    GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
  )
  GROUP BY b
@@ -651,8 +683,9 @@ LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
 +- LogicalProject(b=[$0], cnt=[$2])
    +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
      +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0])
-         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)])
-            +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3])
+               +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -663,8 +696,9 @@ GlobalGroupAggregate(groupBy=[b], select=[b, $SUM0_RETRACT(sum$0) AS EXPR$1])
      +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], select=[b, COUNT(a) AS cnt], emit=[early delay 500 millisecond])
         +- Exchange(distribution=[hash[b]])
            +- Calc(select=[b, rowtime, a])
-               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)])
-                  +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
+                  +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+                     +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
index 26fabc1..fe3d08a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala
@@ -39,9 +39,37 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   @Before
   def setup(): Unit = {
     util.addDataStream[(Int, String, Long)](
-      "MyTable1", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+      "MyDataStream1", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
     util.addDataStream[(Int, String, Long)](
-      "MyTable2", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+      "MyDataStream2", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+    // register tables using DDL
+    util.addTable(
+      s"""
+         |CREATE TABLE MyTable1 (
+         |  `a` INT,
+         |  `b` STRING,
+         |  `c` BIGINT,
+         |  proctime AS PROCTIME(),
+         |  rowtime TIMESTAMP(3)
+         |) WITH (
+         |  'connector' = 'values'
+         |)
+         |""".stripMargin)
+    util.addTable(
+      s"""
+         |CREATE TABLE wmTable1 (
+         |  WATERMARK FOR rowtime AS rowtime
+         |) LIKE MyTable1 (INCLUDING ALL)
+         |""".stripMargin)
+    util.addTable(
+      s"""
+         |CREATE TABLE wmTable2 (
+         |  WATERMARK FOR rowtime AS rowtime
+         |) LIKE MyTable1 (INCLUDING ALL)
+         |""".stripMargin)
+
+    // enable mini-batch
     util.tableEnv.getConfig.getConfiguration.setBoolean(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
   }
@@ -49,17 +77,24 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   @Test
   def testMiniBatchOnly(): Unit = {
     util.tableEnv.getConfig.getConfiguration
-        .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
+      .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
     val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable1 GROUP BY b"
     util.verifyPlan(sql)
   }
 
   @Test
+  def testMiniBatchOnlyForDataStream(): Unit = {
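+    // mirrors testMiniBatchOnly, but reads from the DataStream-backed
+    // MyDataStream1 so the non-DDL source path stays covered as well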
+    util.tableEnv.getConfig.getConfiguration
+        .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
+    val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyDataStream1 GROUP BY b"
+    util.verifyPlan(sql)
+  }
+
+  @Test
   def testRedundantWatermarkDefinition(): Unit = {
     util.tableEnv.getConfig.getConfiguration
         .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
-    util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0)
-    val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable3 GROUP BY b"
+    val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM wmTable1 GROUP BY b"
     util.verifyPlan(sql)
   }
 
@@ -69,7 +104,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
     tableConfig.getConfiguration
         .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
     withEarlyFireDelay(tableConfig, Time.milliseconds(500))
-    util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0)
     val sql =
       """
         | SELECT b, SUM(cnt)
@@ -78,7 +112,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         |     COUNT(a) as cnt,
        |     HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start,
        |     HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end
-        |   FROM MyTable3
+        |   FROM wmTable1
         |   GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
         | )
         | GROUP BY b
@@ -90,7 +124,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   def testWindowCascade(): Unit = {
     util.tableEnv.getConfig.getConfiguration
         .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "3 s")
-    util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0)
     val sql =
       """
         | SELECT b,
@@ -99,7 +132,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         |   SELECT b,
         |     COUNT(a) as cnt,
         |     TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt
-        |   FROM MyTable3
+        |   FROM wmTable1
         |   GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND)
         | )
         | GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND)
@@ -109,8 +142,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testIntervalJoinWithMiniBatch(): Unit = {
-    util.addTableWithWatermark("LeftT", util.tableEnv.from("MyTable1"), 
"rowtime", 0)
-    util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), 
"rowtime", 0)
     util.tableEnv.getConfig.getConfiguration.setString(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
 
@@ -120,7 +151,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         | FROM (
         |   SELECT t1.a as a, t1.b as b
         |   FROM
-        |     LeftT as t1 JOIN RightT as t2
+        |     wmTable1 as t1 JOIN wmTable2 as t2
         |   ON
        |     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
         |     t2.rowtime + INTERVAL '10' SECOND
@@ -132,7 +163,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testRowtimeRowsOverWithMiniBatch(): Unit = {
-    util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), 
"rowtime", 0)
     util.tableEnv.getConfig.getConfiguration.setString(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
 
@@ -142,7 +172,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         | FROM (
         |   SELECT c, COUNT(a)
        |   OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt
-        |   FROM MyTable3
+        |   FROM wmTable1
         | )
         | GROUP BY cnt
       """.stripMargin
@@ -152,8 +182,8 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testTemporalTableFunctionJoinWithMiniBatch(): Unit = {
-    util.addTableWithWatermark("Orders", util.tableEnv.from("MyTable1"), 
"rowtime", 0)
-    util.addTableWithWatermark("RatesHistory", util.tableEnv.from("MyTable2"), 
"rowtime", 0)
+    util.addTableWithWatermark("Orders", util.tableEnv.from("MyDataStream1"), 
"rowtime", 0)
+    util.addTableWithWatermark("RatesHistory", 
util.tableEnv.from("MyDataStream2"), "rowtime", 0)
 
     util.tableEnv.getConfig.getConfiguration.setString(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
@@ -180,8 +210,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   @Test
   def testMultiOperatorNeedsWatermark1(): Unit = {
     // infer result: miniBatchInterval=[Rowtime, 0ms]
-    util.addTableWithWatermark("LeftT", util.tableEnv.from("MyTable1"), 
"rowtime", 0)
-    util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), 
"rowtime", 0)
     util.tableEnv.getConfig.getConfiguration.setString(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s")
 
@@ -194,7 +222,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         | FROM (
         |   SELECT t1.a as a, t1.b as b, t1.rowtime as rt
         |   FROM
-        |     LeftT as t1 JOIN RightT as t2
+        |     wmTable1 as t1 JOIN wmTable2 as t2
         |   ON
        |     t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND
         |     t2.rowtime + INTERVAL '10' SECOND
@@ -206,8 +234,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testMultiOperatorNeedsWatermark2(): Unit = {
-    util.addTableWithWatermark("LeftT", util.tableEnv.from("MyTable1"), 
"rowtime", 0)
-    util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), 
"rowtime", 0)
     util.tableEnv.getConfig.getConfiguration.setString(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "6 s")
 
@@ -222,7 +248,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         |    SELECT b,
         |     COUNT(a) as a,
         |     TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt
-        |    FROM LeftT
+        |    FROM wmTable1
         |    GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)
         |  ) as t1
         |  JOIN
@@ -230,7 +256,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         |    SELECT b,
         |     COUNT(a) as a,
        |     HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
-        |    FROM RightT
+        |    FROM wmTable2
         |    GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
         |  ) as t2
         |  ON
@@ -243,7 +269,6 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
   @Test
   def testMultiOperatorNeedsWatermark3(): Unit = {
-    util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), 
"rowtime", 0)
     util.tableEnv.getConfig.getConfiguration.setString(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "6 s")
 
@@ -258,7 +283,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         |    FROM (
         |      SELECT b, COUNT(a) as a,
        |         HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt
-        |      FROM RightT
+        |      FROM wmTable1
        |      GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
         |    )
         |    GROUP BY b
@@ -274,12 +299,30 @@ class MiniBatchIntervalInferTest extends TableTestBase {
   @Test
   def testMultipleWindowAggregates(): Unit = {
     val stmtSet = util.tableEnv.createStatementSet()
-    util.addDataStream[(Int, Long, String)]("T1", 'id1, 'rowtime.rowtime, 'text)
-    util.addDataStream[(Int, Long, Int, String, String)](
-      "T2",
-      'id2, 'rowtime.rowtime, 'cnt, 'name, 'goods)
-    util.addTableWithWatermark("T3", util.tableEnv.from("T1"), "rowtime", 0)
-    util.addTableWithWatermark("T4", util.tableEnv.from("T2"), "rowtime", 0)
+    util.addTable(
+      s"""
+         |CREATE TABLE T1 (
+         | id1 INT,
+         | rowtime TIMESTAMP(3),
+         | `text` STRING,
+         | WATERMARK FOR rowtime AS rowtime
+         |) WITH (
+         | 'connector' = 'values'
+         |)
+         |""".stripMargin)
+    util.addTable(
+      s"""
+         |CREATE TABLE T2 (
+         | id2 INT,
+         | rowtime TIMESTAMP(3),
+         | cnt INT,
+         | name STRING,
+         | goods STRING,
+         | WATERMARK FOR rowtime AS rowtime
+         |) WITH (
+         | 'connector' = 'values'
+         |)
+         |""".stripMargin)
 
     util.tableEnv.getConfig.getConfiguration.setString(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "500 ms")
@@ -288,13 +331,13 @@ class MiniBatchIntervalInferTest extends TableTestBase {
 
     val table1 = util.tableEnv.sqlQuery(
       """
-        |SELECT id1, T3.rowtime AS ts, text
-        |  FROM T3, T4
+        |SELECT id1, T1.rowtime AS ts, text
+        |  FROM T1, T2
         |WHERE id1 = id2
-        |      AND T3.rowtime > T4.rowtime - INTERVAL '5' MINUTE
-        |      AND T3.rowtime < T4.rowtime + INTERVAL '3' MINUTE
+        |      AND T1.rowtime > T2.rowtime - INTERVAL '5' MINUTE
+        |      AND T1.rowtime < T2.rowtime + INTERVAL '3' MINUTE
       """.stripMargin)
-    util.tableEnv.registerTable("TempTable1", table1)
+    util.tableEnv.createTemporaryView("TempTable1", table1)
 
     val table2 = util.tableEnv.sqlQuery(
       """
@@ -304,7 +347,7 @@ class MiniBatchIntervalInferTest extends TableTestBase {
         |FROM TempTable1
         |GROUP BY TUMBLE(ts, INTERVAL '6' SECOND), id1
       """.stripMargin)
-    util.tableEnv.registerTable("TempTable2", table2)
+    util.tableEnv.createTemporaryView("TempTable2", table2)
 
   val table3 = util.tableEnv.sqlQuery(
       """
