This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 54b515b73 [paimon] Bump Paimon version from 1.0.1 to 1.2.0 (#1289)
54b515b73 is described below

commit 54b515b737e41da564dd5f3a094b4a6fcebf6392
Author: CaoZhen <[email protected]>
AuthorDate: Mon Jul 21 20:17:02 2025 +0800

    [paimon] Bump Paimon version from 1.0.1 to 1.2.0 (#1289)
---
 .../lakehouse/paimon/reader/ScanRecordWrapper.java |  6 ++++
 fluss-lake/fluss-lake-paimon/pom.xml               | 10 +++++-
 .../paimon/tiering/FlussRecordAsPaimonRow.java     |  7 ++++
 .../lake/paimon/tiering/PaimonLakeCommitter.java   | 42 ++++++++++++++--------
 .../src/main/resources/META-INF/NOTICE             |  2 +-
 .../lake/paimon/tiering/PaimonTieringTest.java     |  3 ++
 pom.xml                                            |  2 +-
 7 files changed, 54 insertions(+), 18 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java
index b79c3a272..47e844503 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java
@@ -28,6 +28,7 @@ import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -157,6 +158,11 @@ public class ScanRecordWrapper implements InternalRow {
         return flussRow.getBytes(pos);
     }
 
+    @Override
+    public Variant getVariant(int pos) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public InternalArray getArray(int pos) {
         throw new UnsupportedOperationException();
diff --git a/fluss-lake/fluss-lake-paimon/pom.xml b/fluss-lake/fluss-lake-paimon/pom.xml
index 322caa994..e8df4406d 100644
--- a/fluss-lake/fluss-lake-paimon/pom.xml
+++ b/fluss-lake/fluss-lake-paimon/pom.xml
@@ -33,7 +33,7 @@
     <packaging>jar</packaging>
 
     <properties>
-        <paimon.version>1.0.1</paimon.version>
+        <paimon.version>1.2.0</paimon.version>
     </properties>
 
     <dependencies>
@@ -75,6 +75,14 @@
             <scope>test</scope>
         </dependency>
 
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>2.8.5</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-hdfs-client</artifactId>
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
index 041380f07..97d543148 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -189,6 +190,12 @@ public class FlussRecordAsPaimonRow implements InternalRow {
         return internalRow.getBytes(pos);
     }
 
+    @Override
+    public Variant getVariant(int pos) {
+        throw new UnsupportedOperationException(
+                "getVariant is not support for Fluss record currently.");
+    }
+
     @Override
     public InternalArray getArray(int pos) {
         throw new UnsupportedOperationException(
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index ab2f691b3..c59271005 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -21,10 +21,12 @@ import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
 import com.alibaba.fluss.lake.committer.LakeCommitter;
 import com.alibaba.fluss.metadata.TablePath;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.FileStoreCommit;
@@ -56,6 +58,7 @@ public class PaimonLakeCommitter implements LakeCommitter<PaimonWriteResult, Pai
     private final FileStoreTable fileStoreTable;
     private FileStoreCommit fileStoreCommit;
     private final TablePath tablePath;
+    private static final ThreadLocal<Long> currentCommitSnapshotId = new ThreadLocal<>();
 
     public PaimonLakeCommitter(PaimonCatalogProvider paimonCatalogProvider, TablePath tablePath)
             throws IOException {
@@ -77,18 +80,16 @@ public class PaimonLakeCommitter implements LakeCommitter<PaimonWriteResult, Pai
     @Override
     public long commit(PaimonCommittable committable) throws IOException {
         ManifestCommittable manifestCommittable = committable.manifestCommittable();
-        PaimonCommitCallback paimonCommitCallback = new PaimonCommitCallback();
         try {
             fileStoreCommit =
                     fileStoreTable
                             .store()
-                            .newCommit(
-                                    FLUSS_LAKE_TIERING_COMMIT_USER,
-                                    Collections.singletonList(paimonCommitCallback));
-            fileStoreCommit.commit(manifestCommittable, Collections.emptyMap());
-            return checkNotNull(
-                    paimonCommitCallback.commitSnapshotId,
-                    "Paimon committed snapshot id must be non-null.");
+                            .newCommit(FLUSS_LAKE_TIERING_COMMIT_USER, fileStoreTable);
+            fileStoreCommit.commit(manifestCommittable, false);
+            Long commitSnapshotId = currentCommitSnapshotId.get();
+            currentCommitSnapshotId.remove();
+
+            return checkNotNull(commitSnapshotId, "Paimon committed snapshot id must be non-null.");
         } catch (Throwable t) {
             if (fileStoreCommit != null) {
                 // if any error happen while commit, abort the commit to clean committable
@@ -100,7 +101,8 @@ public class PaimonLakeCommitter implements LakeCommitter<PaimonWriteResult, Pai
 
     @Override
     public void abort(PaimonCommittable committable) throws IOException {
-        fileStoreCommit = fileStoreTable.store().newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
+        fileStoreCommit =
+                fileStoreTable.store().newCommit(FLUSS_LAKE_TIERING_COMMIT_USER, fileStoreTable);
         fileStoreCommit.abort(committable.manifestCommittable().fileCommittables());
     }
 
@@ -189,19 +191,29 @@ public class PaimonLakeCommitter implements LakeCommitter<PaimonWriteResult, Pai
 
     private FileStoreTable getTable(TablePath tablePath) throws IOException {
         try {
-            return (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
+            FileStoreTable table =
+                    (FileStoreTable)
+                            paimonCatalog
+                                    .getTable(toPaimon(tablePath))
+                                    .copy(
+                                            Collections.singletonMap(
+                                                    CoreOptions.COMMIT_CALLBACKS.key(),
+                                                    PaimonLakeCommitter.PaimonCommitCallback.class
+                                                            .getName()));
+
+            return table;
         } catch (Exception e) {
             throw new IOException("Failed to get table " + tablePath + " in Paimon.", e);
         }
     }
 
-    private static class PaimonCommitCallback implements CommitCallback {
-
-        private Long commitSnapshotId = null;
+    /** A {@link CommitCallback} to save paimon commit snapshot info. */
+    public static class PaimonCommitCallback implements CommitCallback {
 
         @Override
-        public void call(List<ManifestEntry> list, Snapshot snapshot) {
-            this.commitSnapshotId = snapshot.id();
+        public void call(
+                List<ManifestEntry> list, List<IndexManifestEntry> indexFiles, Snapshot snapshot) {
+            currentCommitSnapshotId.set(snapshot.id());
         }
 
         @Override
diff --git a/fluss-lake/fluss-lake-paimon/src/main/resources/META-INF/NOTICE b/fluss-lake/fluss-lake-paimon/src/main/resources/META-INF/NOTICE
index 82092698a..c2d4c8a6d 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/resources/META-INF/NOTICE
+++ b/fluss-lake/fluss-lake-paimon/src/main/resources/META-INF/NOTICE
@@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.paimon:paimon-bundle:1.0.1
+- org.apache.paimon:paimon-bundle:1.2.0
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
index 86aff158e..ae02112fb 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -731,6 +731,9 @@ class PaimonTieringTest {
         paimonSchemaBuilder.column(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
         paimonSchemaBuilder.column(
                 TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+        paimonSchemaBuilder.option(
+                CoreOptions.COMMIT_CALLBACKS.key(),
+                PaimonLakeCommitter.PaimonCommitCallback.class.getName());
         paimonCatalog.createDatabase(tablePath.getDatabaseName(), true);
         paimonCatalog.createTable(toPaimon(tablePath), paimonSchemaBuilder.build(), true);
     }
diff --git a/pom.xml b/pom.xml
index 8527f5996..87af19cfc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,7 +98,7 @@
         <curator.version>5.4.0</curator.version>
         <netty.version>4.1.104</netty.version>
         <arrow.version>15.0.0</arrow.version>
-        <paimon.version>1.0.1</paimon.version>
+        <paimon.version>1.2.0</paimon.version>
 
         <fluss.hadoop.version>2.10.2</fluss.hadoop.version>
         <frocksdb.version>6.20.3-ververica-2.0</frocksdb.version>

Reply via email to