This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 94b2388 [FLINK-18769][table-planner-blink] Fix MiniBatch doesn't work with FLIP-95 source (#13038) 94b2388 is described below commit 94b23885ca34927e37334fce51b930933cfd79dd Author: Jark Wu <j...@apache.org> AuthorDate: Wed Aug 5 12:30:38 2020 +0800 [FLINK-18769][table-planner-blink] Fix MiniBatch doesn't work with FLIP-95 source (#13038) This closes #13038 --- .../kafka/table/KafkaChangelogTableITCase.java | 63 ++-- .../stream/MiniBatchIntervalInferRule.scala | 5 +- .../factories/TestValuesRuntimeFunctions.java | 48 +-- .../plan/stream/sql/MiniBatchIntervalInferTest.xml | 374 +++++++++++---------- .../stream/sql/MiniBatchIntervalInferTest.scala | 117 +++++-- 5 files changed, 352 insertions(+), 255 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java index 7b9b069..0ac07ae 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.table; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase; @@ -28,9 +29,9 @@ import org.apache.flink.streaming.connectors.kafka.KafkaTestBaseWithFlink; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.planner.runtime.utils.TableEnvUtil; import org.junit.Before; import org.junit.Test; @@ -40,12 +41,13 @@ import java.io.IOException; import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Properties; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestBase.isCausedByJobFinished; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertEquals; /** * IT cases for Kafka with changelog format for Table API & SQL. @@ -57,6 +59,7 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink { @Before public void setup() { + TestValuesTableFactory.clearAllData(); env = StreamExecutionEnvironment.getExecutionEnvironment(); tEnv = StreamTableEnvironment.create( env, @@ -77,6 +80,13 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink { final String topic = "changelog_topic"; createTestTopic(topic, 1, 1); + // enables MiniBatch processing to verify MiniBatch + FLIP-95, see FLINK-18769 + Configuration tableConf = tEnv.getConfig().getConfiguration(); + tableConf.setString("table.exec.mini-batch.enabled", "true"); + tableConf.setString("table.exec.mini-batch.allow-latency", "1s"); + tableConf.setString("table.exec.mini-batch.size", "5000"); + tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); + // ---------- Write the Debezium json into Kafka ------------------- List<String> lines = readLines("debezium-data-schema-exclude.txt"); DataStreamSource<String> stream = env.fromCollection(lines); @@ -116,24 +126,12 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink { " PRIMARY KEY (name) NOT ENFORCED" + ") WITH (" + " 'connector' = 'values'," + - " 'sink-insert-only' = 'false'," + - " 'sink-expected-messages-num' = '20'" + + " 'sink-insert-only' = 'false'" + ")"; tEnv.executeSql(sourceDDL); tEnv.executeSql(sinkDDL); - - try { - TableEnvUtil.execInsertSqlAndWaitResult( - tEnv, - "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name"); - } catch (Throwable t) { - // we have to use a specific exception to indicate the job is finished, - // because the registered Kafka source is infinite. - if (!isCausedByJobFinished(t)) { - // re-throw - throw t; - } - } + TableResult tableResult = tEnv.executeSql( + "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name"); // Debezium captures change data on the `products` table: // @@ -179,15 +177,15 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink { // | 110 | jacket | new water resistent white wind breaker | 0.5 | // +-----+--------------------+---------------------------------------------------------+--------+ - String[] expected = new String[]{ + List<String> expected = Arrays.asList( "scooter,3.140", "car battery,8.100", "12-pack drill bits,0.800", - "hammer,2.625", "rocks,5.100", "jacket,0.600", "spare tire,22.200"}; + "hammer,2.625", "rocks,5.100", "jacket,0.600", "spare tire,22.200"); - List<String> actual = TestValuesTableFactory.getResults("sink"); - assertThat(actual, containsInAnyOrder(expected)); + waitingExpectedResults("sink", expected, Duration.ofSeconds(10)); // ------------- cleanup ------------------- + tableResult.getJobClient().get().cancel().get(); // stop the job deleteTestTopic(topic); } @@ -201,4 +199,23 @@ public class KafkaChangelogTableITCase extends KafkaTestBaseWithFlink { Path path = new File(url.getFile()).toPath(); return Files.readAllLines(path); } + + private static void waitingExpectedResults(String sinkName, List<String> expected, Duration timeout) throws InterruptedException { + long now = System.currentTimeMillis(); + long stop = now + timeout.toMillis(); + Collections.sort(expected); + while (System.currentTimeMillis() < stop) { + List<String> actual = TestValuesTableFactory.getResults(sinkName); + Collections.sort(actual); + if (expected.equals(actual)) { + return; + } + Thread.sleep(100); + } + + // timeout, assert again + List<String> actual = TestValuesTableFactory.getResults(sinkName); + Collections.sort(actual); + assertEquals(expected, actual); + } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala index 3971744..714b249 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchIntervalTrait, MiniBatchIntervalTraitDef, MiniBatchMode} -import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecDataStreamScan, StreamExecGroupWindowAggregate, StreamExecMiniBatchAssigner, StreamExecLegacyTableSourceScan, StreamExecWatermarkAssigner, StreamPhysicalRel} +import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecDataStreamScan, StreamExecGroupWindowAggregate, StreamExecLegacyTableSourceScan, StreamExecMiniBatchAssigner, StreamExecTableSourceScan, StreamExecWatermarkAssigner, StreamPhysicalRel} import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil import org.apache.calcite.plan.RelOptRule._ import org.apache.calcite.plan.hep.HepRelVertex @@ -122,7 +122,8 @@ class MiniBatchIntervalInferRule extends RelOptRule( .getMiniBatchInterval .mode node match { - case _: StreamExecDataStreamScan | _: StreamExecLegacyTableSourceScan => + case _: StreamExecDataStreamScan | _: StreamExecLegacyTableSourceScan | + _: StreamExecTableSourceScan => // append minibatch node if the mode is not NONE and reach a source leaf node mode == MiniBatchMode.RowTime || mode == MiniBatchMode.ProcTime case _: StreamExecWatermarkAssigner => diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java index b2cc256..e4ede00 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java @@ -65,40 +65,42 @@ final class TestValuesRuntimeFunctions { static List<String> getRawResults(String tableName) { List<String> result = new ArrayList<>(); - if (globalRawResult.containsKey(tableName)) { - globalRawResult.get(tableName) - .values() - .forEach(result::addAll); - } else { - throw new IllegalArgumentException("Can't find result for the table '" + tableName + "'."); + synchronized (TestValuesTableFactory.class) { + if (globalRawResult.containsKey(tableName)) { + globalRawResult.get(tableName) + .values() + .forEach(result::addAll); + } } return result; } static List<String> getResults(String tableName) { List<String> result = new ArrayList<>(); - if (globalUpsertResult.containsKey(tableName)) { - globalUpsertResult.get(tableName) - .values() - .forEach(map -> result.addAll(map.values())); - } else if (globalRetractResult.containsKey(tableName)) { - globalRetractResult.get(tableName) - .values() - .forEach(result::addAll); - } else if (globalRawResult.containsKey(tableName)) { - getRawResults(tableName).stream() - .map(s -> s.substring(3, s.length() - 1)) // removes the +I(...) wrapper - .forEach(result::add); - } else { - throw new IllegalArgumentException("Can't find result for the table '" + tableName + "'."); + synchronized (TestValuesTableFactory.class) { + if (globalUpsertResult.containsKey(tableName)) { + globalUpsertResult.get(tableName) + .values() + .forEach(map -> result.addAll(map.values())); + } else if (globalRetractResult.containsKey(tableName)) { + globalRetractResult.get(tableName) + .values() + .forEach(result::addAll); + } else if (globalRawResult.containsKey(tableName)) { + getRawResults(tableName).stream() + .map(s -> s.substring(3, s.length() - 1)) // removes the +I(...) wrapper + .forEach(result::add); + } } return result; } static void clearResults() { - globalRawResult.clear(); - globalUpsertResult.clear(); - globalRetractResult.clear(); + synchronized (TestValuesTableFactory.class) { + globalRawResult.clear(); + globalUpsertResult.clear(); + globalRetractResult.clear(); + } } // ------------------------------------------------------------------------------------------ diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml index 124fad3..8456d01 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml @@ -16,6 +16,56 @@ See the License for the specific language governing permissions and limitations under the License. --> <Root> + <TestCase name="testIntervalJoinWithMiniBatch"> + <Resource name="sql"> + <![CDATA[ + SELECT b, COUNT(a) + FROM ( + SELECT t1.a as a, t1.b as b + FROM + wmTable1 as t1 JOIN wmTable2 as t2 + ON + t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND + t2.rowtime + INTERVAL '10' SECOND + ) + GROUP BY b + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)]) ++- LogicalProject(b=[$1], a=[$0]) + +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner]) + :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3]) + : +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, wmTable2]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1]) ++- Exchange(distribution=[hash[b]]) + +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(a) AS count$0]) + +- Calc(select=[b, a]) + +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, rowtime]) + : +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime]) + : +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) + : +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime]) + : +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, rowtime]) + +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) + +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime]) + +- TableSourceScan(table=[[default_catalog, default_database, wmTable2]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> <TestCase name="testMiniBatchOnDataStreamWithRowTime"> <Resource name="sql"> <![CDATA[ @@ -60,9 +110,30 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXP GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3]) +- Exchange(distribution=[hash[b]]) +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0]) + +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, project=[b, a, c]]], fields=[b, a, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testMiniBatchOnlyForDataStream"> + <Resource name="sql"> + <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyDataStream1 GROUP BY b]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)]) ++- LogicalProject(b=[$1], a=[$0], c=[$2]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyDataStream1]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3]) ++- Exchange(distribution=[hash[b]]) + +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0]) +- Calc(select=[b, a, c]) +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime]) + +- DataStreamScan(table=[[default_catalog, default_database, MyDataStream1]], fields=[a, b, c, proctime, rowtime]) ]]> </Resource> </TestCase> @@ -76,7 +147,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1 FROM ( SELECT t1.a as a, t1.b as b, t1.rowtime as rt FROM - LeftT as t1 JOIN RightT as t2 + wmTable1 as t1 JOIN wmTable2 as t2 ON t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND t2.rowtime + INTERVAL '10' SECOND @@ -90,10 +161,12 @@ LogicalProject(b=[$0], EXPR$1=[$2], EXPR$2=[TUMBLE_START($1)], EXPR$3=[TUMBLE_EN +- LogicalAggregate(group=[{0, 1}], EXPR$1=[COUNT($2)]) +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0]) +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner]) - :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) - : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) - +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) + :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3]) + : +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, wmTable2]]) ]]> </Resource> <Resource name="planAfter"> @@ -105,12 +178,14 @@ Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3]) +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a, b, rowtime]) - : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) - : +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime]) + : +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) + : +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime]) + : +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, rowtime]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) + +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime]) + +- TableSourceScan(table=[[default_catalog, default_database, wmTable2]], fields=[a, b, c, rowtime]) ]]> </Resource> </TestCase> @@ -126,7 +201,7 @@ Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3]) SELECT b, COUNT(a) as a, TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt - FROM LeftT + FROM wmTable1 GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND) ) as t1 JOIN @@ -134,7 +209,7 @@ Calc(select=[b, EXPR$1, w$start AS EXPR$2, w$end AS EXPR$3]) SELECT b, COUNT(a) as a, HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt - FROM RightT + FROM wmTable2 GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) ) as t2 ON @@ -150,13 +225,15 @@ LogicalProject(b=[$0], EXPR$1=[COUNT($1) OVER (PARTITION BY $0 ORDER BY $2 NULLS :- LogicalProject(b=[$0], a=[$2], rt=[TUMBLE_ROWTIME($1)]) : +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)]) : +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0]) - : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) - : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3]) + : +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]]) +- LogicalProject(b=[$0], a=[$2], rt=[HOP_ROWTIME($1)]) +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)]) +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0]) - +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, wmTable2]]) ]]> </Resource> <Resource name="planAfter"> @@ -171,15 +248,17 @@ Calc(select=[b, w0$o0 AS $1]) : +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) : +- Exchange(distribution=[hash[b]]) : +- Calc(select=[b, rowtime, a]) - : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) - : +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime]) + : +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) + : +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime]) + : +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[b, a, w$rowtime AS rt]) +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS a, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[b, rowtime, a]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) + +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime]) + +- TableSourceScan(table=[[default_catalog, default_database, wmTable2]], fields=[a, b, c, rowtime]) ]]> </Resource> </TestCase> @@ -195,7 +274,7 @@ Calc(select=[b, w0$o0 AS $1]) FROM ( SELECT b, COUNT(a) as a, HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt - FROM RightT + FROM wmTable1 GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) ) GROUP BY b @@ -215,8 +294,9 @@ LogicalProject(a=[$0], b=[$1]) +- LogicalProject(b=[$0], a=[$2]) +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)]) +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0]) - +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]]) ]]> </Resource> <Resource name="planAfter"> @@ -228,9 +308,8 @@ Calc(select=[a, b]) : +- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) AS b]) : +- Exchange(distribution=[hash[a]]) : +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(b) AS count$0]) - : +- Calc(select=[a, b]) - : +- MiniBatchAssigner(interval=[6000ms], mode=[ProcTime]) - : +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime]) + : +- MiniBatchAssigner(interval=[6000ms], mode=[ProcTime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, project=[a, b]]], fields=[a, b]) +- Exchange(distribution=[hash[a]]) +- GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count1$0) AS a]) +- Exchange(distribution=[hash[b]]) @@ -238,8 +317,9 @@ Calc(select=[a, b]) +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], select=[b, COUNT(a) AS a]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[b, rowtime, a]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) + +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime]) + +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime]) ]]> </Resource> </TestCase> @@ -254,9 +334,9 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fie +- LogicalProject($f0=[$TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#']) +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) - :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)]) + :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1]) : +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) - +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b]) @@ -265,9 +345,9 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fie +- LogicalProject($f0=[$TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'-']) +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) - :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)]) + :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1]) : +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) - +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a, b]) @@ -277,19 +357,19 @@ LogicalLegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fie +- LogicalProject($f0=[$TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#']) +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) - :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)]) + :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1]) : +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) - +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) == Optimized Logical Plan == IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods], reuse_id=[1]) :- Exchange(distribution=[hash[id1]]) -: +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) -: +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text]) +: +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) +: +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text]) +- Exchange(distribution=[hash[id2]]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) - +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) + +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods]) Exchange(distribution=[hash[id1]], reuse_id=[2]) +- Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3]) @@ -317,45 +397,49 @@ LegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a, == Physical Execution Plan == : Data Source - content : Source: Collection Source + content : Source: TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text]) + + : Operator + content : WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) + ship_strategy : FORWARD : Data Source - content : Source: Collection Source + content : Source: TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods]) : Operator - content : SourceConversion(table=[default_catalog.default_database.T1], fields=[id1, rowtime, text]) + content : WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) ship_strategy : FORWARD : Operator - content : WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 0:INTERVAL MILLISECOND)]) - ship_strategy : FORWARD + content : IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[((id1 = id2) AND (CAST(rowtime) > (CAST(rowtime0) - 300000:INTERVAL MINUTE)) AND (CAST(rowtime) < (CAST(rowtime0) + 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods]) + ship_strategy : HASH : Operator - content : SourceConversion(table=[default_catalog.default_database.T2], fields=[id2, rowtime, cnt, name, goods]) + content : Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3]) ship_strategy : FORWARD : Operator - content : WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 0:INTERVAL MILLISECOND)]) - ship_strategy : FORWARD + content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) + ship_strategy : HASH : Operator - content : IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[((id1 = id2) AND (CAST(rowtime) > (CAST(rowtime0) - 300000:INTERVAL MINUTE)) AND (CAST(rowtime) < (CAST(rowtime0) + 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods]) - ship_strategy : HASH + content : Calc(select=[w$rowtime AS $f0, id1, text, _UTF-16LE'*' AS $f3]) + ship_strategy : FORWARD : Operator - content : Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3]) - ship_strategy : FORWARD + content : GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, $f0, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1]) + ship_strategy : HASH : Operator - content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) - ship_strategy : HASH + content : SinkConversionToRow + ship_strategy : FORWARD : Operator - content : Calc(select=[w$rowtime AS $f0, id1, text, _UTF-16LE'*' AS $f3]) + content : Calc(select=[rowtime, id1, text, _UTF-16LE'-' AS $f3]) ship_strategy : FORWARD : Operator - content : GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, $f0, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1]) + content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1]) ship_strategy : HASH : Operator @@ -363,49 +447,37 @@ LegacySink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a, ship_strategy : FORWARD : Operator - content : Calc(select=[rowtime, id1, text, _UTF-16LE'-' AS $f3]) - ship_strategy : FORWARD + content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], select=[id1, LISTAGG(text, $f3) AS text]) + ship_strategy : HASH : Operator - content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1]) - ship_strategy : HASH + content : LocalGroupAggregate(groupBy=[id1], select=[id1, COUNT(text) AS count$0]) + ship_strategy : FORWARD : Operator - content : SinkConversionToRow - ship_strategy : FORWARD + content : GlobalGroupAggregate(groupBy=[id1], select=[id1, COUNT(count$0) AS EXPR$1]) + ship_strategy : HASH : Operator - content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], select=[id1, LISTAGG(text, $f3) AS text]) - ship_strategy : HASH + content : SinkConversionToTuple2 + ship_strategy : FORWARD : Operator - content : LocalGroupAggregate(groupBy=[id1], select=[id1, COUNT(text) AS count$0]) + content : Map ship_strategy : FORWARD - : Operator - content : GlobalGroupAggregate(groupBy=[id1], select=[id1, COUNT(count$0) AS EXPR$1]) - ship_strategy : HASH + : Data Sink + content : Sink: TestingAppendTableSink + ship_strategy : FORWARD - : Operator - content : SinkConversionToTuple2 + : Data Sink + content : Sink: TestingAppendTableSink ship_strategy : FORWARD - : Operator - content : Map + : Data Sink + content : Sink: TestingRetractTableSink ship_strategy : FORWARD - : Data Sink - content : Sink: TestingAppendTableSink - ship_strategy : FORWARD - - : Data Sink - content : Sink: TestingAppendTableSink - ship_strategy : FORWARD - - : Data Sink - content : Sink: TestingRetractTableSink - ship_strategy : FORWARD - ]]> </Resource> </TestCase> @@ -442,30 +514,6 @@ GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT(count$0) AS EXPR$1]) ]]> </Resource> </TestCase> - <TestCase name="testRedundantWatermarkDefinition"> - <Resource name="sql"> - <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable3 GROUP BY b]]> - </Resource> - <Resource name="planBefore"> - <![CDATA[ -LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)]) -+- LogicalProject(b=[$1], a=[$0], c=[$2]) - +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) -]]> - </Resource> - <Resource name="planAfter"> - <![CDATA[ -GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3]) -+- Exchange(distribution=[hash[b]]) - +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0]) - +- Calc(select=[b, a, c]) - +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime]) -]]> - </Resource> - </TestCase> <TestCase name="testRowtimeRowsOverWithMiniBatch"> <Resource name="sql"> <![CDATA[ @@ -473,7 +521,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1 FROM ( SELECT c, COUNT(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt - FROM MyTable3 + FROM wmTable1 ) GROUP BY cnt ]]> @@ -482,8 +530,9 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1 <![CDATA[ LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)]) +- LogicalProject(cnt=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS BETWEEN 5 PRECEDING AND CURRENT ROW)], c=[$2]) - +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]]) ]]> </Resource> <Resource name="planAfter"> @@ -496,8 +545,35 @@ GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT(count$0) AS EXPR$1]) +- Exchange(distribution=[hash[c]]) +- Calc(select=[a, c, rowtime]) +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) + +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime]) + +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testRedundantWatermarkDefinition"> + <Resource name="sql"> + <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM wmTable1 GROUP BY b]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[SUM($2)]) ++- LogicalProject(b=[$1], a=[$0], c=[$2]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3]) ++- Exchange(distribution=[hash[b]]) + +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0]) + +- Calc(select=[b, a, c]) + +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) + +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime]) + +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime]) ]]> </Resource> </TestCase> @@ -521,7 +597,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)]) +- LogicalFilter(condition=[=($1, $6)]) +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{4}]) :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) - : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyDataStream1]]) +- LogicalTableFunctionScan(invocation=[Rates($cor0.rowtime)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(PROCTIME) proctime, TIME ATTRIBUTE(ROWTIME) rowtime)], elementType=[class [Ljava.lang.Object;]) ]]> </Resource> @@ -536,58 +612,12 @@ GlobalGroupAggregate(groupBy=[r_a], select=[r_a, COUNT(count$0) AS EXPR$1]) : +- Calc(select=[a, b, rowtime]) : +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime]) : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) - : +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime]) + : +- DataStreamScan(table=[[default_catalog, default_database, MyDataStream1]], fields=[a, b, c, proctime, rowtime]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[a, b, rowtime]) +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime]) -]]> - </Resource> - </TestCase> - <TestCase name="testIntervalJoinWithMiniBatch"> - <Resource name="sql"> - <![CDATA[ - SELECT b, COUNT(a) - FROM ( - SELECT t1.a as a, t1.b as b - FROM - LeftT as t1 JOIN RightT as t2 - ON - t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND - t2.rowtime + INTERVAL '10' SECOND - ) - GROUP BY b - ]]> - </Resource> - <Resource name="planBefore"> - <![CDATA[ -LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)]) -+- LogicalProject(b=[$1], a=[$0]) - +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner]) - :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) - : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) - +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) -]]> - </Resource> - <Resource name="planAfter"> - <![CDATA[ -GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1]) -+- Exchange(distribution=[hash[b]]) - +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(a) AS count$0]) - +- Calc(select=[b, a]) - +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-5000, leftUpperBound=10000, leftTimeIndex=2, rightTimeIndex=1], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 5000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 10000:INTERVAL SECOND)))], select=[a, b, rowtime, a0, rowtime0]) - :- Exchange(distribution=[hash[a]]) - : +- Calc(select=[a, b, rowtime]) - : +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime]) - : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) - : +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime]) - +- Exchange(distribution=[hash[a]]) - +- Calc(select=[a, rowtime]) - +- MiniBatchAssigner(interval=[1000ms], mode=[RowTime]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime]) + +- DataStreamScan(table=[[default_catalog, default_database, MyDataStream2]], fields=[a, b, c, proctime, rowtime]) ]]> </Resource> </TestCase> @@ -600,7 +630,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1]) SELECT b, COUNT(a) as cnt, TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt - FROM MyTable3 + FROM wmTable1 GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND) ) GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND) @@ -613,8 +643,9 @@ LogicalProject(b=[$0], EXPR$1=[$2]) +- LogicalProject(b=[$0], $f1=[$TUMBLE(TUMBLE_ROWTIME($1), 5000:INTERVAL SECOND)], cnt=[$2]) +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)]) +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 10000:INTERVAL SECOND)], a=[$0]) - +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]]) ]]> </Resource> <Resource name="planAfter"> @@ -625,8 +656,9 @@ GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, $f1, 5000)], +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, rowtime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, COUNT(a) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[b, rowtime, a]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) + +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime]) + +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime]) ]]> </Resource> </TestCase> @@ -639,7 +671,7 @@ GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, $f1, 5000)], COUNT(a) as cnt, HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start, HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end - FROM MyTable3 + FROM wmTable1 GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) ) GROUP BY b @@ -651,8 +683,9 @@ LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)]) +- LogicalProject(b=[$0], cnt=[$2]) +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)]) +- LogicalProject(b=[$1], $f1=[HOP($4, 5000:INTERVAL SECOND, 6000:INTERVAL SECOND)], a=[$0]) - +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[$4]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()], rowtime=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, wmTable1]]) ]]> </Resource> <Resource name="planAfter"> @@ -663,8 +696,9 @@ GlobalGroupAggregate(groupBy=[b], select=[b, $SUM0_RETRACT(sum$0) AS EXPR$1]) +- GroupWindowAggregate(groupBy=[b], window=[SlidingGroupWindow('w$, rowtime, 6000, 5000)], select=[b, COUNT(a) AS cnt], emit=[early delay 500 millisecond]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[b, rowtime, a]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 0:INTERVAL MILLISECOND)]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a, b, c, proctime, rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) + +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime]) + +- TableSourceScan(table=[[default_catalog, default_database, wmTable1]], fields=[a, b, c, rowtime]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala index 26fabc1..fe3d08a 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala @@ -39,9 +39,37 @@ class MiniBatchIntervalInferTest extends TableTestBase { @Before def setup(): Unit = { util.addDataStream[(Int, String, Long)]( - "MyTable1", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) + "MyDataStream1", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) util.addDataStream[(Int, String, Long)]( - "MyTable2", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) + "MyDataStream2", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) + + // register tables using DDL + util.addTable( + s""" + |CREATE TABLE MyTable1 ( + | `a` INT, + | `b` STRING, + | `c` BIGINT, + | proctime AS PROCTIME(), + | rowtime TIMESTAMP(3) + |) WITH ( + | 'connector' = 'values' + |) + |""".stripMargin) + util.addTable( + s""" + |CREATE TABLE wmTable1 ( + | WATERMARK FOR rowtime AS rowtime + |) LIKE MyTable1 (INCLUDING ALL) + |""".stripMargin) + util.addTable( + s""" + |CREATE TABLE wmTable2 ( + | WATERMARK FOR rowtime AS rowtime + |) LIKE MyTable1 (INCLUDING ALL) + |""".stripMargin) + + // enable mini-batch util.tableEnv.getConfig.getConfiguration.setBoolean( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) } @@ -49,17 +77,24 @@ class MiniBatchIntervalInferTest extends TableTestBase { @Test def testMiniBatchOnly(): Unit = { util.tableEnv.getConfig.getConfiguration - .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s") + .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s") val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable1 GROUP BY b" util.verifyPlan(sql) } @Test + def testMiniBatchOnlyForDataStream(): Unit = { + util.tableEnv.getConfig.getConfiguration + .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s") + val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyDataStream1 GROUP BY b" + util.verifyPlan(sql) + } + + @Test def testRedundantWatermarkDefinition(): Unit = { util.tableEnv.getConfig.getConfiguration .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s") - util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0) - val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable3 GROUP BY b" + val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM wmTable1 GROUP BY b" util.verifyPlan(sql) } @@ -69,7 +104,6 @@ class MiniBatchIntervalInferTest extends TableTestBase { tableConfig.getConfiguration .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s") withEarlyFireDelay(tableConfig, Time.milliseconds(500)) - util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0) val sql = """ | SELECT b, SUM(cnt) @@ -78,7 +112,7 @@ class MiniBatchIntervalInferTest extends TableTestBase { | COUNT(a) as cnt, | HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_start, | HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as w_end - | FROM MyTable3 + | FROM wmTable1 | GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) | ) | GROUP BY b @@ -90,7 +124,6 @@ class MiniBatchIntervalInferTest extends TableTestBase { def testWindowCascade(): Unit = { util.tableEnv.getConfig.getConfiguration .setString(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "3 s") - util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0) val sql = """ | SELECT b, @@ -99,7 +132,7 @@ class MiniBatchIntervalInferTest extends TableTestBase { | SELECT b, | COUNT(a) as cnt, | TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) as rt - | FROM MyTable3 + | FROM wmTable1 | GROUP BY b, TUMBLE(rowtime, INTERVAL '10' SECOND) | ) | GROUP BY b, TUMBLE(rt, INTERVAL '5' SECOND) @@ -109,8 +142,6 @@ class MiniBatchIntervalInferTest extends TableTestBase { @Test def testIntervalJoinWithMiniBatch(): Unit = { - util.addTableWithWatermark("LeftT", util.tableEnv.from("MyTable1"), "rowtime", 0) - util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), "rowtime", 0) util.tableEnv.getConfig.getConfiguration.setString( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s") @@ -120,7 +151,7 @@ class MiniBatchIntervalInferTest extends TableTestBase { | FROM ( | SELECT t1.a as a, t1.b as b | FROM - | LeftT as t1 JOIN RightT as t2 + | wmTable1 as t1 JOIN wmTable2 as t2 | ON | t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND | t2.rowtime + INTERVAL '10' SECOND @@ -132,7 +163,6 @@ class MiniBatchIntervalInferTest extends TableTestBase { @Test def testRowtimeRowsOverWithMiniBatch(): Unit = { - util.addTableWithWatermark("MyTable3", util.tableEnv.from("MyTable1"), "rowtime", 0) util.tableEnv.getConfig.getConfiguration.setString( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s") @@ -142,7 +172,7 @@ class MiniBatchIntervalInferTest extends TableTestBase { | FROM ( | SELECT c, COUNT(a) | OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND CURRENT ROW) as cnt - | FROM MyTable3 + | FROM wmTable1 | ) | GROUP BY cnt """.stripMargin @@ -152,8 +182,8 @@ class MiniBatchIntervalInferTest extends TableTestBase { @Test def testTemporalTableFunctionJoinWithMiniBatch(): Unit = { - util.addTableWithWatermark("Orders", util.tableEnv.from("MyTable1"), "rowtime", 0) - util.addTableWithWatermark("RatesHistory", util.tableEnv.from("MyTable2"), "rowtime", 0) + util.addTableWithWatermark("Orders", util.tableEnv.from("MyDataStream1"), "rowtime", 0) + util.addTableWithWatermark("RatesHistory", util.tableEnv.from("MyDataStream2"), "rowtime", 0) util.tableEnv.getConfig.getConfiguration.setString( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s") @@ -180,8 +210,6 @@ class MiniBatchIntervalInferTest extends TableTestBase { @Test def testMultiOperatorNeedsWatermark1(): Unit = { // infer result: miniBatchInterval=[Rowtime, 0ms] - util.addTableWithWatermark("LeftT", util.tableEnv.from("MyTable1"), "rowtime", 0) - util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), "rowtime", 0) util.tableEnv.getConfig.getConfiguration.setString( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "1 s") @@ -194,7 +222,7 @@ class MiniBatchIntervalInferTest extends TableTestBase { | FROM ( | SELECT t1.a as a, t1.b as b, t1.rowtime as rt | FROM - | LeftT as t1 JOIN RightT as t2 + | wmTable1 as t1 JOIN wmTable2 as t2 | ON | t1.a = t2.a AND t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND | t2.rowtime + INTERVAL '10' SECOND @@ -206,8 +234,6 @@ class MiniBatchIntervalInferTest extends TableTestBase { @Test def testMultiOperatorNeedsWatermark2(): Unit = { - util.addTableWithWatermark("LeftT", util.tableEnv.from("MyTable1"), "rowtime", 0) - util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), "rowtime", 0) util.tableEnv.getConfig.getConfiguration.setString( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "6 s") @@ -222,7 +248,7 @@ class MiniBatchIntervalInferTest extends TableTestBase { | SELECT b, | COUNT(a) as a, | TUMBLE_ROWTIME(rowtime, INTERVAL '5' SECOND) as rt - | FROM LeftT + | FROM wmTable1 | GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND) | ) as t1 | JOIN @@ -230,7 +256,7 @@ class MiniBatchIntervalInferTest extends TableTestBase { | SELECT b, | COUNT(a) as a, | HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt - | FROM RightT + | FROM wmTable2 | GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) | ) as t2 | ON @@ -243,7 +269,6 @@ class MiniBatchIntervalInferTest extends TableTestBase { @Test def testMultiOperatorNeedsWatermark3(): Unit = { - util.addTableWithWatermark("RightT", util.tableEnv.from("MyTable2"), "rowtime", 0) util.tableEnv.getConfig.getConfiguration.setString( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "6 s") @@ -258,7 +283,7 @@ class MiniBatchIntervalInferTest extends TableTestBase { | FROM ( | SELECT b, COUNT(a) as a, | HOP_ROWTIME(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) as rt - | FROM RightT + | FROM wmTable1 | GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) | ) | GROUP BY b @@ -274,12 +299,30 @@ class MiniBatchIntervalInferTest extends TableTestBase { @Test def testMultipleWindowAggregates(): Unit = { val stmtSet = util.tableEnv.createStatementSet() - util.addDataStream[(Int, Long, String)]("T1", 'id1, 'rowtime.rowtime, 'text) - util.addDataStream[(Int, Long, Int, String, String)]( - "T2", - 'id2, 'rowtime.rowtime, 'cnt, 'name, 'goods) - util.addTableWithWatermark("T3", util.tableEnv.from("T1"), "rowtime", 0) - util.addTableWithWatermark("T4", util.tableEnv.from("T2"), "rowtime", 0) + util.addTable( + s""" + |CREATE TABLE T1 ( + | id1 INT, + | rowtime TIMESTAMP(3), + | `text` STRING, + | WATERMARK FOR rowtime AS rowtime + |) WITH ( + | 'connector' = 'values' + |) + |""".stripMargin) + util.addTable( + s""" + |CREATE TABLE T2 ( + | id2 INT, + | rowtime TIMESTAMP(3), + | cnt INT, + | name STRING, + | goods STRING, + | WATERMARK FOR rowtime AS rowtime + |) WITH ( + | 'connector' = 'values' + |) + |""".stripMargin) util.tableEnv.getConfig.getConfiguration.setString( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, "500 ms") @@ -288,13 +331,13 @@ class MiniBatchIntervalInferTest extends TableTestBase { val table1 = util.tableEnv.sqlQuery( """ - |SELECT id1, T3.rowtime AS ts, text - | FROM T3, T4 + |SELECT id1, T1.rowtime AS ts, text + | FROM T1, T2 |WHERE id1 = id2 - | AND T3.rowtime > T4.rowtime - INTERVAL '5' MINUTE - | AND T3.rowtime < T4.rowtime + INTERVAL '3' MINUTE + | AND T1.rowtime > T2.rowtime - INTERVAL '5' MINUTE + | AND T1.rowtime < T2.rowtime + INTERVAL '3' MINUTE """.stripMargin) - util.tableEnv.registerTable("TempTable1", table1) + util.tableEnv.createTemporaryView("TempTable1", table1) val table2 = util.tableEnv.sqlQuery( """ @@ -304,7 +347,7 @@ class MiniBatchIntervalInferTest extends TableTestBase { |FROM TempTable1 |GROUP BY TUMBLE(ts, INTERVAL '6' SECOND), id1 """.stripMargin) - util.tableEnv.registerTable("TempTable2", table2) + util.tableEnv.createTemporaryView("TempTable2", table2) val table3 = util.tableEnv.sqlQuery( """