Hongshun Wang created FLINK-35149:
-------------------------------------
Summary: Fix DataSinkTranslator#sinkTo ignoring pre-write topology
if not TwoPhaseCommittingSink
Key: FLINK-35149
URL: https://issues.apache.org/jira/browse/FLINK-35149
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: Hongshun Wang
Fix For: 3.1.0
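Currently, when the sink is not an instance of TwoPhaseCommittingSink,
DataSinkTranslator#sinkTo calls input.transform rather than stream.transform.
As a result, any pre-write topology added by a WithPreWriteTopology sink is
ignored, because the writer operator is attached to the original input
instead of the stream returned by addPreWriteTopology.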
{code:java}
private void sinkTo(
        DataStream<Event> input,
        Sink<Event> sink,
        String sinkName,
        OperatorID schemaOperatorID) {
    DataStream<Event> stream = input;
    // Pre write topology
    if (sink instanceof WithPreWriteTopology) {
        stream = ((WithPreWriteTopology<Event>) sink).addPreWriteTopology(stream);
    }

    if (sink instanceof TwoPhaseCommittingSink) {
        addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
    } else {
        // Bug: the writer is attached to input, so the pre-write topology
        // held in stream is dropped.
        input.transform(
                SINK_WRITER_PREFIX + sinkName,
                CommittableMessageTypeInfo.noOutput(),
                new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
    }
}
{code}
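The effect can be reproduced without Flink using a small stand-alone model. The sketch below is illustrative only: ToyStream, Sink, WithPreWriteTopology, buggySinkTo and fixedSinkTo are hypothetical stand-ins, not the real Flink or Flink CDC classes. It assumes the intended fix is simply to call transform on stream instead of input in the else branch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model of the issue. None of these types are the real Flink classes.
class PreWriteTopologyDemo {

    /** Hypothetical immutable stream that only records the operators applied so far. */
    static final class ToyStream {
        final List<String> operators;

        ToyStream(List<String> operators) {
            this.operators = Collections.unmodifiableList(new ArrayList<>(operators));
        }

        /** Returns a new stream with one more operator appended; the receiver is unchanged. */
        ToyStream transform(String operatorName) {
            List<String> next = new ArrayList<>(operators);
            next.add(operatorName);
            return new ToyStream(next);
        }
    }

    /** Stand-in for a plain sink. */
    interface Sink {}

    /** Stand-in for a sink that wants extra operators in front of its writer. */
    interface WithPreWriteTopology extends Sink {
        ToyStream addPreWriteTopology(ToyStream input);
    }

    /** Mirrors the reported behaviour: the writer is attached to input. */
    static ToyStream buggySinkTo(ToyStream input, Sink sink) {
        ToyStream stream = input;
        if (sink instanceof WithPreWriteTopology) {
            stream = ((WithPreWriteTopology) sink).addPreWriteTopology(stream);
        }
        // stream is computed but never used, so the pre-write step is lost.
        return input.transform("writer");
    }

    /** Assumed fix: the writer is attached to stream. */
    static ToyStream fixedSinkTo(ToyStream input, Sink sink) {
        ToyStream stream = input;
        if (sink instanceof WithPreWriteTopology) {
            stream = ((WithPreWriteTopology) sink).addPreWriteTopology(stream);
        }
        return stream.transform("writer");
    }

    public static void main(String[] args) {
        Sink sink = (WithPreWriteTopology) in -> in.transform("pre-write");
        ToyStream source = new ToyStream(Collections.singletonList("source"));

        System.out.println(buggySinkTo(source, sink).operators); // [source, writer]
        System.out.println(fixedSinkTo(source, sink).operators); // [source, pre-write, writer]
    }
}
```

In the model, as in DataStream, transform returns a new stream and leaves its receiver untouched, which is why attaching the writer to input silently skips whatever addPreWriteTopology appended.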
--
This message was sent by Atlassian Jira
(v8.20.10#820010)