This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new f27e53a [FLINK-24186][table-planner] Allow multiple rowtime attributes for collect() and print() f27e53a is described below commit f27e53a03516ca7de7ec6c86a905f7d8a88b1271 Author: Timo Walther <twal...@apache.org> AuthorDate: Thu Dec 9 13:35:06 2021 +0100 [FLINK-24186][table-planner] Allow multiple rowtime attributes for collect() and print() This closes #17217. --- .../planner/connectors/CollectDynamicSink.java | 2 +- .../plan/nodes/exec/batch/BatchExecSink.java | 3 +- .../plan/nodes/exec/common/CommonExecSink.java | 2 +- .../plan/nodes/exec/stream/StreamExecSink.java | 13 ++++---- .../org/apache/flink/table/api/TableITCase.scala | 35 +++++++++++++++++++++- .../runtime/stream/table/TableSinkITCase.scala | 5 ++-- 6 files changed, 49 insertions(+), 11 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java index 98fcf8b..be59089 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java @@ -49,7 +49,7 @@ import java.util.function.Function; /** Table sink for {@link TableResult#collect()}. 
*/ @Internal -final class CollectDynamicSink implements DynamicTableSink { +public final class CollectDynamicSink implements DynamicTableSink { private final ObjectIdentifier tableIdentifier; private final DataType consumedDataType; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java index 3633628..64a1c0c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java @@ -56,6 +56,7 @@ public class BatchExecSink extends CommonExecSink implements BatchExecNode<Objec protected Transformation<Object> translateToPlanInternal(PlannerBase planner) { final Transformation<RowData> inputTransform = (Transformation<RowData>) getInputEdges().get(0).translateToPlan(planner); - return createSinkTransformation(planner, inputTransform, -1, false); + final DynamicTableSink tableSink = tableSinkSpec.getTableSink(planner.getFlinkContext()); + return createSinkTransformation(planner, inputTransform, tableSink, -1, false); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index 9c1870f..65500b9 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -117,9 +117,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> protected Transformation<Object> createSinkTransformation( 
PlannerBase planner, Transformation<RowData> inputTransform, + DynamicTableSink tableSink, int rowtimeFieldIndex, boolean upsertMaterialize) { - final DynamicTableSink tableSink = tableSinkSpec.getTableSink(planner.getFlinkContext()); final ResolvedSchema schema = tableSinkSpec.getCatalogTable().getResolvedSchema(); final SinkRuntimeProvider runtimeProvider = tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java index c145b59..848779c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java @@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.connectors.CollectDynamicSink; import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; @@ -113,6 +114,8 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj final Transformation<RowData> inputTransform = (Transformation<RowData>) inputEdge.translateToPlan(planner); final RowType inputRowType = (RowType) inputEdge.getOutputType(); + final DynamicTableSink tableSink = tableSinkSpec.getTableSink(planner.getFlinkContext()); + final boolean isCollectSink = tableSink instanceof CollectDynamicSink; final List<Integer> rowtimeFieldIndices = new ArrayList<>(); for (int i = 0; i < 
inputRowType.getFieldCount(); ++i) { @@ -121,12 +124,12 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj } } final int rowtimeFieldIndex; - if (rowtimeFieldIndices.size() > 1) { + if (rowtimeFieldIndices.size() > 1 && !isCollectSink) { throw new TableException( String.format( - "Found more than one rowtime field: [%s] in the query when insert into '%s'.\n" - + "Please select the rowtime field that should be used as event-time timestamp " - + "for the DataStream by casting all other fields to TIMESTAMP.", + "The query contains more than one rowtime attribute column [%s] for writing into table '%s'.\n" + + "Please select the column that should be used as the event-time timestamp " + + "for the table sink by casting all other columns to regular TIMESTAMP or TIMESTAMP_LTZ.", rowtimeFieldIndices.stream() .map(i -> inputRowType.getFieldNames().get(i)) .collect(Collectors.joining(", ")), @@ -138,6 +141,6 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj } return createSinkTransformation( - planner, inputTransform, rowtimeFieldIndex, upsertMaterialize); + planner, inputTransform, tableSink, rowtimeFieldIndex, upsertMaterialize); } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala index 637832c..53ace8a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala @@ -24,9 +24,12 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment import org.apache.flink.table.api.internal.TableEnvironmentImpl import org.apache.flink.table.catalog.{Column, ResolvedSchema} import org.apache.flink.table.planner.utils.TestTableSourceSinks +import org.apache.flink.test.util.AbstractTestBase import 
org.apache.flink.types.{Row, RowKind} import org.apache.flink.util.{CollectionUtil, TestLogger} +import org.hamcrest.MatcherAssert.assertThat +import org.hamcrest.Matchers.containsInAnyOrder import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue} import org.junit.rules.{ExpectedException, TemporaryFolder} import org.junit.runner.RunWith @@ -35,9 +38,10 @@ import org.junit.{Before, Rule, Test} import _root_.java.lang.{Long => JLong} import _root_.java.util +import java.time.Instant @RunWith(classOf[Parameterized]) -class TableITCase(tableEnvName: String, isStreaming: Boolean) extends TestLogger { +class TableITCase(tableEnvName: String, isStreaming: Boolean) extends AbstractTestBase { // used for accurate exception information checking. val expectedException: ExpectedException = ExpectedException.none() @@ -165,6 +169,35 @@ class TableITCase(tableEnvName: String, isStreaming: Boolean) extends TestLogger assertEquals(expected, actual) } + @Test + def testCollectWithMultiRowtime(): Unit = { + tEnv.executeSql( + """ + |CREATE TABLE MyTableWithRowtime1 ( + | ts AS TO_TIMESTAMP_LTZ(id, 3), + | WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE) + |LIKE MyTable""".stripMargin) + tEnv.executeSql( + """ + |CREATE TABLE MyTableWithRowtime2 ( + | ts AS TO_TIMESTAMP_LTZ(id, 3), + | WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE) + |LIKE MyTable""".stripMargin) + + val tableResult = tEnv.executeSql( + """ + |SELECT MyTableWithRowtime1.ts, MyTableWithRowtime2.ts + |FROM MyTableWithRowtime1, MyTableWithRowtime2 + |WHERE + | MyTableWithRowtime1.first = MyTableWithRowtime2.first AND + | MyTableWithRowtime1.ts = MyTableWithRowtime2.ts""".stripMargin) + + val expected = for (i <- 1 to 8) yield + Row.ofKind(RowKind.INSERT, Instant.ofEpochMilli(i), Instant.ofEpochMilli(i)) + + val actual = CollectionUtil.iteratorToList(tableResult.collect()) + assertThat(actual, containsInAnyOrder(expected: _*)) + } } object TableITCase { diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala index da20323..fe73d24 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala @@ -430,8 +430,9 @@ class TableSinkITCase extends StreamingTestBase { .select('num, 'w.rowtime as 'rowtime1, 'w.rowtime as 'rowtime2) thrown.expect(classOf[TableException]) - thrown.expectMessage("Found more than one rowtime field: [rowtime1, rowtime2] " + - "in the query when insert into 'default_catalog.default_database.sink'") + thrown.expectMessage( + "The query contains more than one rowtime attribute column [rowtime1, rowtime2] for " + + "writing into table 'default_catalog.default_database.sink'.") table.executeInsert("sink") }