This is an automated email from the ASF dual-hosted git repository.

lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new b94d7b252 [FLINK-39563][paimon] Upgrade paimon dependency to 1.4.1 (#4384)
b94d7b252 is described below

commit b94d7b252eb66908f33d345d2f31105ad3561e78
Author: Pei Yu <[email protected]>
AuthorDate: Wed Apr 29 14:25:29 2026 +0800

    [FLINK-39563][paimon] Upgrade paimon dependency to 1.4.1 (#4384)
    
    Signed-off-by: Pei Yu <[email protected]>
---
 .../flink-cdc-pipeline-connector-paimon/pom.xml    |  2 +-
 .../paimon/sink/v2/PreCommitOperator.java          |  3 +--
 .../paimon/sink/v2/StoreSinkWriteImpl.java         | 14 ++++---------
 .../paimon/sink/v2/PaimonSinkITCase.java           |  3 +--
 .../TestMultiTableCommittableChannelComputer.java  | 24 +++++++++++-----------
 .../flink-cdc-pipeline-e2e-tests/pom.xml           |  2 +-
 6 files changed, 20 insertions(+), 28 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
index 5e15835a8..5eb925da1 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
@@ -29,7 +29,7 @@ limitations under the License.
     <artifactId>flink-cdc-pipeline-connector-paimon</artifactId>
 
     <properties>
-        <paimon.version>1.3.1</paimon.version>
+        <paimon.version>1.4.1</paimon.version>
         <hadoop.version>2.8.5</hadoop.version>
         <hive.version>2.3.9</hive.version>
         <mockito.version>3.12.4</mockito.version>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
index 8758bcbcb..c3fefd3a2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
@@ -115,8 +115,7 @@ public class PreCommitOperator
                             multiTableCommittable.getDatabase(),
                             multiTableCommittable.getTable(),
                             checkpointId,
-                            multiTableCommittable.kind(),
-                            multiTableCommittable.wrappedCommittable()));
+                            multiTableCommittable.commitMessage()));
         }
     }
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
index bba9e90cb..c709253cc 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
@@ -130,13 +130,13 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
         if (memoryPoolFactory != null) {
             return tableWrite.withMemoryPoolFactory(memoryPoolFactory);
         } else {
-            return (TableWriteImpl<?>)
-                    tableWrite.withMemoryPool(
+            return tableWrite.withMemoryPoolFactory(
+                    new MemoryPoolFactory(
                             memoryPool != null
                                     ? memoryPool
                                     : new HeapMemorySegmentPool(
                                             table.coreOptions().writeBufferSize(),
-                                            table.coreOptions().pageSize()));
+                                            table.coreOptions().pageSize())));
         }
     }
 
@@ -159,11 +159,6 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
         return write.writeAndReturn(internalRow, i);
     }
 
-    @Override
-    public SinkRecord toLogRecord(SinkRecord record) {
-        return write.toLogRecord(record);
-    }
-
     @Override
     public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception {
         write.compact(partition, bucket, fullCompaction);
@@ -191,8 +186,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             try {
                 for (CommitMessage committable :
                         write.prepareCommit(this.waitCompaction || waitCompaction, checkpointId)) {
-                    committables.add(
-                            new Committable(checkpointId, Committable.Kind.FILE, committable));
+                    committables.add(new Committable(checkpointId, committable));
                 }
             } catch (Exception e) {
                 throw new IOException(e);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index 53764cf5a..381c87b24 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -1044,8 +1044,7 @@ public class PaimonSinkITCase {
                 committable.getDatabase(),
                 committable.getTable(),
                 checkpointId++,
-                committable.kind(),
-                committable.wrappedCommittable());
+                committable.commitMessage());
     }
 
     private static class MockCommitRequestImpl<CommT> extends CommitRequestImpl<CommT> {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java
index e065d13a8..ca54cbc51 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java
@@ -39,18 +39,18 @@ public class TestMultiTableCommittableChannelComputer {
         computer.setup(4);
         List<MultiTableCommittable> commits =
                 Arrays.asList(
-                        new MultiTableCommittable("database", "table1", 1L, null, null),
-                        new MultiTableCommittable("database", "table2", 1L, null, null),
-                        new MultiTableCommittable("database", "table1", 1L, null, null),
-                        new MultiTableCommittable("database", "table5", 1L, null, null),
-                        new MultiTableCommittable("database", "table3", 1L, null, null),
-                        new MultiTableCommittable("database", "table8", 1L, null, null),
-                        new MultiTableCommittable("database", "table5", 1L, null, null),
-                        new MultiTableCommittable("database", "table1", 1L, null, null),
-                        new MultiTableCommittable("database", "table9", 1L, null, null),
-                        new MultiTableCommittable("database", "table5", 1L, null, null),
-                        new MultiTableCommittable("database", "table3", 1L, null, null),
-                        new MultiTableCommittable("database", "table8", 1L, null, null));
+                        new MultiTableCommittable("database", "table1", 1L, null),
+                        new MultiTableCommittable("database", "table2", 1L, null),
+                        new MultiTableCommittable("database", "table1", 1L, null),
+                        new MultiTableCommittable("database", "table5", 1L, null),
+                        new MultiTableCommittable("database", "table3", 1L, null),
+                        new MultiTableCommittable("database", "table8", 1L, null),
+                        new MultiTableCommittable("database", "table5", 1L, null),
+                        new MultiTableCommittable("database", "table1", 1L, null),
+                        new MultiTableCommittable("database", "table9", 1L, null),
+                        new MultiTableCommittable("database", "table5", 1L, null),
+                        new MultiTableCommittable("database", "table3", 1L, null),
+                        new MultiTableCommittable("database", "table8", 1L, null));
         Map<Integer, Set<String>> map = new HashMap<>();
         commits.forEach(
                 (commit) -> {
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 88130ce23..24e6213ed 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -32,7 +32,7 @@ limitations under the License.
         <flink-major-1.20>1.20</flink-major-1.20>
         <mysql.driver.version>8.0.27</mysql.driver.version>
         <starrocks.connector.version>1.2.14_flink-${flink-major-1.20}</starrocks.connector.version>
-        <paimon.version>1.3.1</paimon.version>
+        <paimon.version>1.4.1</paimon.version>
         <flink.release.download.skip>false</flink.release.download.skip>
         <flink.release.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.release.name>
         <flink.release.mirror>https://dlcdn.apache.org/flink/flink-${flink.version}</flink.release.mirror>

Reply via email to