Hongshun Wang created FLINK-35149:
-------------------------------------

             Summary: Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink
                 Key: FLINK-35149
                 URL: https://issues.apache.org/jira/browse/FLINK-35149
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Hongshun Wang
             Fix For: 3.1.0

Currently, when the sink is not an instance of TwoPhaseCommittingSink, DataSinkTranslator#sinkTo calls input.transform rather than stream.transform. The sink writer is therefore attached to the original input, and the stream returned by addPreWriteTopology is never used, so the pre-write topology is ignored.
{code:java}
private void sinkTo(
        DataStream<Event> input,
        Sink<Event> sink,
        String sinkName,
        OperatorID schemaOperatorID) {
    DataStream<Event> stream = input;
    // Pre write topology
    if (sink instanceof WithPreWriteTopology) {
        stream = ((WithPreWriteTopology<Event>) sink).addPreWriteTopology(stream);
    }

    if (sink instanceof TwoPhaseCommittingSink) {
        addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
    } else {
        // Problem: this uses input, not stream, so the pre-write topology is bypassed
        input.transform(
                SINK_WRITER_PREFIX + sinkName,
                CommittableMessageTypeInfo.noOutput(),
                new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
    }
}
{code}
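
The effect can be reproduced outside Flink with a small stand-alone model. This is only a sketch: FakeStream, sinkToBuggy, sinkToFixed and the operator names are hypothetical stand-ins invented for illustration, not Flink or Flink CDC classes, and the "fixed" variant is an assumption about the intended change (transforming stream instead of input), not the committed patch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;

/** Stand-alone model of the reported behavior; none of these types are Flink classes. */
public class PreWriteTopologySketch {

    /** Hypothetical stand-in for DataStream: records the operator chain that produced it. */
    static final class FakeStream {
        final List<String> operators;

        FakeStream(List<String> operators) {
            this.operators = Collections.unmodifiableList(new ArrayList<>(operators));
        }

        /** Returns a new stream with one more operator appended. */
        FakeStream transform(String operatorName) {
            List<String> next = new ArrayList<>(operators);
            next.add(operatorName);
            return new FakeStream(next);
        }
    }

    /** Mirrors the reported behavior: the writer is attached to the original input. */
    static FakeStream sinkToBuggy(FakeStream input, UnaryOperator<FakeStream> preWrite) {
        FakeStream stream = preWrite.apply(input); // result is never used below
        return input.transform("sink-writer");
    }

    /** Assumed fix: attach the writer to the stream returned by the pre-write step. */
    static FakeStream sinkToFixed(FakeStream input, UnaryOperator<FakeStream> preWrite) {
        FakeStream stream = preWrite.apply(input);
        return stream.transform("sink-writer");
    }

    public static void main(String[] args) {
        FakeStream source = new FakeStream(Collections.singletonList("source"));
        UnaryOperator<FakeStream> preWrite = s -> s.transform("pre-write");

        // prints "buggy: [source, sink-writer]"
        System.out.println("buggy: " + sinkToBuggy(source, preWrite).operators);
        // prints "fixed: [source, pre-write, sink-writer]"
        System.out.println("fixed: " + sinkToFixed(source, preWrite).operators);
    }
}
```

In the buggy variant the "pre-write" operator never appears in the writer's chain, which matches the behavior described in this issue for sinks that are not TwoPhaseCommittingSink.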