This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 548b96e9cb226aaf8d919d900c9326520a5b6dc8 Author: Marios Trivyzas <mat...@gmail.com> AuthorDate: Wed Nov 10 18:02:45 2021 +0100 [FLINK-24608][table-planner][table-runtime] Insert rowtime into StreamRecord for SinkProviders Previously, sinks built using the unified sink framework were missing the `timestamp` from the `StreamRecord` when used with `TableAPI`. Introduce a new Operator which sets the timestamp to each `StreamRecord` from the corresponding field of each row. This closes #17759. --- .../plan/nodes/exec/common/CommonExecSink.java | 22 +- .../plan/nodes/exec/stream/StreamExecMatch.java | 13 +- .../nodes/exec/common/CommonExecSinkITCase.java | 278 +++++++++++++++++++++ .../operators/match/RowtimeProcessFunction.java | 60 ----- .../sink/StreamRecordTimestampInserter.java | 54 ++++ 5 files changed, 357 insertions(+), 70 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index 8f4b12a8..05145d0f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -61,6 +61,7 @@ import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer; import org.apache.flink.table.runtime.operators.sink.SinkOperator; import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer; +import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter; import org.apache.flink.table.runtime.typeutils.InternalSerializers; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.util.StateConfigUtil; @@ -301,7 +302,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> int rowtimeFieldIndex, int sinkParallelism) { if (runtimeProvider instanceof DataStreamSinkProvider) { - final DataStream<RowData> dataStream = new DataStream<>(env, inputTransform); + Transformation<RowData> sinkTransformation = + applyRowtimeTransformation(inputTransform, rowtimeFieldIndex, sinkParallelism); + final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation); final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider; return provider.consumeDataStream(dataStream).getTransformation(); } else if (runtimeProvider instanceof TransformationSinkProvider) { @@ -322,7 +325,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkParallelism); } else if (runtimeProvider instanceof SinkProvider) { return new SinkTransformation<>( - inputTransform, + applyRowtimeTransformation(inputTransform, rowtimeFieldIndex, sinkParallelism), ((SinkProvider) runtimeProvider).createSink(), getDescription(), sinkParallelism); @@ -351,6 +354,21 @@ public abstract class CommonExecSink extends ExecNodeBase<Object> sinkParallelism); } + private Transformation<RowData> applyRowtimeTransformation( + Transformation<RowData> inputTransform, int rowtimeFieldIndex, int sinkParallelism) { + // Don't apply the transformation/operator if there is no rowtimeFieldIndex + if (rowtimeFieldIndex == -1) { + return inputTransform; + } + return new OneInputTransformation<>( + inputTransform, + String.format( + "StreamRecordTimestampInserter(rowtime field: %s)", rowtimeFieldIndex), + new StreamRecordTimestampInserter(rowtimeFieldIndex), + inputTransform.getOutputType(), + sinkParallelism); + } + private InternalTypeInfo<RowData> getInputTypeInfo() { return InternalTypeInfo.of(getInputEdges().get(0).getOutputType()); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java index 74e0e14..e246c7a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java @@ -30,7 +30,6 @@ import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.Quantifier; import org.apache.flink.cep.pattern.conditions.BooleanConditions; import org.apache.flink.cep.pattern.conditions.IterativeCondition; -import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.table.api.TableConfig; @@ -55,7 +54,7 @@ import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner; import org.apache.flink.table.runtime.operators.match.RowDataEventComparator; -import org.apache.flink.table.runtime.operators.match.RowtimeProcessFunction; +import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.typeutils.TypeCheckUtils; import org.apache.flink.table.types.logical.LogicalType; @@ -259,12 +258,10 @@ public class StreamExecMatch extends ExecNodeBase<RowData> Transformation<RowData> transform = new OneInputTransformation<>( inputTransform, - String.format("rowtime field: (%s)", timeOrderFieldIdx), - new ProcessOperator<>( - new RowtimeProcessFunction( - timeOrderFieldIdx, - inputTransform.getOutputType(), - precision)), + String.format( + "StreamRecordTimestampInserter(rowtime field: %s)", + timeOrderFieldIdx), + new StreamRecordTimestampInserter(timeOrderFieldIdx, precision), inputTransform.getOutputType(), inputTransform.getParallelism()); if (inputsContainSingleton()) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java new file mode 100644 index 0000000..d2cabbc --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.common; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.runtime.operators.sink.TestSink; +import org.apache.flink.table.api.ExplainDetail; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableDescriptor; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkProvider; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.factories.TableFactoryHarness; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.testutils.junit.SharedObjects; +import org.apache.flink.testutils.junit.SharedReference; +import org.apache.flink.types.Row; + +import org.hamcrest.Matcher; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertEquals; + +/** Test for {@link CommonExecSink}. */ +public class CommonExecSinkITCase extends AbstractTestBase { + + private StreamExecutionEnvironment env; + + @Before + public void before() { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + } + + @Rule public final SharedObjects sharedObjects = SharedObjects.create(); + + @Test + public void testStreamRecordTimestampInserterSinkRuntimeProvider() + throws ExecutionException, InterruptedException { + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>()); + final List<Row> rows = + Arrays.asList( + Row.of(1, "foo", Instant.parse("2020-11-10T12:34:56.123Z")), + Row.of(2, "foo", Instant.parse("2020-11-10T11:34:56.789Z")), + Row.of(3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")), + Row.of(4, "foo", Instant.parse("2020-11-11T10:11:23.888Z"))); + + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema(schemaStreamRecordTimestampInserter(true)) + .source(new TimestampTestSource(rows)) + .sink( + new TableFactoryHarness.SinkBase() { + @Override + public DynamicTableSink.SinkRuntimeProvider + getSinkRuntimeProvider( + DynamicTableSink.Context context) { + return SinkProvider.of( + TestSink.newBuilder() + .setWriter(new TestWriter(timestamps)) + .setCommittableSerializer( + TestSink.StringCommittableSerializer + .INSTANCE) + .build()); + } + }) + .build(); + tableEnv.createTable("T1", sourceDescriptor); + String sqlStmt = "INSERT INTO T1 SELECT * FROM T1"; + assertPlan(tableEnv, sqlStmt, true); + tableEnv.executeSql(sqlStmt).await(); + assertTimestampResults(timestamps, rows); + } + + @Test + public void testStreamRecordTimestampInserterDataStreamSinkProvider() + throws ExecutionException, InterruptedException { + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>()); + final List<Row> rows = + Arrays.asList( + Row.of(1, "foo", Instant.parse("2020-11-10T11:34:56.123Z")), + Row.of(2, "foo", Instant.parse("2020-11-10T12:34:56.789Z")), + Row.of(3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")), + Row.of(4, "foo", Instant.parse("2020-11-11T10:11:23.888Z"))); + + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema(schemaStreamRecordTimestampInserter(true)) + .source(new TimestampTestSource(rows)) + .sink( + new TableFactoryHarness.SinkBase() { + @Override + public DataStreamSinkProvider getSinkRuntimeProvider( + DynamicTableSink.Context context) { + return dataStream -> + dataStream.addSink( + new SinkFunction<RowData>() { + @Override + public void invoke( + RowData value, + Context context) { + addTimestamp( + timestamps, + context.timestamp()); + } + }); + } + }) + .build(); + tableEnv.createTable("T1", sourceDescriptor); + String sqlStmt = "INSERT INTO T1 SELECT * FROM T1"; + assertPlan(tableEnv, sqlStmt, true); + tableEnv.executeSql(sqlStmt).await(); + Collections.sort(timestamps.get()); + assertTimestampResults(timestamps, rows); + } + + @Test + public void testStreamRecordTimestampInserterNotApplied() { + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>()); + final List<Row> rows = + Arrays.asList( + Row.of(1, "foo", Instant.parse("2020-11-10T11:34:56.123Z")), + Row.of(2, "foo", Instant.parse("2020-11-10T12:34:56.789Z")), + Row.of(3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")), + Row.of(4, "foo", Instant.parse("2020-11-11T10:11:23.888Z"))); + + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema(schemaStreamRecordTimestampInserter(false)) + .source(new TimestampTestSource(rows)) + .sink( + new TableFactoryHarness.SinkBase() { + @Override + public DynamicTableSink.SinkRuntimeProvider + getSinkRuntimeProvider( + DynamicTableSink.Context context) { + return SinkProvider.of( + TestSink.newBuilder() + .setWriter(new TestWriter(timestamps)) + .setCommittableSerializer( + TestSink.StringCommittableSerializer + .INSTANCE) + .build()); + } + }) + .build(); + tableEnv.createTable("T1", sourceDescriptor); + assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false); + } + + private static void addTimestamp(SharedReference<List<Long>> timestamps, Long timestamp) { + timestamps.applySync(l -> l.add(timestamp)); + } + + private static void assertPlan( + StreamTableEnvironment tableEnv, + String sql, + boolean containsStreamRecordTimestampInserter) { + Matcher<String> matcher = containsString("StreamRecordTimestampInserter(rowtime field: 2"); + if (!containsStreamRecordTimestampInserter) { + matcher = not(matcher); + } + assertThat(tableEnv.explainSql(sql, ExplainDetail.JSON_EXECUTION_PLAN), matcher); + } + + private static Schema schemaStreamRecordTimestampInserter(boolean withWatermark) { + Schema.Builder builder = + Schema.newBuilder() + .column("a", "INT") + .column("b", "STRING") + .column("ts", "TIMESTAMP_LTZ(3)"); + if (withWatermark) { + builder.watermark("ts", "ts"); + } + return builder.build(); + } + + private static void assertTimestampResults( + SharedReference<List<Long>> timestamps, List<Row> rows) { + assertEquals(rows.size(), timestamps.get().size()); + for (int i = 0; i < rows.size(); i++) { + assertEquals(rows.get(i).getField(2), Instant.ofEpochMilli(timestamps.get().get(i))); + } + } + + private static class TimestampTestSource extends TableFactoryHarness.ScanSourceBase { + + private final List<Row> rows; + + private TimestampTestSource(List<Row> rows) { + super(false); + this.rows = rows; + } + + @Override + public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider( + ScanTableSource.ScanContext context) { + final DynamicTableSource.DataStructureConverter converter = + context.createDataStructureConverter( + getFactoryContext().getPhysicalRowDataType()); + + return SourceFunctionProvider.of(new TestSource(rows, converter), true); + } + } + + private static class TestSource implements SourceFunction<RowData> { + + private final List<Row> rows; + private final DynamicTableSource.DataStructureConverter converter; + + public TestSource(List<Row> rows, DynamicTableSource.DataStructureConverter converter) { + this.rows = rows; + this.converter = converter; + } + + @Override + public void run(SourceContext<RowData> ctx) throws Exception { + rows.stream().map(row -> (RowData) converter.toInternal(row)).forEach(ctx::collect); + } + + @Override + public void cancel() {} + } + + private static class TestWriter extends TestSink.DefaultSinkWriter<RowData> { + + private final SharedReference<List<Long>> timestamps; + + private TestWriter(SharedReference<List<Long>> timestamps) { + this.timestamps = timestamps; + } + + @Override + public void write(RowData element, Context context) { + addTimestamp(timestamps, context.timestamp()); + super.write(element, context); + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/match/RowtimeProcessFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/match/RowtimeProcessFunction.java deleted file mode 100644 index e653798..0000000 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/match/RowtimeProcessFunction.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.operators.match; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.streaming.api.operators.TimestampedCollector; -import org.apache.flink.table.data.RowData; -import org.apache.flink.util.Collector; - -/** - * ProcessFunction to copy a timestamp from a {@link RowData} field into the {@link - * org.apache.flink.streaming.runtime.streamrecord.StreamRecord}. - */ -public class RowtimeProcessFunction extends ProcessFunction<RowData, RowData> - implements ResultTypeQueryable<RowData> { - - private static final long serialVersionUID = 1L; - - private final int rowtimeIdx; - private final int precision; - private transient TypeInformation<RowData> returnType; - - public RowtimeProcessFunction( - int rowtimeIdx, TypeInformation<RowData> returnType, int precision) { - this.rowtimeIdx = rowtimeIdx; - this.returnType = returnType; - this.precision = precision; - } - - @Override - public void processElement(RowData value, Context ctx, Collector<RowData> out) - throws Exception { - long timestamp = value.getTimestamp(rowtimeIdx, precision).getMillisecond(); - ((TimestampedCollector<RowData>) out).setAbsoluteTimestamp(timestamp); - out.collect(value); - } - - @Override - public TypeInformation<RowData> getProducedType() { - return returnType; - } -} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/StreamRecordTimestampInserter.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/StreamRecordTimestampInserter.java new file mode 100644 index 0000000..c9e18eb --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/StreamRecordTimestampInserter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.TableStreamOperator; + +/** + * Operator which sets the timestamp on the StreamRecord from the corresponding column of each row. + */ +@Internal +public class StreamRecordTimestampInserter extends TableStreamOperator<RowData> + implements OneInputStreamOperator<RowData, RowData> { + + private final int rowtimeIndex; + private final int precision; + + public StreamRecordTimestampInserter(int rowtimeIndex, int precision) { + this.rowtimeIndex = rowtimeIndex; + this.precision = precision; + } + + public StreamRecordTimestampInserter(int rowtimeIndex) { + this(rowtimeIndex, 3); + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + final RowData rowData = element.getValue(); + // timestamp might be TIMESTAMP or TIMESTAMP_LTZ + final long rowtime = rowData.getTimestamp(rowtimeIndex, precision).getMillisecond(); + element.setTimestamp(rowtime); + output.collect(element); + } +}