This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new c06affb44 [lake/iceberg] Support partitioned table (#1565)
c06affb44 is described below

commit c06affb447d2f078ac107e7ed93f4364d0ba140a
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Aug 19 21:29:42 2025 +0800

    [lake/iceberg] Support partitioned table (#1565)
---
 .../lake/iceberg/tiering/IcebergLakeWriter.java    |  37 +----
 .../fluss/lake/iceberg/tiering/RecordWriter.java   |  10 +-
 ...ndOnlyWriter.java => AppendOnlyTaskWriter.java} |  37 ++---
 .../iceberg/tiering/writer/DeltaTaskWriter.java    |  32 ++--
 .../writer/GenericRecordAppendOnlyWriter.java      |  66 ++++++++
 ...taWriter.java => GenericRecordDeltaWriter.java} |  27 +++-
 .../lake/iceberg/utils/IcebergConversions.java     |  24 +++
 .../fluss/lake/iceberg/IcebergTieringTest.java     | 168 ++++++++++++++-------
 .../lake/iceberg/utils/IcebergConversionsTest.java | 107 +++++++++++++
 .../lake/paimon/tiering/PaimonTieringTest.java     |  84 ++---------
 10 files changed, 379 insertions(+), 213 deletions(-)

diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
index dd8e55e37..8c3feb64e 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
@@ -17,7 +17,7 @@
 
 package com.alibaba.fluss.lake.iceberg.tiering;
 
-import com.alibaba.fluss.lake.iceberg.tiering.writer.AppendOnlyWriter;
+import com.alibaba.fluss.lake.iceberg.tiering.writer.AppendOnlyTaskWriter;
 import com.alibaba.fluss.lake.iceberg.tiering.writer.DeltaTaskWriter;
 import com.alibaba.fluss.lake.writer.LakeWriter;
 import com.alibaba.fluss.lake.writer.WriterInitContext;
