twalthr commented on a change in pull request #17759:
URL: https://github.com/apache/flink/pull/17759#discussion_r748294620



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -351,6 +356,16 @@ private int deriveSinkParallelism(
                 sinkParallelism);
     }
 
+    private Transformation<RowData> createTimestampTransformation(

Review comment:
      call it `apply` to be in sync with the other methods that add operators in this class.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -351,6 +356,16 @@ private int deriveSinkParallelism(
                 sinkParallelism);
     }
 
+    private Transformation<RowData> createTimestampTransformation(
+            Transformation<RowData> inputTransform, int rowtimeFieldIndex, int sinkParallelism) {
+        return new OneInputTransformation<>(

Review comment:
      Skip the transformation if the `rowtimeFieldIndex` is -1.
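A minimal sketch of the suggested guard, in plain Java with a generic stand-in for `Transformation<RowData>` (the names and types here are illustrative, not the actual Flink API):

```java
import java.util.List;

public class RowtimeGuardSketch {

    // Hypothetical stand-in: List<T> plays the role of Transformation<RowData>.
    static <T> List<T> applyRowtimeTransformation(List<T> inputTransform, int rowtimeFieldIndex) {
        if (rowtimeFieldIndex == -1) {
            // No rowtime column: return the input unchanged, skipping the extra operator.
            return inputTransform;
        }
        // ... otherwise wrap the input in the timestamp-inserting operator ...
        return inputTransform;
    }
}
```

The point is simply the early return: callers never pay for an extra operator in the pipeline when there is no rowtime column to materialize.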

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -351,6 +356,16 @@ private int deriveSinkParallelism(
                 sinkParallelism);
     }
 
+    private Transformation<RowData> createTimestampTransformation(
+            Transformation<RowData> inputTransform, int rowtimeFieldIndex, int sinkParallelism) {
+        return new OneInputTransformation<>(
+                inputTransform,
+                "TimestampMaterializer",

Review comment:
      How about we call this (and the operator) `StreamRecordTimestampInserter`?

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
##########
@@ -260,7 +280,7 @@ public void testFromAndToDataStreamWithRaw() throws Exception {
                 Row.of(DayOfWeek.MONDAY, ZoneOffset.UTC),
                 Row.of(DayOfWeek.FRIDAY, ZoneOffset.ofHours(5)));
         testResult(
-                tableEnv.toDataStream(table, DataTypes.of(dataStream.getType())),
+                tableEnv.toDataStream(table, of(dataStream.getType())),

Review comment:
      nit: I find static imports of `of` very difficult to read. Also, in a class that uses `Types.` from the DataStream API, a static import could cause conflicts (at least in readability).

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -301,6 +302,9 @@ private int deriveSinkParallelism(
             int rowtimeFieldIndex,
             int sinkParallelism) {
         if (runtimeProvider instanceof DataStreamSinkProvider) {
+            inputTransform =

Review comment:
      nit: I remember a community discussion about avoiding reassigning values to method arguments. Introduce a local variable `inputWithTimestampTransform` to make it explicit.
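A sketch of the suggested style, with `String` standing in for the transformation type (names and types are illustrative only):

```java
public class LocalVarSketch {

    // Instead of reassigning the parameter, an effectively-final local
    // (here the suggested name `inputWithTimestampTransform`) makes the
    // decoration step explicit and keeps the original value visible.
    static String decorate(String inputTransform, int rowtimeFieldIndex) {
        final String inputWithTimestampTransform =
                rowtimeFieldIndex == -1
                        ? inputTransform
                        : inputTransform + " -> StreamRecordTimestampInserter";
        return inputWithTimestampTransform;
    }
}
```

Keeping parameters unmodified also lets them be marked `final`, which some codebases enforce.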

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
##########
@@ -713,6 +731,105 @@ public void testMultiChangelogStreamUpsert() throws Exception {
                 resultStream, 0, Row.of(2, null, null, null), Row.of(1, 11.0, "1", "A"));
     }
 
+    @Test
+    public void testTimestampAPIConverterOperatorSinkRuntimeProvider() {
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+        final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>());
+        final Schema schema = schemaForTimeOperatorTesting();
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "foo", LocalDateTime.parse("2020-11-10T12:34:56.123")),
+                        Row.of(2, "foo", LocalDateTime.parse("2020-11-10T11:34:56.789")),
+                        Row.of(3, "foo", LocalDateTime.parse("2020-11-11T10:11:22.777")),
+                        Row.of(4, "foo", LocalDateTime.parse("2020-11-11T10:11:23.888")));
+
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(schema)
+                        .source(new TimestampTestSource(rows, ROW(INT(), STRING(), TIMESTAMP(3))))
+                        .sink(
+                                new TableFactoryHarness.SinkBase() {
+                                    @Override
+                                    public SinkRuntimeProvider getSinkRuntimeProvider(
+                                            Context context) {
+                                        return SinkProvider.of(
+                                                TestSink.newBuilder()
+                                                        .setWriter(new TestWriter(timestamps))
+                                                        .setCommittableSerializer(
+                                                                TestSink.StringCommittableSerializer
+                                                                        .INSTANCE)
+                                                        .build());
+                                    }
+                                })
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+        tableEnv.executeSql("INSERT INTO T1 SELECT * FROM T1").collect().forEachRemaining(l -> {});
+        assertTimestampResults(timestamps, rows);
+    }
+
+    @Test
+    public void testTimestampAPIConverterOperatorDataStreamSinkProvider() {
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+        final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>());
+        final Schema schema = schemaForTimeOperatorTesting();
+        final List<Row> rows =
+                Arrays.asList(
+                        Row.of(1, "foo", LocalDateTime.parse("2020-11-10T11:34:56.123")),

Review comment:
      Use `TIMESTAMP_LTZ` and `Instant`? It also makes the assertions easier later.
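A sketch of what the switch could look like: converting the `LocalDateTime` literals from the test into `Instant`s (interpreted as UTC here, which is an assumption about the intended zone), as one would supply for a `TIMESTAMP_LTZ(3)` column. Asserting on epoch millis then needs no time-zone arithmetic:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class LtzSketch {

    // Hypothetical helper: interpret a local date-time literal as UTC and
    // produce the Instant a TIMESTAMP_LTZ(3) column would carry.
    static Instant toUtcInstant(String localDateTime) {
        return LocalDateTime.parse(localDateTime).toInstant(ZoneOffset.UTC);
    }
}
```

For example, `toUtcInstant("2020-11-10T11:34:56.123")` can be compared directly against the collected `StreamRecord` timestamp via `Instant.toEpochMilli()`.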

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -351,6 +356,16 @@ private int deriveSinkParallelism(
                 sinkParallelism);
     }
 
+    private Transformation<RowData> createTimestampTransformation(
+            Transformation<RowData> inputTransform, int rowtimeFieldIndex, int sinkParallelism) {
+        return new OneInputTransformation<>(
+                inputTransform,
+                "TimestampMaterializer",
+                new TimestampAPIConverterOperator(rowtimeFieldIndex),

Review comment:
       Deduplicate code by replacing usages of `RowtimeProcessFunction`?

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
##########
@@ -713,6 +731,105 @@ public void testMultiChangelogStreamUpsert() throws Exception {
                 resultStream, 0, Row.of(2, null, null, null), Row.of(1, 11.0, "1", "A"));
     }
 
+    @Test
+    public void testTimestampAPIConverterOperatorSinkRuntimeProvider() {

Review comment:
      Actually, `DataStreamJavaITCase` was meant for bridging tests of `to/fromDataStream`. It might be better to put the test somewhere else; maybe you can find a more fitting class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
