This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 54b515b73 [paimon] Bump Paimon version from 1.0.1 to 1.2.0 (#1289)
54b515b73 is described below
commit 54b515b737e41da564dd5f3a094b4a6fcebf6392
Author: CaoZhen <[email protected]>
AuthorDate: Mon Jul 21 20:17:02 2025 +0800
[paimon] Bump Paimon version from 1.0.1 to 1.2.0 (#1289)
---
.../lakehouse/paimon/reader/ScanRecordWrapper.java | 6 ++++
fluss-lake/fluss-lake-paimon/pom.xml | 10 +++++-
.../paimon/tiering/FlussRecordAsPaimonRow.java | 7 ++++
.../lake/paimon/tiering/PaimonLakeCommitter.java | 42 ++++++++++++++--------
.../src/main/resources/META-INF/NOTICE | 2 +-
.../lake/paimon/tiering/PaimonTieringTest.java | 3 ++
pom.xml | 2 +-
7 files changed, 54 insertions(+), 18 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java
index b79c3a272..47e844503 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java
@@ -28,6 +28,7 @@ import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -157,6 +158,11 @@ public class ScanRecordWrapper implements InternalRow {
return flussRow.getBytes(pos);
}
+ @Override
+ public Variant getVariant(int pos) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public InternalArray getArray(int pos) {
throw new UnsupportedOperationException();
diff --git a/fluss-lake/fluss-lake-paimon/pom.xml b/fluss-lake/fluss-lake-paimon/pom.xml
index 322caa994..e8df4406d 100644
--- a/fluss-lake/fluss-lake-paimon/pom.xml
+++ b/fluss-lake/fluss-lake-paimon/pom.xml
@@ -33,7 +33,7 @@
<packaging>jar</packaging>
<properties>
- <paimon.version>1.0.1</paimon.version>
+ <paimon.version>1.2.0</paimon.version>
</properties>
<dependencies>
@@ -75,6 +75,14 @@
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>2.8.5</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
index 041380f07..97d543148 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -189,6 +190,12 @@ public class FlussRecordAsPaimonRow implements InternalRow
{
return internalRow.getBytes(pos);
}
+ @Override
+ public Variant getVariant(int pos) {
+ throw new UnsupportedOperationException(
+ "getVariant is not support for Fluss record currently.");
+ }
+
@Override
public InternalArray getArray(int pos) {
throw new UnsupportedOperationException(
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index ab2f691b3..c59271005 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -21,10 +21,12 @@ import
com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
import com.alibaba.fluss.lake.committer.LakeCommitter;
import com.alibaba.fluss.metadata.TablePath;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.FileStoreCommit;
@@ -56,6 +58,7 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
private final FileStoreTable fileStoreTable;
private FileStoreCommit fileStoreCommit;
private final TablePath tablePath;
+ private static final ThreadLocal<Long> currentCommitSnapshotId = new
ThreadLocal<>();
public PaimonLakeCommitter(PaimonCatalogProvider paimonCatalogProvider,
TablePath tablePath)
throws IOException {
@@ -77,18 +80,16 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
@Override
public long commit(PaimonCommittable committable) throws IOException {
ManifestCommittable manifestCommittable =
committable.manifestCommittable();
- PaimonCommitCallback paimonCommitCallback = new PaimonCommitCallback();
try {
fileStoreCommit =
fileStoreTable
.store()
- .newCommit(
- FLUSS_LAKE_TIERING_COMMIT_USER,
-
Collections.singletonList(paimonCommitCallback));
- fileStoreCommit.commit(manifestCommittable,
Collections.emptyMap());
- return checkNotNull(
- paimonCommitCallback.commitSnapshotId,
- "Paimon committed snapshot id must be non-null.");
+ .newCommit(FLUSS_LAKE_TIERING_COMMIT_USER,
fileStoreTable);
+ fileStoreCommit.commit(manifestCommittable, false);
+ Long commitSnapshotId = currentCommitSnapshotId.get();
+ currentCommitSnapshotId.remove();
+
+ return checkNotNull(commitSnapshotId, "Paimon committed snapshot
id must be non-null.");
} catch (Throwable t) {
if (fileStoreCommit != null) {
// if any error happen while commit, abort the commit to clean
committable
@@ -100,7 +101,8 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
@Override
public void abort(PaimonCommittable committable) throws IOException {
- fileStoreCommit =
fileStoreTable.store().newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
+ fileStoreCommit =
+
fileStoreTable.store().newCommit(FLUSS_LAKE_TIERING_COMMIT_USER,
fileStoreTable);
fileStoreCommit.abort(committable.manifestCommittable().fileCommittables());
}
@@ -189,19 +191,29 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
private FileStoreTable getTable(TablePath tablePath) throws IOException {
try {
- return (FileStoreTable)
paimonCatalog.getTable(toPaimon(tablePath));
+ FileStoreTable table =
+ (FileStoreTable)
+ paimonCatalog
+ .getTable(toPaimon(tablePath))
+ .copy(
+ Collections.singletonMap(
+
CoreOptions.COMMIT_CALLBACKS.key(),
+
PaimonLakeCommitter.PaimonCommitCallback.class
+ .getName()));
+
+ return table;
} catch (Exception e) {
throw new IOException("Failed to get table " + tablePath + " in
Paimon.", e);
}
}
- private static class PaimonCommitCallback implements CommitCallback {
-
- private Long commitSnapshotId = null;
+ /** A {@link CommitCallback} to save paimon commit snapshot info. */
+ public static class PaimonCommitCallback implements CommitCallback {
@Override
- public void call(List<ManifestEntry> list, Snapshot snapshot) {
- this.commitSnapshotId = snapshot.id();
+ public void call(
+ List<ManifestEntry> list, List<IndexManifestEntry> indexFiles,
Snapshot snapshot) {
+ currentCommitSnapshotId.set(snapshot.id());
}
@Override
diff --git a/fluss-lake/fluss-lake-paimon/src/main/resources/META-INF/NOTICE b/fluss-lake/fluss-lake-paimon/src/main/resources/META-INF/NOTICE
index 82092698a..c2d4c8a6d 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/resources/META-INF/NOTICE
+++ b/fluss-lake/fluss-lake-paimon/src/main/resources/META-INF/NOTICE
@@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
-- org.apache.paimon:paimon-bundle:1.0.1
+- org.apache.paimon:paimon-bundle:1.2.0
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
index 86aff158e..ae02112fb 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -731,6 +731,9 @@ class PaimonTieringTest {
paimonSchemaBuilder.column(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
paimonSchemaBuilder.column(
TIMESTAMP_COLUMN_NAME,
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+ paimonSchemaBuilder.option(
+ CoreOptions.COMMIT_CALLBACKS.key(),
+ PaimonLakeCommitter.PaimonCommitCallback.class.getName());
paimonCatalog.createDatabase(tablePath.getDatabaseName(), true);
paimonCatalog.createTable(toPaimon(tablePath),
paimonSchemaBuilder.build(), true);
}
diff --git a/pom.xml b/pom.xml
index 8527f5996..87af19cfc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,7 +98,7 @@
<curator.version>5.4.0</curator.version>
<netty.version>4.1.104</netty.version>
<arrow.version>15.0.0</arrow.version>
- <paimon.version>1.0.1</paimon.version>
+ <paimon.version>1.2.0</paimon.version>
<fluss.hadoop.version>2.10.2</fluss.hadoop.version>
<frocksdb.version>6.20.3-ververica-2.0</frocksdb.version>