This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f27e53a  [FLINK-24186][table-planner] Allow multiple rowtime 
attributes for collect() and print()
f27e53a is described below

commit f27e53a03516ca7de7ec6c86a905f7d8a88b1271
Author: Timo Walther <twal...@apache.org>
AuthorDate: Thu Dec 9 13:35:06 2021 +0100

    [FLINK-24186][table-planner] Allow multiple rowtime attributes for 
collect() and print()
    
    This closes #17217.
---
 .../planner/connectors/CollectDynamicSink.java     |  2 +-
 .../plan/nodes/exec/batch/BatchExecSink.java       |  3 +-
 .../plan/nodes/exec/common/CommonExecSink.java     |  2 +-
 .../plan/nodes/exec/stream/StreamExecSink.java     | 13 ++++----
 .../org/apache/flink/table/api/TableITCase.scala   | 35 +++++++++++++++++++++-
 .../runtime/stream/table/TableSinkITCase.scala     |  5 ++--
 6 files changed, 49 insertions(+), 11 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
index 98fcf8b..be59089 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
@@ -49,7 +49,7 @@ import java.util.function.Function;
 
 /** Table sink for {@link TableResult#collect()}. */
 @Internal
-final class CollectDynamicSink implements DynamicTableSink {
+public final class CollectDynamicSink implements DynamicTableSink {
 
     private final ObjectIdentifier tableIdentifier;
     private final DataType consumedDataType;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
index 3633628..64a1c0c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
@@ -56,6 +56,7 @@ public class BatchExecSink extends CommonExecSink implements 
BatchExecNode<Objec
     protected Transformation<Object> translateToPlanInternal(PlannerBase 
planner) {
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) 
getInputEdges().get(0).translateToPlan(planner);
-        return createSinkTransformation(planner, inputTransform, -1, false);
+        final DynamicTableSink tableSink = 
tableSinkSpec.getTableSink(planner.getFlinkContext());
+        return createSinkTransformation(planner, inputTransform, tableSink, 
-1, false);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 9c1870f..65500b9 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -117,9 +117,9 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
     protected Transformation<Object> createSinkTransformation(
             PlannerBase planner,
             Transformation<RowData> inputTransform,
+            DynamicTableSink tableSink,
             int rowtimeFieldIndex,
             boolean upsertMaterialize) {
-        final DynamicTableSink tableSink = 
tableSinkSpec.getTableSink(planner.getFlinkContext());
         final ResolvedSchema schema = 
tableSinkSpec.getCatalogTable().getResolvedSchema();
         final SinkRuntimeProvider runtimeProvider =
                 tableSink.getSinkRuntimeProvider(new 
SinkRuntimeProviderContext(isBounded));
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
index c145b59..848779c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.connectors.CollectDynamicSink;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
@@ -113,6 +114,8 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final RowType inputRowType = (RowType) inputEdge.getOutputType();
+        final DynamicTableSink tableSink = 
tableSinkSpec.getTableSink(planner.getFlinkContext());
+        final boolean isCollectSink = tableSink instanceof CollectDynamicSink;
 
         final List<Integer> rowtimeFieldIndices = new ArrayList<>();
         for (int i = 0; i < inputRowType.getFieldCount(); ++i) {
@@ -121,12 +124,12 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
             }
         }
         final int rowtimeFieldIndex;
-        if (rowtimeFieldIndices.size() > 1) {
+        if (rowtimeFieldIndices.size() > 1 && !isCollectSink) {
             throw new TableException(
                     String.format(
-                            "Found more than one rowtime field: [%s] in the 
query when insert into '%s'.\n"
-                                    + "Please select the rowtime field that 
should be used as event-time timestamp "
-                                    + "for the DataStream by casting all other 
fields to TIMESTAMP.",
+                            "The query contains more than one rowtime 
attribute column [%s] for writing into table '%s'.\n"
+                                    + "Please select the column that should be 
used as the event-time timestamp "
+                                    + "for the table sink by casting all other 
columns to regular TIMESTAMP or TIMESTAMP_LTZ.",
                             rowtimeFieldIndices.stream()
                                     .map(i -> 
inputRowType.getFieldNames().get(i))
                                     .collect(Collectors.joining(", ")),
@@ -138,6 +141,6 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
         }
 
         return createSinkTransformation(
-                planner, inputTransform, rowtimeFieldIndex, upsertMaterialize);
+                planner, inputTransform, tableSink, rowtimeFieldIndex, 
upsertMaterialize);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
index 637832c..53ace8a 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
@@ -24,9 +24,12 @@ import 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment
 import org.apache.flink.table.api.internal.TableEnvironmentImpl
 import org.apache.flink.table.catalog.{Column, ResolvedSchema}
 import org.apache.flink.table.planner.utils.TestTableSourceSinks
+import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.{Row, RowKind}
 import org.apache.flink.util.{CollectionUtil, TestLogger}
 
+import org.hamcrest.MatcherAssert.assertThat
+import org.hamcrest.Matchers.containsInAnyOrder
 import org.junit.Assert.{assertEquals, assertNotEquals, assertTrue}
 import org.junit.rules.{ExpectedException, TemporaryFolder}
 import org.junit.runner.RunWith
@@ -35,9 +38,10 @@ import org.junit.{Before, Rule, Test}
 
 import _root_.java.lang.{Long => JLong}
 import _root_.java.util
+import java.time.Instant
 
 @RunWith(classOf[Parameterized])
-class TableITCase(tableEnvName: String, isStreaming: Boolean) extends 
TestLogger {
+class TableITCase(tableEnvName: String, isStreaming: Boolean) extends 
AbstractTestBase {
 
   // used for accurate exception information checking.
   val expectedException: ExpectedException = ExpectedException.none()
@@ -165,6 +169,35 @@ class TableITCase(tableEnvName: String, isStreaming: 
Boolean) extends TestLogger
     assertEquals(expected, actual)
   }
 
+  @Test
+  def testCollectWithMultiRowtime(): Unit = {
+    tEnv.executeSql(
+      """
+        |CREATE TABLE MyTableWithRowtime1 (
+        |  ts AS TO_TIMESTAMP_LTZ(id, 3),
+        |  WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE)
+        |LIKE MyTable""".stripMargin)
+    tEnv.executeSql(
+      """
+        |CREATE TABLE MyTableWithRowtime2 (
+        |  ts AS TO_TIMESTAMP_LTZ(id, 3),
+        |  WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE)
+        |LIKE MyTable""".stripMargin)
+
+    val tableResult = tEnv.executeSql(
+      """
+        |SELECT MyTableWithRowtime1.ts, MyTableWithRowtime2.ts
+        |FROM MyTableWithRowtime1, MyTableWithRowtime2
+        |WHERE
+        |  MyTableWithRowtime1.first = MyTableWithRowtime2.first AND
+        |  MyTableWithRowtime1.ts = MyTableWithRowtime2.ts""".stripMargin)
+
+    val expected = for (i <- 1 to 8) yield
+      Row.ofKind(RowKind.INSERT, Instant.ofEpochMilli(i), 
Instant.ofEpochMilli(i))
+
+    val actual = CollectionUtil.iteratorToList(tableResult.collect())
+    assertThat(actual, containsInAnyOrder(expected: _*))
+  }
 }
 
 object TableITCase {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index da20323..fe73d24 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -430,8 +430,9 @@ class TableSinkITCase extends StreamingTestBase {
       .select('num, 'w.rowtime as 'rowtime1, 'w.rowtime as 'rowtime2)
 
     thrown.expect(classOf[TableException])
-    thrown.expectMessage("Found more than one rowtime field: [rowtime1, 
rowtime2] " +
-      "in the query when insert into 'default_catalog.default_database.sink'")
+    thrown.expectMessage(
+      "The query contains more than one rowtime attribute column [rowtime1, 
rowtime2] for " +
+        "writing into table 'default_catalog.default_database.sink'.")
     table.executeInsert("sink")
   }
 

Reply via email to