This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new bcb4a4b61 [FLINK-39257] Remove compatibility code for older versions of Flink (#4317)
bcb4a4b61 is described below

commit bcb4a4b61404c6ecfb4e48e343b4968c44e7fc52
Author: Pei Yu <[email protected]>
AuthorDate: Tue Mar 17 09:08:20 2026 +0800

    [FLINK-39257] Remove compatibility code for older versions of Flink (#4317)
    
    Signed-off-by: Pei Yu <[email protected]>
---
 .../flink/translator/DataSinkTranslator.java       | 46 +++-------------------
 1 file changed, 6 insertions(+), 40 deletions(-)

diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
index aa530300b..117b08aed 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
@@ -36,7 +36,6 @@ import 
org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
 import 
org.apache.flink.cdc.runtime.operators.sink.BatchDataSinkFunctionOperator;
 import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator;
 import 
org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
@@ -46,12 +45,10 @@ import 
org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
 import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
-
-import java.lang.reflect.InvocationTargetException;
+import 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
 
 /** Translator used to build {@link DataSink} for given {@link DataStream}. */
 @Internal
@@ -178,8 +175,10 @@ public class DataSinkTranslator {
             boolean isBatchMode,
             OperatorID schemaOperatorID,
             OperatorUidGenerator operatorUidGenerator) {
+        TwoPhaseCommittingSink<Event, CommT> committingSink =
+                (TwoPhaseCommittingSink<Event, CommT>) sink;
         TypeInformation<CommittableMessage<CommT>> typeInformation =
-                CommittableMessageTypeInfo.of(() -> 
getCommittableSerializer(sink));
+                
CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);
         DataStream<CommittableMessage<CommT>> written =
                 inputStream
                         .transform(
@@ -202,8 +201,8 @@ public class DataSinkTranslator {
                         .transform(
                                 SINK_COMMITTER_PREFIX + sinkName,
                                 typeInformation,
-                                getCommitterOperatorFactory(
-                                        sink, isBatchMode, 
isCheckpointingEnabled))
+                                new CommitterOperatorFactory<>(
+                                        committingSink, isBatchMode, 
isCheckpointingEnabled))
                         
.uid(operatorUidGenerator.generateUid("sink-committer"));
 
         if (sink instanceof WithPostCommitTopology) {
@@ -215,37 +214,4 @@ public class DataSinkTranslator {
         return sinkDef.getName()
                 .orElse(String.format("Flink CDC Event Sink: %s", 
sinkDef.getType()));
     }
-
-    private static <CommT> SimpleVersionedSerializer<CommT> 
getCommittableSerializer(Object sink) {
-        // FIX ME: TwoPhaseCommittingSink has been deprecated, and its 
signature has changed
-        // during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer 
supported.
-        try {
-            return (SimpleVersionedSerializer<CommT>)
-                    
sink.getClass().getDeclaredMethod("getCommittableSerializer").invoke(sink);
-        } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
-            throw new RuntimeException("Failed to get CommittableSerializer", 
e);
-        }
-    }
-
-    private static <CommT>
-            OneInputStreamOperatorFactory<CommittableMessage<CommT>, 
CommittableMessage<CommT>>
-                    getCommitterOperatorFactory(
-                            Sink<Event> sink, boolean isBatchMode, boolean 
isCheckpointingEnabled) {
-        // FIX ME: OneInputStreamOperatorFactory is an @Internal class, and 
its signature has
-        // changed during Flink 1.18 to 1.19. Remove this when Flink 1.18 is 
no longer supported.
-        try {
-            return (OneInputStreamOperatorFactory<
-                            CommittableMessage<CommT>, 
CommittableMessage<CommT>>)
-                    Class.forName(
-                                    
"org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory")
-                            .getDeclaredConstructors()[0]
-                            .newInstance(sink, isBatchMode, 
isCheckpointingEnabled);
-
-        } catch (ClassNotFoundException
-                | InstantiationException
-                | IllegalAccessException
-                | InvocationTargetException e) {
-            throw new RuntimeException("Failed to create 
CommitterOperatorFactory", e);
-        }
-    }
 }

Reply via email to