This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch 2.x-connect-tmp-2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit fcc0ba08c105d54d8f5c5a1dc507804a53515f88
Author: lvyanquan <[email protected]>
AuthorDate: Wed Mar 18 14:00:15 2026 +0800

    Address comments.
---
 .github/workflows/modules.py                       | 14 --------
 .../flink/translator/DataSinkTranslator.java       | 37 ----------------------
 .../src/test/resources/log4j2-test.properties      |  2 +-
 .../flink-cdc-pipeline-connector-paimon/pom.xml    |  2 +-
 .../cdc/connectors/postgres/PostgresTestBase.java  | 10 ++++++
 .../src/test/resources/ddl/replica_identity.sql    |  5 ++-
 flink-cdc-connect/pom.xml                          |  4 ++-
 .../flink-cdc-pipeline-e2e-tests/pom.xml           | 11 +------
 pom.xml                                            |  2 ++
 9 files changed, 20 insertions(+), 67 deletions(-)

diff --git a/.github/workflows/modules.py b/.github/workflows/modules.py
index 872e08526..86e94164f 100755
--- a/.github/workflows/modules.py
+++ b/.github/workflows/modules.py
@@ -43,19 +43,6 @@ MODULES_PIPELINE_CONNECTORS = [
     "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values"
 ]
 
