This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 20a328d80a1dbc50974cf3de9f4b6178246f6dee Author: Jim Hughes <jhug...@confluent.io> AuthorDate: Tue Dec 12 14:04:48 2023 -0500 [FLINK-33767] Deleting TemporalJoinJsonPlanTest.java and TemporalJoinJsonPlanITCase.java --- .../exec/stream/TemporalJoinJsonPlanTest.java | 101 ----- .../jsonplan/TemporalJoinJsonPlanITCase.java | 107 ------ .../testJoinTemporalFunction.out | 421 --------------------- .../testTemporalTableJoin.json | 421 --------------------- .../testTemporalTableJoin.out | 421 --------------------- 5 files changed, 1471 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest.java deleted file mode 100644 index da3e6eaebae..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.functions.TemporalTableFunction; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.apache.flink.table.api.Expressions.$; - -/** Test json serialization/deserialization for TemporalJoin. */ -class TemporalJoinJsonPlanTest extends TableTestBase { - - private StreamTableTestUtil util; - private TableEnvironment tEnv; - - @BeforeEach - void setup() { - util = streamTestUtil(TableConfig.getDefault()); - tEnv = util.getTableEnv(); - - tEnv.executeSql( - "CREATE TABLE Orders (\n" - + " amount INT,\n" - + " currency STRING,\n" - + " rowtime TIMESTAMP(3),\n" - + " proctime AS PROCTIME(),\n" - + " WATERMARK FOR rowtime AS rowtime\n" - + ") WITH (\n" - + " 'connector' = 'values'\n" - + ")"); - tEnv.executeSql( - "CREATE TABLE RatesHistory (\n" - + " currency STRING,\n" - + " rate INT,\n" - + " rowtime TIMESTAMP(3),\n" - + " WATERMARK FOR rowtime AS rowtime,\n" - + " PRIMARY KEY(currency) NOT ENFORCED\n" - + ") WITH (\n" - + " 'connector' = 'values'\n" - + ")"); - TemporalTableFunction ratesHistory = - tEnv.from("RatesHistory").createTemporalTableFunction($("rowtime"), $("currency")); - tEnv.createTemporarySystemFunction("Rates", ratesHistory); - } - - @Test - void testJoinTemporalFunction() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a int\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "INSERT INTO MySink " - + "SELECT amount * r.rate " - + "FROM Orders AS o, " - + "LATERAL TABLE (Rates(o.rowtime)) AS r " - + "WHERE o.currency = r.currency "); - } - - @Test - void testTemporalTableJoin() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a int\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "INSERT INTO MySink " - + "SELECT amount * r.rate " - + "FROM Orders AS o " - + "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r " - + "ON o.currency = r.currency "); - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java deleted file mode 100644 index ca1ed35f050..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.runtime.stream.jsonplan; - -import org.apache.flink.table.functions.TemporalTableFunction; -import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.planner.utils.JsonPlanTestBase; -import org.apache.flink.types.Row; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.List; - -import static org.apache.flink.table.api.Expressions.$; - -/** Test for TemporalJoin json plan. */ -class TemporalJoinJsonPlanITCase extends JsonPlanTestBase { - - @BeforeEach - @Override - protected void setup() throws Exception { - super.setup(); - List<Row> orders = - Arrays.asList( - Row.of(2L, "Euro", 2L), - Row.of(1L, "US Dollar", 3L), - Row.of(50L, "Yen", 4L), - Row.of(3L, "Euro", 5L)); - createTestValuesSourceTable( - "Orders", - orders, - "amount bigint", - "currency STRING", - "order_time bigint", - "rowtime as TO_TIMESTAMP(FROM_UNIXTIME(order_time)) ", - "proctime as PROCTIME()", - "WATERMARK FOR rowtime AS rowtime"); - List<Row> ratesHistory = - Arrays.asList( - Row.of("US Dollar", 102L, 1L), - Row.of("Euro", 114L, 1L), - Row.of("Yen", 1L, 1L), - Row.of("Euro", 116L, 5L), - Row.of("Euro", 119L, 7L)); - createTestValuesSourceTable( - "RatesHistory", - ratesHistory, - "currency STRING", - "rate bigint", - "rate_time bigint", - "rowtime as TO_TIMESTAMP(FROM_UNIXTIME(rate_time)) ", - "proctime as PROCTIME()", - "WATERMARK FOR rowtime AS rowtime", - "PRIMARY KEY(currency) NOT ENFORCED"); - - TemporalTableFunction temporalTableFunction = - tableEnv.from("RatesHistory") - .createTemporalTableFunction($("rowtime"), $("currency")); - tableEnv.createTemporarySystemFunction("Rates", temporalTableFunction); - createTestValuesSinkTable("MySink", "amount bigint"); - } - - /** test process time inner join. * */ - @Test - void testJoinTemporalFunction() throws Exception { - compileSqlAndExecutePlan( - "INSERT INTO MySink " - + "SELECT amount * r.rate " - + "FROM Orders AS o, " - + "LATERAL TABLE (Rates(o.rowtime)) AS r " - + "WHERE o.currency = r.currency ") - .await(); - List<String> expected = Arrays.asList("+I[102]", "+I[228]", "+I[348]", "+I[50]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } - - @Test - void testTemporalTableJoin() throws Exception { - compileSqlAndExecutePlan( - "INSERT INTO MySink " - + "SELECT amount * r.rate " - + "FROM Orders AS o " - + "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r " - + "ON o.currency = r.currency ") - .await(); - List<String> expected = Arrays.asList("+I[102]", "+I[228]", "+I[348]", "+I[50]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testJoinTemporalFunction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testJoinTemporalFunction.out deleted file mode 100644 index 9122979c553..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testJoinTemporalFunction.out +++ /dev/null @@ -1,421 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`Orders`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "amount", - "dataType" : "INT" - }, { - "name" : "currency", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "dataType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - }, { - "name" : "proctime", - "kind" : "COMPUTED", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "internalName" : "$PROCTIME$1", - "operands" : [ ], - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, - "serializableString" : "PROCTIME()" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", - "expression" : { - "rexNode" : { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "`rowtime`" - } - } ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`amount` INT, `currency` VARCHAR(2147483647), `rowtime` TIMESTAMP(3)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[amount, currency, rowtime])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, - "rowtimeFieldIndex" : 2, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "amount", - "fieldType" : "INT" - }, { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" - }, { - "id" : 3, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 1 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "amount", - "fieldType" : "INT" - }, { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "Exchange(distribution=[hash[currency]])" - }, { - "id" : 4, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`RatesHistory`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "currency", - "dataType" : "VARCHAR(2147483647) NOT NULL" - }, { - "name" : "rate", - "dataType" : "INT" - }, { - "name" : "rowtime", - "dataType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", - "expression" : { - "rexNode" : { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "`rowtime`" - } - } ], - "primaryKey" : { - "name" : "PK_currency", - "type" : "PRIMARY_KEY", - "columns" : [ "currency" ] - } - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` INT, `rowtime` TIMESTAMP(3)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rowtime])", - "inputProperties" : [ ] - }, { - "id" : 5, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, - "rowtimeFieldIndex" : 2, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647) NOT NULL" - }, { - "name" : "rate", - "fieldType" : "INT" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" - }, { - "id" : 6, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647) NOT NULL" - }, { - "name" : "rate", - "fieldType" : "INT" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "Exchange(distribution=[hash[currency]])" - }, { - "id" : 7, - "type" : "stream-exec-temporal-join_1", - "joinSpec" : { - "joinType" : "INNER", - "leftKeys" : [ 1 ], - "rightKeys" : [ 0 ], - "filterNulls" : [ true ], - "nonEquiCondition" : null - }, - "isTemporalFunctionJoin" : true, - "leftTimeAttributeIndex" : 2, - "rightTimeAttributeIndex" : 2, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - }, { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "amount", - "fieldType" : "INT" - }, { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - }, { - "name" : "currency0", - "fieldType" : "VARCHAR(2147483647) NOT NULL" - }, { - "name" : "rate", - "fieldType" : "INT" - }, { - "name" : "rowtime0", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "TemporalJoin(joinType=[InnerJoin], where=[(__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0)) AND (currency = currency0))], select=[amount, currency, rowtime, currency0, rate, rowtime0])" - }, { - "id" : 8, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "CALL", - "syntax" : "BINARY", - "internalName" : "$*$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "INT" - } ], - "type" : "INT" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`EXPR$0` INT>", - "description" : "Calc(select=[(amount * rate) AS EXPR$0])" - }, { - "id" : 9, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "INT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`EXPR$0` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[EXPR$0])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 7, - "target" : 8, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 8, - "target" : 9, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.json b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.json deleted file mode 100644 index bbfd1524d3b..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.json +++ /dev/null @@ -1,421 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`Orders`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "amount", - "dataType" : "INT" - }, { - "name" : "currency", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "dataType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - }, { - "name" : "proctime", - "kind" : "COMPUTED", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "internalName" : "$PROCTIME$1", - "operands" : [ ], - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, - "serializableString" : "PROCTIME()" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", - "expression" : { - "rexNode" : { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "`rowtime`" - } - } ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`amount` INT, `currency` VARCHAR(2147483647), `rowtime` TIMESTAMP(3)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[amount, currency, rowtime])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, - "rowtimeFieldIndex" : 2, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "amount", - "fieldType" : "INT" - }, { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" - }, { - "id" : 3, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 1 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "amount", - "fieldType" : "INT" - }, { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "Exchange(distribution=[hash[currency]])" - }, { - "id" : 4, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`RatesHistory`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "currency", - "dataType" : "VARCHAR(2147483647) NOT NULL" - }, { - "name" : "rate", - "dataType" : "INT" - }, { - "name" : "rowtime", - "dataType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", - "expression" : { - "rexNode" : { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "`rowtime`" - } - } ], - "primaryKey" : { - "name" : "PK_currency", - "type" : "PRIMARY_KEY", - "columns" : [ "currency" ] - } - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` INT, `rowtime` TIMESTAMP(3)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rowtime])", - "inputProperties" : [ ] - }, { - "id" : 5, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, - "rowtimeFieldIndex" : 2, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647) NOT NULL" - }, { - "name" : "rate", - "fieldType" : "INT" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" - }, { - "id" : 6, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647) NOT NULL" - }, { - "name" : "rate", - "fieldType" : "INT" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "Exchange(distribution=[hash[currency]])" - }, { - "id" : 7, - "type" : "stream-exec-temporal-join_1", - "joinSpec" : { - "joinType" : "INNER", - "leftKeys" : [ 1 ], - "rightKeys" : [ 0 ], - "filterNulls" : [ true ], - "nonEquiCondition" : null - }, - "isTemporalFunctionJoin" : false, - "leftTimeAttributeIndex" : 2, - "rightTimeAttributeIndex" : 2, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - }, { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "amount", - "fieldType" : "INT" - }, { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - }, { - "name" : "currency0", - "fieldType" : "VARCHAR(2147483647) NOT NULL" - }, { - "name" : "rate", - "fieldType" : "INT" - }, { - "name" : "rowtime0", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "TemporalJoin(joinType=[InnerJoin], where=[((currency = currency0) AND __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0), __TEMPORAL_JOIN_LEFT_KEY(currency), __TEMPORAL_JOIN_RIGHT_KEY(currency0)))], select=[amount, currency, rowtime, currency0, rate, rowtime0])" - }, { - "id" : 8, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "CALL", - "syntax" : "BINARY", - "internalName" : "$*$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "INT" - } ], - "type" : "INT" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`EXPR$0` INT>", - "description" : "Calc(select=[(amount * rate) AS EXPR$0])" - }, { - "id" : 9, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "INT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`EXPR$0` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[EXPR$0])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 7, - "target" : 8, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 8, - "target" : 9, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.out deleted file mode 100644 index bbfd1524d3b..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.out +++ /dev/null @@ -1,421 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`Orders`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "amount", - "dataType" : "INT" - }, { - "name" : "currency", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "dataType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - }, { - "name" : "proctime", - "kind" : "COMPUTED", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "internalName" : "$PROCTIME$1", - "operands" : [ ], - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, - "serializableString" : "PROCTIME()" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", - "expression" : { - "rexNode" : { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "`rowtime`" - } - } ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`amount` INT, `currency` VARCHAR(2147483647), `rowtime` TIMESTAMP(3)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[amount, currency, rowtime])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, - "rowtimeFieldIndex" : 2, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "amount", - "fieldType" : "INT" - }, { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" - }, { - "id" : 3, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 1 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "amount", - "fieldType" : "INT" - }, { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "Exchange(distribution=[hash[currency]])" - }, { - "id" : 4, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`RatesHistory`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "currency", - "dataType" : "VARCHAR(2147483647) NOT NULL" - }, { - "name" : "rate", - "dataType" : "INT" - }, { - "name" : "rowtime", - "dataType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", - "expression" : { - "rexNode" : { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "`rowtime`" - } - } ], - "primaryKey" : { - "name" : "PK_currency", - "type" : "PRIMARY_KEY", - "columns" : [ "currency" ] - } - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` INT, `rowtime` TIMESTAMP(3)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rowtime])", - "inputProperties" : [ ] - }, { - "id" : 5, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, - "rowtimeFieldIndex" : 2, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647) NOT NULL" - }, { - "name" : "rate", - "fieldType" : "INT" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])" - }, { - "id" : 6, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647) NOT NULL" - }, { - "name" : "rate", - "fieldType" : "INT" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "Exchange(distribution=[hash[currency]])" - }, { - "id" : 7, - "type" : "stream-exec-temporal-join_1", - "joinSpec" : { - "joinType" : "INNER", - "leftKeys" : [ 1 ], - "rightKeys" : [ 0 ], - "filterNulls" : [ true ], - "nonEquiCondition" : null - }, - "isTemporalFunctionJoin" : false, - "leftTimeAttributeIndex" : 2, - "rightTimeAttributeIndex" : 2, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - }, { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "amount", - "fieldType" : "INT" - }, { - "name" : "currency", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - }, { - "name" : "currency0", - "fieldType" : "VARCHAR(2147483647) NOT NULL" - }, { - "name" : "rate", - "fieldType" : "INT" - }, { - "name" : "rowtime0", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "TemporalJoin(joinType=[InnerJoin], where=[((currency = currency0) AND __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0), __TEMPORAL_JOIN_LEFT_KEY(currency), __TEMPORAL_JOIN_RIGHT_KEY(currency0)))], select=[amount, currency, rowtime, currency0, rate, rowtime0])" - }, { - "id" : 8, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "CALL", - "syntax" : "BINARY", - "internalName" : "$*$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "INT" - } ], - "type" : "INT" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`EXPR$0` INT>", - "description" : "Calc(select=[(amount * rate) AS EXPR$0])" - }, { - "id" : 9, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "INT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`EXPR$0` INT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[EXPR$0])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 7, - "target" : 8, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 8, - "target" : 9, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -}