Hongshun Wang created FLINK-35149:
-------------------------------------

             Summary: Fix DataSinkTranslator#sinkTo ignoring pre-write topology 
if not TwoPhaseCommittingSink
                 Key: FLINK-35149
                 URL: https://issues.apache.org/jira/browse/FLINK-35149
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Hongshun Wang
             Fix For: 3.1.0


Current , when sink is not instanceof TwoPhaseCommittingSink, use 
input.transform rather than stream. It means that pre-write topology will be 
ignored.
{code:java}
private void sinkTo(
        DataStream<Event> input,
        Sink<Event> sink,
        String sinkName,
        OperatorID schemaOperatorID) {
    DataStream<Event> stream = input;
    // Pre write topology
    if (sink instanceof WithPreWriteTopology) {
        stream = ((WithPreWriteTopology<Event>) 
sink).addPreWriteTopology(stream);
    }

    if (sink instanceof TwoPhaseCommittingSink) {
        addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
    } else {
        input.transform(
                SINK_WRITER_PREFIX + sinkName,
                CommittableMessageTypeInfo.noOutput(),
                new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
    }
} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to