-MODULES_PIPELINE_CONNECTORS_2_X = [
-    "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql",
-    "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle",
-    "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss",
-    "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values",
-    "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres",
-    "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg",
-    "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute",
-    "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris",
-    "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks",
-    "flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch"
-]
-
 MODULES_MYSQL_SOURCE = [
     "flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc",
     "flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-mysql-cdc"
@@ -165,7 +152,6 @@ ALL_MODULES = set(
     MODULES_CORE +
     MODULES_CORE_2_X +
     MODULES_PIPELINE_CONNECTORS +
-    MODULES_PIPELINE_CONNECTORS_2_X +
     MODULES_MYSQL_SOURCE +
     MODULES_MYSQL_PIPELINE +
     MODULES_POSTGRES_SOURCE +
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
index d59ad1227..14eb95134 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
@@ -36,7 +36,6 @@ import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
 import org.apache.flink.cdc.runtime.operators.sink.BatchDataSinkFunctionOperator;
 import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator;
 import org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
@@ -48,14 +47,11 @@ import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
 import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
 import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
 
-import java.lang.reflect.InvocationTargetException;
-
 /** Translator used to build {@link DataSink} for given {@link DataStream}. */
 @Internal
 public class DataSinkTranslator {
@@ -224,37 +220,4 @@ public class DataSinkTranslator {
         return sinkDef.getName()
                 .orElse(String.format("Flink CDC Event Sink: %s", sinkDef.getType()));
     }
-
-    private static <CommT> SimpleVersionedSerializer<CommT> getCommittableSerializer(Object sink) {
-        // FIX ME: TwoPhaseCommittingSink has been deprecated, and its signature has changed
-        // during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer supported.
-        try {
-            return (SimpleVersionedSerializer<CommT>)
-                    sink.getClass().getMethod("getCommittableSerializer").invoke(sink);
-        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException("Failed to get CommittableSerializer", e);
-        }
-    }
-
-    private static <CommT>
-            OneInputStreamOperatorFactory<CommittableMessage<CommT>, CommittableMessage<CommT>>
-                    getCommitterOperatorFactory(
-                            Sink<Event> sink, boolean isBatchMode, boolean isCheckpointingEnabled) {
-        // FIX ME: OneInputStreamOperatorFactory is an @Internal class, and its signature has
-        // changed during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer supported.
-        try {
-            return (OneInputStreamOperatorFactory<
-                            CommittableMessage<CommT>, CommittableMessage<CommT>>)
-                    Class.forName(
-                                    "org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory")
-                            .getDeclaredConstructors()[0]
-                            .newInstance(sink, isBatchMode, isCheckpointingEnabled);
-
-        } catch (ClassNotFoundException
-                | InstantiationException
-                | IllegalAccessException
-                | InvocationTargetException e) {
-            throw new RuntimeException("Failed to create CommitterOperatorFactory", e);
-        }
-    }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/resources/log4j2-test.properties b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/resources/log4j2-test.properties
index 318095364..32df1c025 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/resources/log4j2-test.properties
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/test/resources/log4j2-test.properties
@@ -15,7 +15,7 @@
 
 # Set root logger level to ERROR to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level=INFO
+rootLogger.level=ERROR
 rootLogger.appenderRef.test.ref = TestLogger
 
 appender.testlogger.name = TestLogger
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
index 274129b0c..5e15835a8 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
@@ -39,7 +39,7 @@ limitations under the License.
         <!-- paimon dependency -->
         <dependency>
             <groupId>org.apache.paimon</groupId>
-            <artifactId>paimon-flink-${flink.major.version}</artifactId>
+            <artifactId>paimon-flink-${paimon.flink.major.version}</artifactId>
             <version>${paimon.version}</version>
         </dependency>
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
index 6d9205386..dafd9fcc4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
@@ -171,7 +171,17 @@ public abstract class PostgresTestBase extends AbstractTestBase {
     }
 
     protected void waitForSnapshotStarted(String sinkName) throws InterruptedException {
+        waitForSnapshotStarted(sinkName, 120000L); // default 2 minutes timeout
+    }
+
+    protected void waitForSnapshotStarted(String sinkName, long timeoutMillis)
+            throws InterruptedException {
+        long start = System.currentTimeMillis();
         while (sinkSize(sinkName) == 0) {
+            if (System.currentTimeMillis() - start > timeoutMillis) {
+                throw new AssertionError(
+                        "Timeout waiting for snapshot to start. Sink: " + sinkName);
+            }
             sleep(300);
         }
     }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/replica_identity.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/replica_identity.sql
index 0df973a76..b6cd6be41 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/replica_identity.sql
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/replica_identity.sql
@@ -27,9 +27,8 @@ CREATE TABLE products (
 );
 ALTER SEQUENCE products_id_seq RESTART WITH 101;
 
--- Enable REPLICA IDENTITY FULL to capture before values for UPDATE/DELETE operations
--- This is required for upsert changelog mode to work correctly
-ALTER TABLE products REPLICA IDENTITY FULL;
+-- USE default REPLICA IDENTITY
+-- ALTER TABLE products REPLICA IDENTITY FULL;
 
 INSERT INTO products
 VALUES (default,'scooter','Small 2-wheel scooter',3.14),
diff --git a/flink-cdc-connect/pom.xml b/flink-cdc-connect/pom.xml
index 44a51d1c8..4cb3bb518 100644
--- a/flink-cdc-connect/pom.xml
+++ b/flink-cdc-connect/pom.xml
@@ -36,6 +36,9 @@ limitations under the License.
     <profiles>
         <profile>
             <id>flink1</id>
+            <properties>
+                <paimon.flink.major.version>1.20</paimon.flink.major.version>
+            </properties>
             <activation>
                 <activeByDefault>true</activeByDefault>
             </activation>
@@ -53,7 +56,6 @@ limitations under the License.
             <id>flink2</id>
             <properties>
                 <flink.version>${flink.2.x.version}</flink.version>
-                <flink.major.version>2.1</flink.major.version>
             </properties>
             <dependencies>
                 <dependency>
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index d8d4ebcac..176fd8d4b 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -773,7 +773,7 @@ limitations under the License.
                         </artifactItem>
                         <artifactItem>
                             <groupId>org.apache.paimon</groupId>
-                            <artifactId>paimon-flink-${flink.major.version}</artifactId>
+                            <artifactId>paimon-flink-2.1</artifactId>
                             <version>${paimon.version}</version>
                             <destFileName>paimon-sql-connector-2.2.0.jar</destFileName>
                             <type>jar</type>
@@ -823,13 +823,4 @@ limitations under the License.
             </plugin>
         </plugins>
     </build>
-
-    <profiles>
-        <profile>
-            <id>flink2</id>
-            <properties>
-                <flink.major.version>2.1</flink.major.version>
-            </properties>
-        </profile>
-    </profiles>
 </project>
diff --git a/pom.xml b/pom.xml
index d58218c94..4b8b737ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@ limitations under the License.
         <maven.compiler.source>${source.java.version}</maven.compiler.source>
         <maven.compiler.target>${target.java.version}</maven.compiler.target>
         <flink.kafka.connector.version>3.3.0-1.20</flink.kafka.connector.version>
+        <paimon.flink.major.version>1.20</paimon.flink.major.version>
     </properties>
 
     <dependencyManagement>
@@ -853,6 +854,7 @@ limitations under the License.
             <id>flink2</id>
             <properties>
                 <flink.kafka.connector.version>4.0.1-2.0</flink.kafka.connector.version>
+                <paimon.flink.major.version>2.1</paimon.flink.major.version>
             </properties>
             <build>
                 <pluginManagement>

Reply via email to