twalthr commented on a change in pull request #17759:
URL: https://github.com/apache/flink/pull/17759#discussion_r748294620
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -351,6 +356,16 @@ private int deriveSinkParallelism(
sinkParallelism);
}
+ private Transformation<RowData> createTimestampTransformation(
Review comment:
Call it `apply` to be in sync with the other methods in this class
that add operators.
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -351,6 +356,16 @@ private int deriveSinkParallelism(
sinkParallelism);
}
+ private Transformation<RowData> createTimestampTransformation(
+ Transformation<RowData> inputTransform, int rowtimeFieldIndex, int sinkParallelism) {
+ return new OneInputTransformation<>(
Review comment:
Skip the transformation if `rowtimeFieldIndex` is -1.
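As a rough sketch of the suggested early return, using stand-in types rather than Flink's real `Transformation`/`OneInputTransformation` API:

```java
// Illustration only: `Transformation` here is a hypothetical stand-in,
// not Flink's org.apache.flink.api.dag.Transformation.
final class GuardSketch {

    static final class Transformation {
        final String name;

        Transformation(String name) {
            this.name = name;
        }
    }

    // When rowtimeFieldIndex is -1 there is no rowtime column, so the input
    // is returned unchanged and no extra operator enters the pipeline.
    static Transformation applyRowtimeTransformation(
            Transformation inputTransform, int rowtimeFieldIndex, int sinkParallelism) {
        if (rowtimeFieldIndex == -1) {
            return inputTransform;
        }
        return new Transformation("TimestampInserter(" + rowtimeFieldIndex + ")");
    }
}
```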
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -351,6 +356,16 @@ private int deriveSinkParallelism(
sinkParallelism);
}
+ private Transformation<RowData> createTimestampTransformation(
+ Transformation<RowData> inputTransform, int rowtimeFieldIndex, int sinkParallelism) {
+ return new OneInputTransformation<>(
+ inputTransform,
+ "TimestampMaterializer",
Review comment:
How about calling this `StreamRecordTimestampInserter`, and naming the operator the same way?
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
##########
@@ -260,7 +280,7 @@ public void testFromAndToDataStreamWithRaw() throws Exception {
Row.of(DayOfWeek.MONDAY, ZoneOffset.UTC),
Row.of(DayOfWeek.FRIDAY, ZoneOffset.ofHours(5)));
testResult(
- tableEnv.toDataStream(table, DataTypes.of(dataStream.getType())),
+ tableEnv.toDataStream(table, of(dataStream.getType())),
Review comment:
nit: I find static imports of `of` very difficult to read. Also, in a
class that uses `Types.` from the DataStream API, a static import could cause
conflicts (at least in readability).
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -301,6 +302,9 @@ private int deriveSinkParallelism(
int rowtimeFieldIndex,
int sinkParallelism) {
if (runtimeProvider instanceof DataStreamSinkProvider) {
+ inputTransform =
Review comment:
nit: I remember a community discussion about avoiding reassignment of
method arguments. Introduce a local variable `inputWithTimestampTransform`
to make it explicit.
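A tiny sketch of the pattern with hypothetical, simplified types (the real method works on `Transformation`s):

```java
// Illustration only: the argument stays untouched; the wrapped value gets
// an explicit local name instead of reassigning `inputTransform`.
final class LocalVarSketch {

    static String applySink(String inputTransform, int rowtimeFieldIndex) {
        final String inputWithTimestampTransform =
                rowtimeFieldIndex >= 0
                        ? "timestampInserter(" + inputTransform + ")"
                        : inputTransform;
        return "sink(" + inputWithTimestampTransform + ")";
    }
}
```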
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
##########
@@ -713,6 +731,105 @@ public void testMultiChangelogStreamUpsert() throws Exception {
resultStream, 0, Row.of(2, null, null, null), Row.of(1, 11.0, "1", "A"));
}
+ @Test
+ public void testTimestampAPIConverterOperatorSinkRuntimeProvider() {
+ final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+ final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>());
+ final Schema schema = schemaForTimeOperatorTesting();
+ final List<Row> rows =
+ Arrays.asList(
+ Row.of(1, "foo", LocalDateTime.parse("2020-11-10T12:34:56.123")),
+ Row.of(2, "foo", LocalDateTime.parse("2020-11-10T11:34:56.789")),
+ Row.of(3, "foo", LocalDateTime.parse("2020-11-11T10:11:22.777")),
+ Row.of(4, "foo", LocalDateTime.parse("2020-11-11T10:11:23.888")));
+
+ final TableDescriptor sourceDescriptor =
+ TableFactoryHarness.newBuilder()
+ .schema(schema)
+ .source(new TimestampTestSource(rows, ROW(INT(), STRING(), TIMESTAMP(3))))
+ .sink(
+ new TableFactoryHarness.SinkBase() {
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(
+ Context context) {
+ return SinkProvider.of(
+ TestSink.newBuilder()
+ .setWriter(new TestWriter(timestamps))
+ .setCommittableSerializer(
+ TestSink.StringCommittableSerializer
+ .INSTANCE)
+ .build());
+ }
+ })
+ .build();
+ tableEnv.createTable("T1", sourceDescriptor);
+ tableEnv.executeSql("INSERT INTO T1 SELECT * FROM T1").collect().forEachRemaining(l -> {});
+ assertTimestampResults(timestamps, rows);
+ }
+
+ @Test
+ public void testTimestampAPIConverterOperatorDataStreamSinkProvider() {
+ final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+ final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>());
+ final Schema schema = schemaForTimeOperatorTesting();
+ final List<Row> rows =
+ Arrays.asList(
+ Row.of(1, "foo", LocalDateTime.parse("2020-11-10T11:34:56.123")),
Review comment:
Use `TIMESTAMP_LTZ` and `Instant`? It also makes the assertions easier
later.
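A rough sketch of why this simplifies the assertions, using only `java.time` (no Flink types): an `Instant` maps directly to epoch millis, while a `LocalDateTime` first needs an explicit offset.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Illustration only: hypothetical helpers, not planner code.
final class RowtimeAssertionSketch {

    // TIMESTAMP_LTZ values surface as java.time.Instant, so the epoch
    // millis to assert against come out directly.
    static long epochMillis(Instant rowtime) {
        return rowtime.toEpochMilli();
    }

    // TIMESTAMP(3) values surface as LocalDateTime, so an offset must be
    // chosen before comparing against StreamRecord timestamps.
    static long epochMillis(LocalDateTime rowtime, ZoneOffset offset) {
        return rowtime.toInstant(offset).toEpochMilli();
    }
}
```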
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -351,6 +356,16 @@ private int deriveSinkParallelism(
sinkParallelism);
}
+ private Transformation<RowData> createTimestampTransformation(
+ Transformation<RowData> inputTransform, int rowtimeFieldIndex, int sinkParallelism) {
+ return new OneInputTransformation<>(
+ inputTransform,
+ "TimestampMaterializer",
+ new TimestampAPIConverterOperator(rowtimeFieldIndex),
Review comment:
Deduplicate code by replacing usages of `RowtimeProcessFunction`?
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
##########
@@ -713,6 +731,105 @@ public void testMultiChangelogStreamUpsert() throws Exception {
resultStream, 0, Row.of(2, null, null, null), Row.of(1, 11.0, "1", "A"));
}
+ @Test
+ public void testTimestampAPIConverterOperatorSinkRuntimeProvider() {
Review comment:
Actually, `DataStreamJavaITCase` was meant for bridging tests of
`to`/`fromDataStream`. It might be better to put this test somewhere else;
maybe you can find a better location.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]