@@ -25,7 +25,6 @@ import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.record.LogRecord;
 
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -37,7 +36,6 @@ import org.apache.iceberg.util.PropertyUtil;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import static com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
@@ -65,7 +63,6 @@ public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
     private RecordWriter createRecordWriter(WriterInitContext writerInitContext) {
         Schema schema = icebergTable.schema();
         List<Integer> equalityFieldIds = new ArrayList<>(schema.identifierFieldIds());
-        PartitionSpec spec = icebergTable.spec();
 
         // Get target file size from table properties
         long targetFileSize = targetFileSize(icebergTable);
@@ -79,34 +76,12 @@ public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
                         .format(format)
                         .build();
 
-        if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
-            if (spec.isUnpartitioned()) {
-                return new AppendOnlyWriter(
-                        icebergTable,
-                        writerInitContext.schema().getRowType(),
-                        writerInitContext.tableBucket(),
-                        null, // No partition for non-partitioned table
-                        Collections.emptyList(), // No partition keys
-                        format,
-                        outputFileFactory,
-                        targetFileSize);
-            } else {
-                return null;
-            }
+        if (equalityFieldIds.isEmpty()) {
+            return new AppendOnlyTaskWriter(
+                    icebergTable, writerInitContext, format, outputFileFactory, targetFileSize);
         } else {
-            if (spec.isUnpartitioned()) {
-                return new DeltaTaskWriter(
-                        icebergTable,
-                        writerInitContext.schema().getRowType(),
-                        writerInitContext.tableBucket(),
-                        null, // No partition for non-partitioned table
-                        Collections.emptyList(), // No partition keys);
-                        format,
-                        outputFileFactory,
-                        targetFileSize);
-            } else {
-                return null;
-            }
+            return new DeltaTaskWriter(
+                    icebergTable, writerInitContext, format, outputFileFactory, targetFileSize);
         }
     }
 
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java
index d6cde3394..444338e3e 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java
@@ -26,30 +26,22 @@ import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 
-import javax.annotation.Nullable;
-
-import java.util.List;
-
 /** A base interface to write {@link LogRecord} to Iceberg. */
 public abstract class RecordWriter implements AutoCloseable {
 
     protected final TaskWriter<Record> taskWriter;
     protected final Schema icebergSchema;
     protected final int bucket;
-    @Nullable protected final String partition;
     protected final FlussRecordAsIcebergRecord flussRecordAsIcebergRecord;
 
     public RecordWriter(
             TaskWriter<Record> taskWriter,
             Schema icebergSchema,
             RowType flussRowType,
-            TableBucket tableBucket,
-            @Nullable String partition,
-            List<String> partitionKeys) {
+            TableBucket tableBucket) {
         this.taskWriter = taskWriter;
         this.icebergSchema = icebergSchema;
         this.bucket = tableBucket.getBucket();
-        this.partition = partition; // null for non-partitioned tables
         this.flussRecordAsIcebergRecord =
                 new FlussRecordAsIcebergRecord(
                         tableBucket.getBucket(), icebergSchema, flussRowType);
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
similarity index 74%
rename from fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java
rename to fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
index 0b8ace18c..59c05f7d6 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
@@ -18,9 +18,8 @@
 package com.alibaba.fluss.lake.iceberg.tiering.writer;
 
 import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
-import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
 import com.alibaba.fluss.record.LogRecord;
-import com.alibaba.fluss.types.RowType;
 
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
@@ -29,48 +28,40 @@ import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
-import org.apache.iceberg.io.UnpartitionedWriter;
-
-import javax.annotation.Nullable;
-
-import java.util.List;
 
 /** A {@link RecordWriter} to write to Iceberg's append-only table. */
-public class AppendOnlyWriter extends RecordWriter {
+public class AppendOnlyTaskWriter extends RecordWriter {
 
-    public AppendOnlyWriter(
+    public AppendOnlyTaskWriter(
             Table icebergTable,
-            RowType flussRowType,
-            TableBucket tableBucket,
-            @Nullable String partition,
-            List<String> partitionKeys,
+            WriterInitContext writerInitContext,
             FileFormat format,
             OutputFileFactory outputFileFactory,
             long targetFileSize) {
         super(
-                createTaskWriter(icebergTable, format, outputFileFactory, targetFileSize),
+                createTaskWriter(
+                        icebergTable, writerInitContext, format, outputFileFactory, targetFileSize),
                 icebergTable.schema(),
-                flussRowType,
-                tableBucket,
-                partition,
-                partitionKeys);
+                writerInitContext.schema().getRowType(),
+                writerInitContext.tableBucket());
     }
 
     private static TaskWriter<Record> createTaskWriter(
             Table icebergTable,
+            WriterInitContext writerInitContext,
             FileFormat format,
             OutputFileFactory outputFileFactory,
             long targetFileSize) {
         FileAppenderFactory<Record> fileAppenderFactory =
-                new GenericAppenderFactory(icebergTable.schema());
-
-        return new UnpartitionedWriter<>(
-                icebergTable.spec(),
+                new GenericAppenderFactory(icebergTable.schema(), icebergTable.spec());
+        return new GenericRecordAppendOnlyWriter(
+                icebergTable,
                 format,
                 fileAppenderFactory,
                 outputFileFactory,
                 icebergTable.io(),
-                targetFileSize);
+                targetFileSize,
+                writerInitContext);
     }
 
     @Override
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
index 4f16a5ce5..573ca1265 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
@@ -18,9 +18,8 @@
 package com.alibaba.fluss.lake.iceberg.tiering.writer;
 
 import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
-import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
 import com.alibaba.fluss.record.LogRecord;
-import com.alibaba.fluss.types.RowType;
 
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
@@ -31,8 +30,6 @@ import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.List;
 
@@ -41,27 +38,24 @@ public class DeltaTaskWriter extends RecordWriter {
 
     public DeltaTaskWriter(
             Table icebergTable,
-            RowType flussRowType,
-            TableBucket tableBucket,
-            @Nullable String partition,
-            List<String> partitionKeys,
+            WriterInitContext writerInitContext,
             FileFormat format,
             OutputFileFactory outputFileFactory,
             long targetFileSize) {
         super(
-                createTaskWriter(icebergTable, format, outputFileFactory, targetFileSize),
+                createTaskWriter(
+                        icebergTable, format, outputFileFactory, targetFileSize, writerInitContext),
                 icebergTable.schema(),
-                flussRowType,
-                tableBucket,
-                partition,
-                partitionKeys);
+                writerInitContext.schema().getRowType(),
+                writerInitContext.tableBucket());
     }
 
     private static TaskWriter<Record> createTaskWriter(
             Table icebergTable,
             FileFormat format,
             OutputFileFactory outputFileFactory,
-            long targetFileSize) {
+            long targetFileSize,
+            WriterInitContext writerInitContext) {
         int[] equalityFieldIds =
                 icebergTable.schema().identifierFieldIds().stream()
                         .mapToInt(Integer::intValue)
@@ -79,20 +73,20 @@ public class DeltaTaskWriter extends RecordWriter {
             columns.add(icebergTable.schema().findField(fieldId).name());
         }
         Schema deleteSchema = icebergTable.schema().select(columns);
-        return new GenericTaskDeltaWriter(
-                icebergTable.schema(),
+        return new GenericRecordDeltaWriter(
+                icebergTable,
                 deleteSchema,
-                icebergTable.spec(),
                 format,
                 appenderFactory,
                 outputFileFactory,
                 icebergTable.io(),
-                targetFileSize);
+                targetFileSize,
+                writerInitContext);
     }
 
     @Override
     public void write(LogRecord record) throws Exception {
-        GenericTaskDeltaWriter deltaWriter = (GenericTaskDeltaWriter) taskWriter;
+        GenericRecordDeltaWriter deltaWriter = (GenericRecordDeltaWriter) taskWriter;
         flussRecordAsIcebergRecord.setFlussRecord(record);
         switch (record.getChangeType()) {
             case INSERT:
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericRecordAppendOnlyWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericRecordAppendOnlyWriter.java
new file mode 100644
index 000000000..864414ee7
--- /dev/null
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericRecordAppendOnlyWriter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering.writer;
+
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.BaseTaskWriter;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+
+import java.io.IOException;
+
+import static com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toPartition;
+
+/** A generic task writer to write {@link Record} for append-only table. * */
+public class GenericRecordAppendOnlyWriter extends BaseTaskWriter<Record> {
+
+    private final RollingFileWriter currentWriter;
+
+    protected GenericRecordAppendOnlyWriter(
+            Table icebergTable,
+            FileFormat format,
+            FileAppenderFactory<Record> appenderFactory,
+            OutputFileFactory fileFactory,
+            FileIO io,
+            long targetFileSize,
+            WriterInitContext writerInitContext) {
+        super(icebergTable.spec(), format, appenderFactory, fileFactory, io, targetFileSize);
+        currentWriter =
+                new RollingFileWriter(
+                        toPartition(
+                                icebergTable,
+                                writerInitContext.partition(),
+                                writerInitContext.tableBucket().getBucket()));
+    }
+
+    @Override
+    public void write(Record record) throws IOException {
+        currentWriter.write(record);
+    }
+
+    @Override
+    public void close() throws IOException {
+        currentWriter.close();
+    }
+}
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericTaskDeltaWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java
similarity index 73%
rename from fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericTaskDeltaWriter.java
rename to fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java
index 65ab644dd..5adae1a3a 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericTaskDeltaWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java
@@ -17,11 +17,13 @@
 
 package com.alibaba.fluss.lake.iceberg.tiering.writer;
 
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.BaseTaskWriter;
 import org.apache.iceberg.io.FileAppenderFactory;
@@ -30,21 +32,30 @@ import org.apache.iceberg.io.OutputFileFactory;
 
 import java.io.IOException;
 
+import static com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toPartition;
+
 /** A generic task equality delta writer. * */
-class GenericTaskDeltaWriter extends BaseTaskWriter<Record> {
+class GenericRecordDeltaWriter extends BaseTaskWriter<Record> {
     private final GenericEqualityDeltaWriter deltaWriter;
 
-    public GenericTaskDeltaWriter(
-            Schema schema,
+    public GenericRecordDeltaWriter(
+            Table icebergTable,
             Schema deleteSchema,
-            PartitionSpec spec,
             FileFormat format,
             FileAppenderFactory<Record> appenderFactory,
             OutputFileFactory fileFactory,
             FileIO io,
-            long targetFileSize) {
-        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
-        this.deltaWriter = new GenericEqualityDeltaWriter(null, schema, deleteSchema);
+            long targetFileSize,
+            WriterInitContext writerInitContext) {
+        super(icebergTable.spec(), format, appenderFactory, fileFactory, io, targetFileSize);
+        this.deltaWriter =
+                new GenericEqualityDeltaWriter(
+                        toPartition(
+                                icebergTable,
+                                writerInitContext.partition(),
+                                writerInitContext.tableBucket().getBucket()),
+                        icebergTable.schema(),
+                        deleteSchema);
     }
 
     @Override
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversions.java b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversions.java
index de237c7bf..617f33959 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversions.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -19,8 +19,16 @@ package com.alibaba.fluss.lake.iceberg.utils;
 
 import com.alibaba.fluss.metadata.TablePath;
 
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 
+import javax.annotation.Nullable;
+
+import static com.alibaba.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
+
 /** Utility class for static conversions between Fluss and Iceberg types. */
 public class IcebergConversions {
 
@@ -28,4 +36,20 @@ public class IcebergConversions {
     public static TableIdentifier toIceberg(TablePath tablePath) {
         return TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
     }
+
+    public static PartitionKey toPartition(
+            Table table, @Nullable String partitionName, int bucket) {
+        PartitionSpec partitionSpec = table.spec();
+        Schema schema = table.schema();
+        PartitionKey partitionKey = new PartitionKey(partitionSpec, schema);
+        int pos = 0;
+        if (partitionName != null) {
+            String[] partitionArr = partitionName.split("\\" + PARTITION_SPEC_SEPARATOR);
+            for (String partition : partitionArr) {
+                partitionKey.set(pos++, partition);
+            }
+        }
+        partitionKey.set(pos, bucket);
+        return partitionKey;
+    }
 }
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
index 7e15f41e1..8a666b677 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
@@ -36,6 +36,7 @@ import com.alibaba.fluss.row.GenericRow;
 import com.alibaba.fluss.types.DataTypes;
 import com.alibaba.fluss.utils.types.Tuple2;
 
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
@@ -44,7 +45,6 @@ import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.BeforeEach;
@@ -77,11 +77,14 @@ import static com.alibaba.fluss.record.ChangeType.DELETE;
 import static com.alibaba.fluss.record.ChangeType.INSERT;
 import static com.alibaba.fluss.record.ChangeType.UPDATE_AFTER;
 import static com.alibaba.fluss.record.ChangeType.UPDATE_BEFORE;
+import static org.apache.iceberg.expressions.Expressions.equal;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit test for tiering to Iceberg via {@link IcebergLakeTieringFactory}. */
 class IcebergTieringTest {
 
+    private static final int BUCKET_NUM = 3;
+
     private @TempDir File tempWarehouseDir;
     private IcebergLakeTieringFactory icebergLakeTieringFactory;
     private Catalog icebergCatalog;
@@ -99,24 +102,40 @@ class IcebergTieringTest {
     }
 
     private static Stream<Arguments> tieringWriteArgs() {
-        return Stream.of(Arguments.of(true), Arguments.of(false));
+        return Stream.of(
+                // isPrimaryKeyTable, isPartitionedTable
+                Arguments.of(true, true),
+                Arguments.of(true, false),
+                Arguments.of(false, true),
+                Arguments.of(false, false));
     }
 
     @ParameterizedTest
     @MethodSource("tieringWriteArgs")
-    void testTieringWriteTable(boolean isPrimaryKeyTable) throws Exception {
-        int bucketNum = 3;
+    void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable)
+            throws Exception {
         TablePath tablePath =
                 TablePath.of(
                         "iceberg",
                         String.format(
-                                "test_tiering_table_%s",
-                                isPrimaryKeyTable ? "primary_key" : "log"));
-        createTable(tablePath, isPrimaryKeyTable);
+                                "test_tiering_table_%s_%s",
+                                isPrimaryKeyTable ? "pk" : "log",
+                                isPartitionedTable ? "partitioned" : "unpartitioned"));
+        createTable(tablePath, isPrimaryKeyTable, isPartitionedTable);
 
         Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
 
-        Map<Integer, List<LogRecord>> recordsByBucket = new HashMap<>();
+        Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
+        Map<Long, String> partitionIdAndName =
+                isPartitionedTable
+                        ? new HashMap<Long, String>() {
+                            {
+                                put(1L, "p1");
+                                put(2L, "p2");
+                                put(3L, "p3");
+                            }
+                        }
+                        : Collections.singletonMap(null, null);
 
         List<IcebergWriteResult> icebergWriteResults = new ArrayList<>();
         SimpleVersionedSerializer<IcebergWriteResult> writeResultSerializer =
@@ -125,24 +144,30 @@ class IcebergTieringTest {
                 icebergLakeTieringFactory.getCommittableSerializer();
 
         // first, write data
-        for (int bucket = 0; bucket < bucketNum; bucket++) {
-            try (LakeWriter<IcebergWriteResult> writer = createLakeWriter(tablePath, bucket)) {
-                Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
-                        isPrimaryKeyTable
-                                ? genPrimaryKeyTableRecords(bucket)
-                                : genLogTableRecords(bucket, 10);
-
-                List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
-                List<LogRecord> expectRecords = writeAndExpectRecords.f1;
-                recordsByBucket.put(bucket, expectRecords);
-                for (LogRecord record : writtenRecords) {
-                    writer.write(record);
+        for (int bucket = 0; bucket < BUCKET_NUM; bucket++) {
+            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
+                try (LakeWriter<IcebergWriteResult> writer =
+                        createLakeWriter(tablePath, bucket, partition, entry.getKey())) {
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
+                            isPrimaryKeyTable
+                                    ? genPrimaryKeyTableRecords(partition, bucket)
+                                    : genLogTableRecords(partition, bucket, 10);
+
+                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                    recordsByBucket.put(partitionBucket, expectRecords);
+
+                    for (LogRecord record : writtenRecords) {
+                        writer.write(record);
+                    }
+                    IcebergWriteResult result = writer.complete();
+                    byte[] serialized = writeResultSerializer.serialize(result);
+                    icebergWriteResults.add(
+                            writeResultSerializer.deserialize(
+                                    writeResultSerializer.getVersion(), serialized));
                 }
-                IcebergWriteResult result = writer.complete();
-                byte[] serialized = writeResultSerializer.serialize(result);
-                icebergWriteResults.add(
-                        writeResultSerializer.deserialize(
-                                writeResultSerializer.getVersion(), serialized));
             }
         }
 
@@ -166,17 +191,18 @@ class IcebergTieringTest {
 
         // then, check data
         for (int bucket = 0; bucket < 3; bucket++) {
-            List<LogRecord> expectRecords = recordsByBucket.get(bucket);
-            CloseableIterator<Record> actualRecords = getIcebergRows(icebergTable, bucket);
-            if (isPrimaryKeyTable) {
-                verifyTableRecords(actualRecords, expectRecords, bucket);
-            } else {
-                verifyTableRecords(actualRecords, expectRecords, bucket);
+            for (String partition : partitionIdAndName.values()) {
+                Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                List<LogRecord> expectRecords = recordsByBucket.get(partitionBucket);
+                CloseableIterator<Record> actualRecords =
+                        getIcebergRows(icebergTable, partition, bucket);
+                verifyTableRecords(actualRecords, expectRecords, bucket, partition);
             }
         }
     }
 
-    private LakeWriter<IcebergWriteResult> createLakeWriter(TablePath tablePath, int bucket)
+    private LakeWriter<IcebergWriteResult> createLakeWriter(
+            TablePath tablePath, int bucket, @Nullable String partition, @Nullable Long partitionId)
             throws IOException {
         return icebergLakeTieringFactory.createLakeWriter(
                 new WriterInitContext() {
@@ -187,13 +213,13 @@ class IcebergTieringTest {
 
                     @Override
                     public TableBucket tableBucket() {
-                        return new TableBucket(0, null, bucket);
+                        return new TableBucket(0, partitionId, bucket);
                     }
 
                     @Nullable
                     @Override
                     public String partition() {
-                        return null;
+                        return partition;
                     }
 
                     @Override
@@ -218,13 +244,17 @@ class IcebergTieringTest {
     }
 
     private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
-            int bucket, int numRecords) {
+            @Nullable String partition, int bucket, int numRecords) {
         List<LogRecord> logRecords = new ArrayList<>();
         for (int i = 0; i < numRecords; i++) {
             GenericRow genericRow = new GenericRow(3);
             genericRow.setField(0, i);
             genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i));
-            genericRow.setField(2, BinaryString.fromString("bucket" + bucket));
+            if (partition != null) {
+                genericRow.setField(2, BinaryString.fromString(partition));
+            } else {
+                genericRow.setField(2, BinaryString.fromString("bucket" + bucket));
+            }
 
             LogRecord logRecord =
                     new GenericRecord(
@@ -234,10 +264,11 @@ class IcebergTieringTest {
         return Tuple2.of(logRecords, logRecords);
     }
 
-    private Tuple2<List<LogRecord>, List<LogRecord>> genPrimaryKeyTableRecords(int bucket) {
+    private Tuple2<List<LogRecord>, List<LogRecord>> genPrimaryKeyTableRecords(
+            @Nullable String partition, int bucket) {
         int offset = -1;
         // gen +I, -U, +U, -D
-        List<GenericRow> rows = genKvRow(bucket, 0, 0, 4);
+        List<GenericRow> rows = genKvRow(partition, bucket, 0, 0, 4);
         List<LogRecord> writtenLogRecords =
                 new ArrayList<>(
                         Arrays.asList(
@@ -248,7 +279,7 @@ class IcebergTieringTest {
         List<LogRecord> expectLogRecords = new ArrayList<>();
 
         // gen +I, -U, +U
-        rows = genKvRow(bucket, 1, 4, 7);
+        rows = genKvRow(partition, bucket, 1, 4, 7);
         writtenLogRecords.addAll(
                 Arrays.asList(
                         toRecord(++offset, rows.get(0), INSERT),
@@ -257,7 +288,7 @@ class IcebergTieringTest {
         expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 
1));
 
         // gen +I, +U
-        rows = genKvRow(bucket, 2, 7, 9);
+        rows = genKvRow(partition, bucket, 2, 7, 9);
         writtenLogRecords.addAll(
                 Arrays.asList(
                         toRecord(++offset, rows.get(0), INSERT),
@@ -265,23 +296,27 @@ class IcebergTieringTest {
         expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 
1));
 
         // gen +I
-        rows = genKvRow(bucket, 3, 9, 10);
+        rows = genKvRow(partition, bucket, 3, 9, 10);
         writtenLogRecords.add(toRecord(++offset, rows.get(0), INSERT));
         expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 
1));
 
         return Tuple2.of(writtenLogRecords, expectLogRecords);
     }
 
-    private List<GenericRow> genKvRow(int bucket, int key, int from, int to) {
+    private List<GenericRow> genKvRow(
+            @Nullable String partition, int bucket, int key, int from, int to) 
{
         List<GenericRow> rows = new ArrayList<>();
         for (int i = from; i < to; i++) {
             GenericRow genericRow;
-            // Non-partitioned table
             genericRow = new GenericRow(3);
             genericRow.setField(0, key);
             genericRow.setField(1, BinaryString.fromString("bucket" + bucket + 
"_" + i));
-            genericRow.setField(2, BinaryString.fromString("bucket" + bucket));
 
+            if (partition != null) {
+                genericRow.setField(2, BinaryString.fromString(partition));
+            } else {
+                genericRow.setField(2, BinaryString.fromString("bucket" + 
bucket));
+            }
             rows.add(genericRow);
         }
         return rows;
@@ -291,7 +326,8 @@ class IcebergTieringTest {
         return new GenericRecord(offset, System.currentTimeMillis(), 
changeType, row);
     }
 
-    private void createTable(TablePath tablePath, boolean isPrimaryTable) 
throws Exception {
+    private void createTable(
+            TablePath tablePath, boolean isPrimaryTable, boolean 
isPartitionedTable) {
         Namespace namespace = Namespace.of(tablePath.getDatabaseName());
         if (icebergCatalog instanceof SupportsNamespaces) {
             SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog;
@@ -303,6 +339,10 @@ class IcebergTieringTest {
         Set<Integer> identifierFieldIds = new HashSet<>();
         if (isPrimaryTable) {
             identifierFieldIds.add(1);
+            if (isPartitionedTable) {
+                // we use c3 as the partition column, so c3 should also be 
included in the pk
+                identifierFieldIds.add(3);
+            }
         }
 
         org.apache.iceberg.Schema schema =
@@ -310,7 +350,7 @@ class IcebergTieringTest {
                         Arrays.asList(
                                 Types.NestedField.required(1, "c1", 
Types.IntegerType.get()),
                                 Types.NestedField.optional(2, "c2", 
Types.StringType.get()),
-                                Types.NestedField.optional(3, "c3", 
Types.StringType.get()),
+                                Types.NestedField.required(3, "c3", 
Types.StringType.get()),
                                 Types.NestedField.required(
                                         4, BUCKET_COLUMN_NAME, 
Types.IntegerType.get()),
                                 Types.NestedField.required(
@@ -319,22 +359,40 @@ class IcebergTieringTest {
                                         6, TIMESTAMP_COLUMN_NAME, 
Types.TimestampType.withZone())),
                         identifierFieldIds);
 
+        PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+        if (isPartitionedTable) {
+            builder.identity("c3");
+        }
+
+        PartitionSpec partitionSpec;
+        if (isPrimaryTable) {
+            partitionSpec = builder.bucket("c1", BUCKET_NUM).build();
+        } else {
+            partitionSpec = builder.identity(BUCKET_COLUMN_NAME).build();
+        }
+
         TableIdentifier tableId =
                 TableIdentifier.of(tablePath.getDatabaseName(), 
tablePath.getTableName());
-        icebergCatalog.createTable(tableId, schema);
+        icebergCatalog.createTable(tableId, schema, partitionSpec);
     }
 
-    private CloseableIterator<Record> getIcebergRows(Table table, int bucket) {
-        return IcebergGenerics.read(table)
-                .where(Expressions.equal(BUCKET_COLUMN_NAME, bucket))
-                .build()
-                .iterator();
+    private CloseableIterator<Record> getIcebergRows(
+            Table table, @Nullable String partition, int bucket) {
+        IcebergGenerics.ScanBuilder scanBuilder =
+                IcebergGenerics.read(table).where(equal(BUCKET_COLUMN_NAME, 
bucket));
+        if (partition != null) {
+            String partitionCol = table.spec().fields().get(0).name();
+            scanBuilder = scanBuilder.where(equal(partitionCol, partition));
+        }
+
+        return scanBuilder.build().iterator();
     }
 
     private void verifyTableRecords(
             CloseableIterator<Record> actualRecords,
             List<LogRecord> expectRecords,
-            int expectBucket) {
+            int expectBucket,
+            @Nullable String partition) {
         for (LogRecord expectRecord : expectRecords) {
             Record actualRecord = actualRecords.next();
             // check business columns:
@@ -343,6 +401,10 @@ class IcebergTieringTest {
                     .isEqualTo(expectRecord.getRow().getString(1).toString());
             assertThat(actualRecord.get(2, String.class))
                     .isEqualTo(expectRecord.getRow().getString(2).toString());
+            if (partition != null) {
+                assertThat(actualRecord.get(2, 
String.class)).isEqualTo(partition);
+            }
+
             // check system columns: __bucket, __offset, __timestamp
             assertThat(actualRecord.get(3)).isEqualTo(expectBucket);
             
assertThat(actualRecord.get(4)).isEqualTo(expectRecord.logOffset());
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversionsTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversionsTest.java
new file mode 100644
index 000000000..9e294f924
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversionsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.utils;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.iceberg.conf.IcebergConfiguration;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Map;
+
+import static 
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** UT for {@link IcebergConversions}. */
+class IcebergConversionsTest {;
+
+    @Test
+    void testToPartition(@TempDir File tempWarehouseDir) {
+        Catalog catalog = getIcebergCatalog(tempWarehouseDir);
+
+        TablePath tablePath = TablePath.of("default", 
"fluss_non_partitioned_table");
+        // for a table without partition columns (partitioned only by __bucket)
+        Table table = createIcebergTable(catalog, tablePath, false);
+        PartitionKey partitionKey = IcebergConversions.toPartition(table, 
null, 1);
+        assertThat(partitionKey.toPath()).isEqualTo("__bucket=1");
+
+        // for a table partitioned by multiple columns (country, region)
+        tablePath = TablePath.of("default", "fluss_partitioned_table");
+        table = createIcebergTable(catalog, tablePath, true);
+        partitionKey = IcebergConversions.toPartition(table, "china$region1", 
2);
+        
assertThat(partitionKey.toPath()).isEqualTo("country=china/region=region1/__bucket=2");
+    }
+
+    private Catalog getIcebergCatalog(File tempWarehouseDir) {
+        Configuration configuration = new Configuration();
+        configuration.setString("warehouse", 
tempWarehouseDir.toURI().toString());
+        configuration.setString("catalog-impl", 
"org.apache.iceberg.inmemory.InMemoryCatalog");
+        configuration.setString("name", "fluss_test_catalog");
+
+        Map<String, String> icebergProps = configuration.toMap();
+        String catalogName = icebergProps.getOrDefault("name", 
"default_iceberg_catalog");
+        return buildIcebergCatalog(
+                catalogName, icebergProps, 
IcebergConfiguration.from(configuration).get());
+    }
+
+    private Table createIcebergTable(
+            Catalog catalog, TablePath tablePath, boolean 
isMultiplePartitionKeyTable) {
+        Schema schema =
+                new Schema(
+                        required(1, "id", Types.LongType.get()),
+                        optional(2, "name", Types.StringType.get()),
+                        optional(3, "country", Types.StringType.get()),
+                        optional(4, "region", Types.StringType.get()),
+                        optional(5, "__bucket", Types.IntegerType.get()));
+        PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema);
+        if (isMultiplePartitionKeyTable) {
+            specBuilder.identity("country").identity("region");
+        }
+
+        // we always use __bucket as a partition field
+        specBuilder.identity("__bucket");
+
+        TableIdentifier tableIdentifier = toIceberg(tablePath);
+
+        try {
+            ((SupportsNamespaces) 
catalog).createNamespace(tableIdentifier.namespace());
+        } catch (AlreadyExistsException ignore) {
+            // ignore
+        }
+
+        catalog.createTable(tableIdentifier, schema, specBuilder.build());
+
+        return catalog.loadTable(tableIdentifier);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
index b98f8aceb..c323a584f 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -197,12 +197,7 @@ class PaimonTieringTest {
                 List<LogRecord> expectRecords = 
recordsByBucket.get(partitionBucket);
                 CloseableIterator<InternalRow> actualRecords =
                         getPaimonRows(tablePath, partition, isPrimaryKeyTable, 
bucket);
-                if (isPrimaryKeyTable) {
-                    verifyPrimaryKeyTableRecord(actualRecords, expectRecords, 
bucket, partition);
-                } else {
-                    verifyLogTableRecords(
-                            actualRecords, expectRecords, bucket, 
isPartitioned, partition);
-                }
+                verifyTableRecords(actualRecords, expectRecords, bucket, 
partition);
             }
         }
 
@@ -365,43 +360,6 @@ class PaimonTieringTest {
         }
     }
 
-    private void verifyLogTableRecords(
-            CloseableIterator<InternalRow> actualRecords,
-            List<LogRecord> expectRecords,
-            int expectBucket,
-            boolean isPartitioned,
-            @Nullable String partition)
-            throws Exception {
-        for (LogRecord expectRecord : expectRecords) {
-            InternalRow actualRow = actualRecords.next();
-            // check business columns:
-            
assertThat(actualRow.getInt(0)).isEqualTo(expectRecord.getRow().getInt(0));
-            assertThat(actualRow.getString(1).toString())
-                    .isEqualTo(expectRecord.getRow().getString(1).toString());
-
-            if (isPartitioned) {
-                // For partitioned tables, partition field comes from metadata
-                
assertThat(actualRow.getString(2).toString()).isEqualTo(partition);
-                // check system columns: __bucket, __offset, __timestamp
-                assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
-                
assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
-                assertThat(actualRow.getTimestamp(5, 6).getMillisecond())
-                        .isEqualTo(expectRecord.timestamp());
-            } else {
-                // For non-partitioned tables, c3 is business data
-                assertThat(actualRow.getString(2).toString())
-                        
.isEqualTo(expectRecord.getRow().getString(2).toString());
-                // check system columns: __bucket, __offset, __timestamp
-                assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
-                
assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
-                assertThat(actualRow.getTimestamp(5, 6).getMillisecond())
-                        .isEqualTo(expectRecord.timestamp());
-            }
-        }
-        assertThat(actualRecords.hasNext()).isFalse();
-        actualRecords.close();
-    }
-
     private void verifyLogTableRecordsMultiPartition(
             CloseableIterator<InternalRow> actualRecords,
             List<LogRecord> expectRecords,
@@ -460,7 +418,7 @@ class PaimonTieringTest {
         actualRecords.close();
     }
 
-    private void verifyPrimaryKeyTableRecord(
+    private void verifyTableRecords(
             CloseableIterator<InternalRow> actualRecords,
             List<LogRecord> expectRecords,
             int expectBucket,
@@ -473,26 +431,16 @@ class PaimonTieringTest {
             assertThat(actualRow.getString(1).toString())
                     .isEqualTo(expectRecord.getRow().getString(1).toString());
 
+            assertThat(actualRow.getString(2).toString())
+                    .isEqualTo(expectRecord.getRow().getString(2).toString());
             if (partition != null) {
-                // For partitioned tables, partition field should match record 
data
-                assertThat(actualRow.getString(2).toString())
-                        
.isEqualTo(expectRecord.getRow().getString(2).toString());
-                // check system columns: __bucket, __offset, __timestamp
-                assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
-                
assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
-                long actualTimestamp = actualRow.getTimestamp(5, 
6).getMillisecond();
-                long expectedTimestamp = expectRecord.timestamp();
-                assertThat(Math.abs(actualTimestamp - 
expectedTimestamp)).isLessThanOrEqualTo(10L);
-            } else {
-                // For non-partitioned tables
-                assertThat(actualRow.getString(2).toString())
-                        
.isEqualTo(expectRecord.getRow().getString(2).toString());
-                // check system columns: __bucket, __offset, __timestamp
-                assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
-                
assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
-                assertThat(actualRow.getTimestamp(5, 6).getMillisecond())
-                        .isEqualTo(expectRecord.timestamp());
+                
assertThat(actualRow.getString(2).toString()).isEqualTo(partition);
             }
+            // check system columns: __bucket, __offset, __timestamp
+            assertThat(actualRow.getInt(3)).isEqualTo(expectBucket);
+            
assertThat(actualRow.getLong(4)).isEqualTo(expectRecord.logOffset());
+            assertThat(actualRow.getTimestamp(5, 6).getMillisecond())
+                    .isEqualTo(expectRecord.timestamp());
         }
         assertThat(actualRecords.hasNext()).isFalse();
         actualRecords.close();
@@ -503,17 +451,13 @@ class PaimonTieringTest {
         List<LogRecord> logRecords = new ArrayList<>();
         for (int i = 0; i < numRecords; i++) {
             GenericRow genericRow;
+            // c3 holds the partition value if partitioned, else "bucket" + bucket
+            genericRow = new GenericRow(3); // c1, c2, c3(partition)
+            genericRow.setField(0, i);
+            genericRow.setField(1, BinaryString.fromString("bucket" + bucket + 
"_" + i));
             if (partition != null) {
-                // Partitioned table: include partition field in data
-                genericRow = new GenericRow(3); // c1, c2, c3(partition)
-                genericRow.setField(0, i);
-                genericRow.setField(1, BinaryString.fromString("bucket" + 
bucket + "_" + i));
                 genericRow.setField(2, BinaryString.fromString(partition)); // 
partition field
             } else {
-                // Non-partitioned table
-                genericRow = new GenericRow(3);
-                genericRow.setField(0, i);
-                genericRow.setField(1, BinaryString.fromString("bucket" + 
bucket + "_" + i));
                 genericRow.setField(2, BinaryString.fromString("bucket" + 
bucket));
             }
             LogRecord logRecord =

Reply via email to