This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 881062f352f [FLINK-34005] Implement restore tests for MiniBatchAssigner node 881062f352f is described below commit 881062f352f8bf8c21ab7cbea95e111fd82fdf20 Author: bvarghese1 <bvargh...@confluent.io> AuthorDate: Fri Jan 5 11:46:26 2024 -0800 [FLINK-34005] Implement restore tests for MiniBatchAssigner node --- .../exec/stream/MiniBatchAssignerRestoreTest.java | 40 + .../exec/stream/MiniBatchAssignerTestPrograms.java | 144 ++++ .../plan/mini-batch-assigner-proc-time.json | 257 +++++++ .../savepoint/_metadata | Bin 0 -> 13431 bytes .../plan/mini-batch-assigner-row-time.json | 854 +++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 24113 bytes 6 files changed, 1295 insertions(+) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerRestoreTest.java new file mode 100644 index 00000000000..213c96fd6eb --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerRestoreTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecMiniBatchAssigner}. */ +public class MiniBatchAssignerRestoreTest extends RestoreTestBase { + + public MiniBatchAssignerRestoreTest() { + super(StreamExecMiniBatchAssigner.class); + } + + @Override + public List<TableTestProgram> programs() { + return Arrays.asList( + MiniBatchAssignerTestPrograms.MINI_BATCH_ASSIGNER_ROW_TIME, + MiniBatchAssignerTestPrograms.MINI_BATCH_ASSIGNER_PROC_TIME); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerTestPrograms.java new file mode 100644 index 00000000000..c2c701f6d1c --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerTestPrograms.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +import java.time.Duration; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecMiniBatchAssigner}. */ +public class MiniBatchAssignerTestPrograms { + + static final String[] ROW_TIME_SCHEMA = { + "ts STRING", + "id STRING", + "num INT", + "name STRING", + "row_time AS TO_TIMESTAMP(`ts`)", + "WATERMARK for `row_time` AS `row_time` - INTERVAL '1' SECOND" + }; + + static final TableTestProgram MINI_BATCH_ASSIGNER_ROW_TIME = + TableTestProgram.of( + "mini-batch-assigner-row-time", + "validates mini batch assigner with row time") + .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) + .setupConfig( + ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, + Duration.ofSeconds(1)) + .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L) + .setupTableSource( + SourceTestStep.newBuilder("source_one_t") + .addSchema(ROW_TIME_SCHEMA) + .producedBeforeRestore( + Row.of("2020-10-10 00:00:01", "L1", 1, "a"), + Row.of("2020-10-10 00:00:02", "L2", 2, "c"), + Row.of("2020-10-10 00:00:03", "L3", 2, "x")) + .producedAfterRestore( + Row.of("2020-10-10 00:00:41", "L41", 10, "a"), + Row.of("2020-10-10 00:00:42", "L42", 11, "c")) + .build()) + .setupTableSource( + SourceTestStep.newBuilder("source_two_t") + .addSchema(ROW_TIME_SCHEMA) + .producedBeforeRestore( + Row.of("2020-10-10 00:00:01", "R1", 5, "a"), + Row.of("2020-10-10 00:00:02", "R2", 7, "b"), + Row.of("2020-10-10 00:00:03", "R3", 7, "f")) + .producedAfterRestore( + Row.of("2020-10-10 00:00:41", "R41", 10, "y"), + Row.of("2020-10-10 00:00:42", "R42", 11, "c")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "window_start TIMESTAMP(3)", + "window_end TIMESTAMP(3)", + "name STRING", + "L_id STRING", + "L_num INT", + "R_id STRING", + "R_num INT") + .consumedBeforeRestore( + "+I[2020-10-10T00:00:01, 2020-10-10T00:00:02, a, L1, 1, R1, 5]") + .consumedAfterRestore( + "+I[2020-10-10T00:00:42, 2020-10-10T00:00:43, c, L42, 11, R42, 11]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT\n" + + "L.window_start AS window_start,\n" + + "L.window_end AS window_end,\n" + + "L.name AS name,\n" + + "L.id AS L_id,\n" + + "L.num AS L_num,\n" + + "R.id AS R_id,\n" + + "R.num AS R_num\n" + + "FROM\n" + + "(\n" + + " SELECT * FROM TABLE(TUMBLE(TABLE source_one_t, DESCRIPTOR(row_time), INTERVAL '1' SECOND))\n" + + ") L\n" + + "JOIN\n" + + "(\n" + + " SELECT * FROM TABLE(TUMBLE(TABLE source_two_t, DESCRIPTOR(row_time), INTERVAL '1' SECOND))\n" + + ") R\n" + + "ON L.name = R.name\n" + + "AND L.window_start = R.window_start\n" + + "AND L.window_end = R.window_end") + .build(); + + static final TableTestProgram MINI_BATCH_ASSIGNER_PROC_TIME = + TableTestProgram.of( + "mini-batch-assigner-proc-time", + "validates mini batch assigner with proc time") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema("a INT", "b BIGINT", "c VARCHAR") + .producedBeforeRestore( + Row.of(1, 1L, "hi"), + Row.of(2, 2L, "hello"), + Row.of(3, 2L, "hello world")) + .producedAfterRestore( + Row.of(3, 2L, "foo"), + Row.of(4, 4L, "bar"), + Row.of(5, 2L, "foo bar")) + .build()) + .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) + .setupConfig( + ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, + Duration.ofSeconds(1)) + .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("b BIGINT", "a BIGINT") + .consumedBeforeRestore("+I[1, 1]", "+I[2, 2]") + .consumedAfterRestore("-U[2, 2]", "+U[2, 3]", "+I[4, 1]") + .build()) + .runSql( + "INSERT INTO sink_t\n" + + " SELECT\n" + + " b,\n" + + " COUNT(DISTINCT a) AS a\n" + + " FROM source_t\n" + + " GROUP BY b") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/plan/mini-batch-assigner-proc-time.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/plan/mini-batch-assigner-proc-time.json new file mode 100644 index 00000000000..8210b80a0b2 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/plan/mini-batch-assigner-proc-time.json @@ -0,0 +1,257 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 18, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 1 ], [ 0 ] ], + "producedType" : "ROW<`b` BIGINT, `a` INT> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`b` BIGINT, `a` INT> NOT NULL" + } ] + }, + "outputType" : "ROW<`b` BIGINT, `a` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[b, a], metadata=[]]], fields=[b, a])", + "inputProperties" : [ ] + }, { + "id" : 19, + "type" : "stream-exec-mini-batch-assigner_1", + "miniBatchInterval" : { + "interval" : 1000, + "mode" : "ProcTime" + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT, `a` INT>", + "description" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])" + }, { + "id" : 20, + "type" : "stream-exec-local-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "a", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "aggCallNeedRetractions" : [ false ], + "needRetraction" : false, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "b", + "fieldType" : "BIGINT" + }, { + "name" : "count$0", + "fieldType" : "BIGINT" + }, { + "name" : "distinct$0", + "fieldType" : { + "type" : "RAW", + "class" : "org.apache.flink.table.api.dataview.MapView", + "externalDataType" : { + "type" : "STRUCTURED_TYPE", + "implementationClass" : "org.apache.flink.table.api.dataview.MapView", + "attributes" : [ { + "name" : "map", + "attributeType" : "MAP<INT, BIGINT NOT NULL>" + } ] + } + } + } ] + }, + "description" : "LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, DISTINCT(a) AS distinct$0])" + }, { + "id" : 21, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "b", + "fieldType" : "BIGINT" + }, { + "name" : "count$0", + "fieldType" : "BIGINT" + }, { + "name" : "distinct$0", + "fieldType" : { + "type" : "RAW", + "class" : "org.apache.flink.table.api.dataview.MapView", + "externalDataType" : { + "type" : "STRUCTURED_TYPE", + "implementationClass" : "org.apache.flink.table.api.dataview.MapView", + "attributes" : [ { + "name" : "map", + "attributeType" : "MAP<INT, BIGINT NOT NULL>" + } ] + } + } + } ] + }, + "description" : "Exchange(distribution=[hash[b]])" + }, { + "id" : 22, + "type" : "stream-exec-global-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "a", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "aggCallNeedRetractions" : [ false ], + "localAggInputRowType" : "ROW<`b` BIGINT, `a` INT>", + "generateUpdateBefore" : true, + "needRetraction" : false, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "globalGroupAggregateState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT, `a` BIGINT NOT NULL>", + "description" : "GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS a])" + }, { + "id" : 23, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "a", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ], + "inputUpsertKey" : [ 0 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT, `a` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[b, a])" + } ], + "edges" : [ { + "source" : 18, + "target" : 19, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 19, + "target" : 20, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 20, + "target" : 21, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 21, + "target" : 22, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 22, + "target" : 23, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/savepoint/_metadata new file mode 100644 index 00000000000..1afe7af6919 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/plan/mini-batch-assigner-row-time.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/plan/mini-batch-assigner-row-time.json new file mode 100644 index 00000000000..7870d46a833 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/plan/mini-batch-assigner-row-time.json @@ -0,0 +1,854 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_one_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "num", + "dataType" : "INT" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "row_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "row_time", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`row_time` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`ts` VARCHAR(2147483647), `id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_one_t]], fields=[ts, id, num, name])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647), `row_time` TIMESTAMP(3)>", + "description" : "Calc(select=[id, num, name, TO_TIMESTAMP(ts) AS row_time])" + }, { + "id" : 3, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "num", + "fieldType" : "INT" + }, { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "row_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 1000:INTERVAL SECOND)])" + }, { + "id" : 4, + "type" : "stream-exec-mini-batch-assigner_1", + "miniBatchInterval" : { + "interval" : 1000, + "mode" : "RowTime" + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "num", + "fieldType" : "INT" + }, { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "row_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "MiniBatchAssigner(interval=[1000ms], mode=[RowTime])" + }, { + "id" : 5, + "type" : "stream-exec-window-table-function_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "windowing" : { + "strategy" : "TimeAttribute", + "window" : { + "type" : "TumblingWindow", + "size" : "PT1S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "timeAttributeIndex" : 3, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "num", + "fieldType" : "INT" + }, { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "row_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[1 s])])" + }, { + "id" : 6, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "TIMESTAMP(3) NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "Calc(select=[id, num, name, window_start, window_end])" + }, { + "id" : 7, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 2 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "Exchange(distribution=[hash[name]])" + }, { + "id" : 8, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_two_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "ts", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "num", + "dataType" : "INT" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "row_time", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "TO_TIMESTAMP(`ts`)" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "row_time", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "serializableString" : "`row_time` - INTERVAL '1' SECOND" + } + } ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`ts` VARCHAR(2147483647), `id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_two_t]], fields=[ts, id, num, name])", + "inputProperties" : [ ] + }, { + "id" : 9, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "CALL", + "internalName" : "$TO_TIMESTAMP$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "TIMESTAMP(3)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647), `row_time` TIMESTAMP(3)>", + "description" : "Calc(select=[id, num, name, TO_TIMESTAMP(ts) AS row_time])" + }, { + "id" : 10, + "type" : "stream-exec-watermark-assigner_1", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3)" + }, { + "kind" : "LITERAL", + "value" : "1000", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3)" + }, + "rowtimeFieldIndex" : 3, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "num", + "fieldType" : "INT" + }, { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "row_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 1000:INTERVAL SECOND)])" + }, { + "id" : 11, + "type" : "stream-exec-mini-batch-assigner_1", + "miniBatchInterval" : { + "interval" : 1000, + "mode" : "RowTime" + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "num", + "fieldType" : "INT" + }, { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "row_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "MiniBatchAssigner(interval=[1000ms], mode=[RowTime])" + }, { + "id" : 12, + "type" : "stream-exec-window-table-function_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "windowing" : { + "strategy" : "TimeAttribute", + "window" : { + "type" : "TumblingWindow", + "size" : "PT1S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "timeAttributeIndex" : 3, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "id", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "num", + "fieldType" : "INT" + }, { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "row_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + }, { + "name" : "window_start", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_end", + "fieldType" : "TIMESTAMP(3) NOT NULL" + }, { + "name" : "window_time", + "fieldType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[1 s])])" + }, { + "id" : 13, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "TIMESTAMP(3) NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "Calc(select=[id, num, name, window_start, window_end])" + }, { + "id" : 14, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 2 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>", + "description" : "Exchange(distribution=[hash[name]])" + }, { + "id" : 15, + "type" : "stream-exec-window-join_1", + "configuration" : { + "table.local-time-zone" : "default" + }, + "joinSpec" : { + "joinType" : "INNER", + "leftKeys" : [ 2 ], + "rightKeys" : [ 2 ], + "filterNulls" : [ true ], + "nonEquiCondition" : null + }, + "leftWindowing" : { + "strategy" : "WindowAttached", + "window" : { + "type" : "TumblingWindow", + "size" : "PT1S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "windowStart" : 3, + "windowEnd" : 4, + "isRowtime" : true + }, + "rightWindowing" : { + "strategy" : "WindowAttached", + "window" : { + "type" : "TumblingWindow", + "size" : "PT1S" + }, + "timeAttributeType" : { + "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + }, + "windowStart" : 3, + "windowEnd" : 4, + "isRowtime" : true + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + }, { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `id0` VARCHAR(2147483647), `num0` INT, `name0` VARCHAR(2147483647), `window_start0` TIMESTAMP(3) NOT NULL, `window_end0` TIMESTAMP(3) NOT NULL>", + "description" : "WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 s])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 s])], joinType=[InnerJoin], where=[(name = name0)], select=[id, num, name, window_start, window_end, id0, num0, name0, window_start0, window_end0])" + }, { + "id" : 16, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 4, + "type" : "TIMESTAMP(3) NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 5, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 6, + "type" : "INT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `name` VARCHAR(2147483647), `L_id` VARCHAR(2147483647), `L_num` INT, `R_id` VARCHAR(2147483647), `R_num` INT>", + "description" : "Calc(select=[window_start, window_end, name, id AS L_id, num AS L_num, id0 AS R_id, num0 AS R_num])" + }, { + "id" : 17, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "window_start", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "window_end", + "dataType" : "TIMESTAMP(3)" + }, { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "L_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "L_num", + "dataType" : "INT" + }, { + "name" : "R_id", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "R_num", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `name` VARCHAR(2147483647), `L_id` VARCHAR(2147483647), `L_num` INT, `R_id` VARCHAR(2147483647), `R_num` INT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[window_start, window_end, name, L_id, L_num, R_id, R_num])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 9, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 13, + "target" : 14, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 14, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 15, + "target" : 16, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 16, + "target" : 17, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/savepoint/_metadata new file mode 100644 index 00000000000..804773b2a4a Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/savepoint/_metadata differ