This is an automated email from the ASF dual-hosted git repository.
lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new b94d7b252 [FLINK-39563][paimon] Upgrade paimon dependency to 1.4.1
(#4384)
b94d7b252 is described below
commit b94d7b252eb66908f33d345d2f31105ad3561e78
Author: Pei Yu <[email protected]>
AuthorDate: Wed Apr 29 14:25:29 2026 +0800
[FLINK-39563][paimon] Upgrade paimon dependency to 1.4.1 (#4384)
Signed-off-by: Pei Yu <[email protected]>
---
.../flink-cdc-pipeline-connector-paimon/pom.xml | 2 +-
.../paimon/sink/v2/PreCommitOperator.java | 3 +--
.../paimon/sink/v2/StoreSinkWriteImpl.java | 14 ++++---------
.../paimon/sink/v2/PaimonSinkITCase.java | 3 +--
.../TestMultiTableCommittableChannelComputer.java | 24 +++++++++++-----------
.../flink-cdc-pipeline-e2e-tests/pom.xml | 2 +-
6 files changed, 20 insertions(+), 28 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
index 5e15835a8..5eb925da1 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
@@ -29,7 +29,7 @@ limitations under the License.
<artifactId>flink-cdc-pipeline-connector-paimon</artifactId>
<properties>
- <paimon.version>1.3.1</paimon.version>
+ <paimon.version>1.4.1</paimon.version>
<hadoop.version>2.8.5</hadoop.version>
<hive.version>2.3.9</hive.version>
<mockito.version>3.12.4</mockito.version>
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
index 8758bcbcb..c3fefd3a2 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
@@ -115,8 +115,7 @@ public class PreCommitOperator
multiTableCommittable.getDatabase(),
multiTableCommittable.getTable(),
checkpointId,
- multiTableCommittable.kind(),
- multiTableCommittable.wrappedCommittable()));
+ multiTableCommittable.commitMessage()));
}
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
index bba9e90cb..c709253cc 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
@@ -130,13 +130,13 @@ public class StoreSinkWriteImpl implements StoreSinkWrite
{
if (memoryPoolFactory != null) {
return tableWrite.withMemoryPoolFactory(memoryPoolFactory);
} else {
- return (TableWriteImpl<?>)
- tableWrite.withMemoryPool(
+ return tableWrite.withMemoryPoolFactory(
+ new MemoryPoolFactory(
memoryPool != null
? memoryPool
: new HeapMemorySegmentPool(
table.coreOptions().writeBufferSize(),
- table.coreOptions().pageSize()));
+ table.coreOptions().pageSize())));
}
}
@@ -159,11 +159,6 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
return write.writeAndReturn(internalRow, i);
}
- @Override
- public SinkRecord toLogRecord(SinkRecord record) {
- return write.toLogRecord(record);
- }
-
@Override
public void compact(BinaryRow partition, int bucket, boolean
fullCompaction) throws Exception {
write.compact(partition, bucket, fullCompaction);
@@ -191,8 +186,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
try {
for (CommitMessage committable :
write.prepareCommit(this.waitCompaction ||
waitCompaction, checkpointId)) {
- committables.add(
- new Committable(checkpointId,
Committable.Kind.FILE, committable));
+ committables.add(new Committable(checkpointId,
committable));
}
} catch (Exception e) {
throw new IOException(e);
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index 53764cf5a..381c87b24 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -1044,8 +1044,7 @@ public class PaimonSinkITCase {
committable.getDatabase(),
committable.getTable(),
checkpointId++,
- committable.kind(),
- committable.wrappedCommittable());
+ committable.commitMessage());
}
private static class MockCommitRequestImpl<CommT> extends
CommitRequestImpl<CommT> {
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java
index e065d13a8..ca54cbc51 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java
@@ -39,18 +39,18 @@ public class TestMultiTableCommittableChannelComputer {
computer.setup(4);
List<MultiTableCommittable> commits =
Arrays.asList(
- new MultiTableCommittable("database", "table1", 1L,
null, null),
- new MultiTableCommittable("database", "table2", 1L,
null, null),
- new MultiTableCommittable("database", "table1", 1L,
null, null),
- new MultiTableCommittable("database", "table5", 1L,
null, null),
- new MultiTableCommittable("database", "table3", 1L,
null, null),
- new MultiTableCommittable("database", "table8", 1L,
null, null),
- new MultiTableCommittable("database", "table5", 1L,
null, null),
- new MultiTableCommittable("database", "table1", 1L,
null, null),
- new MultiTableCommittable("database", "table9", 1L,
null, null),
- new MultiTableCommittable("database", "table5", 1L,
null, null),
- new MultiTableCommittable("database", "table3", 1L,
null, null),
- new MultiTableCommittable("database", "table8", 1L,
null, null));
+ new MultiTableCommittable("database", "table1", 1L,
null),
+ new MultiTableCommittable("database", "table2", 1L,
null),
+ new MultiTableCommittable("database", "table1", 1L,
null),
+ new MultiTableCommittable("database", "table5", 1L,
null),
+ new MultiTableCommittable("database", "table3", 1L,
null),
+ new MultiTableCommittable("database", "table8", 1L,
null),
+ new MultiTableCommittable("database", "table5", 1L,
null),
+ new MultiTableCommittable("database", "table1", 1L,
null),
+ new MultiTableCommittable("database", "table9", 1L,
null),
+ new MultiTableCommittable("database", "table5", 1L,
null),
+ new MultiTableCommittable("database", "table3", 1L,
null),
+ new MultiTableCommittable("database", "table8", 1L,
null));
Map<Integer, Set<String>> map = new HashMap<>();
commits.forEach(
(commit) -> {
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 88130ce23..24e6213ed 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -32,7 +32,7 @@ limitations under the License.
<flink-major-1.20>1.20</flink-major-1.20>
<mysql.driver.version>8.0.27</mysql.driver.version>
<starrocks.connector.version>1.2.14_flink-${flink-major-1.20}</starrocks.connector.version>
- <paimon.version>1.3.1</paimon.version>
+ <paimon.version>1.4.1</paimon.version>
<flink.release.download.skip>false</flink.release.download.skip>
<flink.release.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.release.name>
<flink.release.mirror>https://dlcdn.apache.org/flink/flink-${flink.version}</flink.release.mirror>