This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 3faa663b5 [lake/paimon] Paimon lake table support non-string partition 
keys (#1817)
3faa663b5 is described below

commit 3faa663b57ee04d85fd9b703b17f611ff8bf8c95
Author: Liebing <[email protected]>
AuthorDate: Thu Oct 16 19:58:20 2025 +0800

    [lake/paimon] Paimon lake table support non-string partition keys (#1817)
---
 .../fluss/lake/paimon/PaimonLakeCatalog.java       |   9 ++
 .../fluss/lake/paimon/tiering/RecordWriter.java    |  10 +-
 .../paimon/tiering/append/AppendOnlyWriter.java    |   6 +
 .../paimon/tiering/mergetree/MergeTreeWriter.java  |   6 +
 .../fluss/lake/paimon/utils/PaimonConversions.java |  31 ----
 .../lake/paimon/tiering/PaimonTieringITCase.java   | 161 +++++++++++++++++++++
 6 files changed, 189 insertions(+), 34 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index f84b6e590..03ffd0cdc 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -157,6 +157,9 @@ public class PaimonLakeCatalog implements LakeCatalog {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         Options options = new Options();
 
+        // set default properties
+        setPaimonDefaultProperties(options);
+
         // When bucket key is undefined, it should use dynamic bucket (bucket 
= -1) mode.
         List<String> bucketKeys = tableDescriptor.getBucketKeys();
         if (!bucketKeys.isEmpty()) {
@@ -215,6 +218,12 @@ public class PaimonLakeCatalog implements LakeCatalog {
         return schemaBuilder.build();
     }
 
+    private void setPaimonDefaultProperties(Options options) {
+        // set partition.legacy-name to false, otherwise paimon will use 
toString for all types,
+        // which will cause inconsistent partition value for a same binary 
value
+        options.set(CoreOptions.PARTITION_GENERATE_LEGCY_NAME, false);
+    }
+
     private void setFlussPropertyToPaimon(String key, String value, Options 
options) {
         if (key.startsWith(PAIMON_CONF_PREFIX)) {
             options.set(key.substring(PAIMON_CONF_PREFIX.length()), value);
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
index f835b0486..413b14189 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java
@@ -29,7 +29,6 @@ import javax.annotation.Nullable;
 
 import java.util.List;
 
-import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonPartitionBinaryRow;
 import static org.apache.fluss.utils.Preconditions.checkState;
 
 /** A base interface to write {@link LogRecord} to Paimon. */
@@ -38,7 +37,8 @@ public abstract class RecordWriter<T> implements 
AutoCloseable {
     protected final TableWriteImpl<T> tableWrite;
     protected final RowType tableRowType;
     protected final int bucket;
-    @Nullable protected final BinaryRow partition;
+    protected final List<String> partitionKeys;
+    @Nullable protected BinaryRow partition;
     protected final FlussRecordAsPaimonRow flussRecordAsPaimonRow;
 
     public RecordWriter(
@@ -50,7 +50,11 @@ public abstract class RecordWriter<T> implements 
AutoCloseable {
         this.tableWrite = tableWrite;
         this.tableRowType = tableRowType;
         this.bucket = tableBucket.getBucket();
-        this.partition = toPaimonPartitionBinaryRow(partitionKeys, partition);
+        this.partitionKeys = partitionKeys;
+        // set partition to EMPTY_ROW in advance for non-partitioned table
+        if (partition == null || partitionKeys.isEmpty()) {
+            this.partition = BinaryRow.EMPTY_ROW;
+        }
         this.flussRecordAsPaimonRow =
                 new FlussRecordAsPaimonRow(tableBucket.getBucket(), 
tableRowType);
     }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
index 2d571358d..7daec20b7 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
@@ -57,6 +57,12 @@ public class AppendOnlyWriter extends 
RecordWriter<InternalRow> {
     @Override
     public void write(LogRecord record) throws Exception {
         flussRecordAsPaimonRow.setFlussRecord(record);
+
+        // get partition once
+        if (partition == null) {
+            partition = tableWrite.getPartition(flussRecordAsPaimonRow);
+        }
+
         // hacky, call internal method tableWrite.getWrite() to support
         // to write to given partition, otherwise, it'll always extract a 
partition from Paimon row
         // which may be costly
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
index c27ebef6d..95a527518 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java
@@ -77,6 +77,12 @@ public class MergeTreeWriter extends RecordWriter<KeyValue> {
     @Override
     public void write(LogRecord record) throws Exception {
         flussRecordAsPaimonRow.setFlussRecord(record);
+
+        // get partition once
+        if (partition == null) {
+            partition = tableWrite.getPartition(flussRecordAsPaimonRow);
+        }
+
         rowKeyExtractor.setRecord(flussRecordAsPaimonRow);
         keyValue.replace(
                 rowKeyExtractor.trimmedPrimaryKey(),
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index c456411e8..a9491659f 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -18,7 +18,6 @@
 package org.apache.fluss.lake.paimon.utils;
 
 import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
-import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
@@ -26,16 +25,11 @@ import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.BinaryRowWriter;
-import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Function;
@@ -78,31 +72,6 @@ public class PaimonConversions {
         return Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName());
     }
 
-    public static BinaryRow toPaimonPartitionBinaryRow(
-            List<String> partitionKeys, @Nullable String partitionName) {
-        if (partitionName == null || partitionKeys.isEmpty()) {
-            return BinaryRow.EMPTY_ROW;
-        }
-
-        //  Fluss's existing utility
-        ResolvedPartitionSpec resolvedPartitionSpec =
-                ResolvedPartitionSpec.fromPartitionName(partitionKeys, 
partitionName);
-
-        BinaryRow partitionBinaryRow = new BinaryRow(partitionKeys.size());
-        BinaryRowWriter writer = new BinaryRowWriter(partitionBinaryRow);
-
-        List<String> partitionValues = 
resolvedPartitionSpec.getPartitionValues();
-        for (int i = 0; i < partitionKeys.size(); i++) {
-            // Todo Currently, partition column must be String datatype, so we 
can always use
-            // `BinaryString.fromString` to convert to Paimon's data 
structure. Revisit here when
-            // #489 is finished.
-            writer.writeString(i, 
BinaryString.fromString(partitionValues.get(i)));
-        }
-
-        writer.complete();
-        return partitionBinaryRow;
-    }
-
     public static Object toPaimonLiteral(DataType dataType, Object 
flussLiteral) {
         RowType rowType = RowType.of(dataType);
         InternalRow flussRow = GenericRow.of(flussLiteral);
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 3a85a2aa7..18748c1bf 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -17,15 +17,22 @@
 
 package org.apache.fluss.lake.paimon.tiering;
 
+import org.apache.fluss.client.table.getter.PartitionGetter;
 import org.apache.fluss.config.AutoPartitionTimeUnit;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
+import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.utils.types.Tuple2;
@@ -40,8 +47,17 @@ import org.apache.paimon.utils.CloseableIterator;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -49,6 +65,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static 
org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
 import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -185,6 +202,150 @@ class PaimonTieringITCase extends 
FlinkPaimonTieringTestBase {
         }
     }
 
+    private static Stream<Arguments> tieringAllTypesWriteArgs() {
+        return Stream.of(Arguments.of(true), Arguments.of(false));
+    }
+
+    @ParameterizedTest
+    @MethodSource("tieringAllTypesWriteArgs")
+    void testTieringForAllTypes(boolean isPrimaryKeyTable) throws Exception {
+        // create a table, write some records and wait until snapshot finished
+        TablePath t1 =
+                TablePath.of(
+                        DEFAULT_DB,
+                        isPrimaryKeyTable ? "pkTableForAllTypes" : 
"logTableForAllTypes");
+        Schema.Builder builder =
+                Schema.newBuilder()
+                        .column("c0", DataTypes.STRING())
+                        .column("c1", DataTypes.BOOLEAN())
+                        .column("c2", DataTypes.TINYINT())
+                        .column("c3", DataTypes.SMALLINT())
+                        .column("c4", DataTypes.INT())
+                        .column("c5", DataTypes.BIGINT())
+                        .column("c6", DataTypes.FLOAT())
+                        .column("c7", DataTypes.DOUBLE())
+                        // decimal not support for partition key
+                        .column("c8", DataTypes.DECIMAL(10, 2))
+                        .column("c9", DataTypes.CHAR(10))
+                        .column("c10", DataTypes.STRING())
+                        .column("c11", DataTypes.BYTES())
+                        .column("c12", DataTypes.BINARY(5))
+                        .column("c13", DataTypes.DATE())
+                        .column("c14", DataTypes.TIME(6))
+                        .column("c15", DataTypes.TIMESTAMP(6))
+                        .column("c16", DataTypes.TIMESTAMP_LTZ(6));
+        if (isPrimaryKeyTable) {
+            builder.primaryKey(
+                    "c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c9", 
"c10", "c11", "c12",
+                    "c13", "c14", "c15", "c16");
+        }
+        List<String> partitionKeys =
+                Arrays.asList(
+                        "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c9", "c10", 
"c11", "c12", "c13",
+                        "c14", "c15", "c16");
+        TableDescriptor.Builder tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(builder.build())
+                        .distributedBy(1, "c0")
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+        tableDescriptor.partitionedBy(partitionKeys);
+        tableDescriptor.customProperties(Collections.emptyMap());
+        tableDescriptor.properties(Collections.emptyMap());
+        long t1Id = createTable(t1, tableDescriptor.build());
+
+        // write records
+        List<InternalRow> rows =
+                Collections.singletonList(
+                        row(
+                                BinaryString.fromString("v0"),
+                                true,
+                                (byte) 1,
+                                (short) 2,
+                                3,
+                                4L,
+                                5.0f,
+                                6.0,
+                                Decimal.fromBigDecimal(new BigDecimal("0.09"), 
10, 2),
+                                BinaryString.fromString("v1"),
+                                BinaryString.fromString("v2"),
+                                "v3".getBytes(StandardCharsets.UTF_8),
+                                new byte[] {1, 2, 3, 4, 5},
+                                (int) LocalDate.of(2025, 10, 16).toEpochDay(),
+                                (int)
+                                        (LocalTime.of(10, 10, 10, 
123000000).toNanoOfDay()
+                                                / 1_000_000),
+                                TimestampNtz.fromLocalDateTime(
+                                        LocalDateTime.of(2025, 10, 16, 10, 10, 
10, 123000000)),
+                                TimestampLtz.fromInstant(
+                                        
Instant.parse("2025-10-16T10:10:10.123Z"))));
+        writeRows(t1, rows, !isPrimaryKeyTable);
+
+        TableInfo tableInfo = admin.getTableInfo(t1).get();
+        List<PartitionInfo> partitionInfos = 
admin.listPartitionInfos(t1).get();
+        assertThat(partitionInfos.size()).isEqualTo(1);
+        PartitionGetter partitionGetter =
+                new PartitionGetter(tableInfo.getRowType(), partitionKeys);
+        String partition = partitionGetter.getPartition(rows.get(0));
+        
assertThat(partitionInfos.get(0).getPartitionName()).isEqualTo(partition);
+
+        long partitionId = partitionInfos.get(0).getPartitionId();
+        TableBucket t1Bucket = new TableBucket(t1Id, partitionId, 0);
+
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        try {
+            // check the status of replica after synced
+            assertReplicaStatus(t1Bucket, 1);
+
+            // check data in paimon
+            Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
+                    getPaimonRowCloseableIterator(t1);
+            for (InternalRow expectedRow : rows) {
+                org.apache.paimon.data.InternalRow row = 
paimonRowIterator.next();
+                assertThat(row.getString(0).toString())
+                        .isEqualTo(expectedRow.getString(0).toString());
+                
assertThat(row.getBoolean(1)).isEqualTo(expectedRow.getBoolean(1));
+                assertThat(row.getByte(2)).isEqualTo(expectedRow.getByte(2));
+                assertThat(row.getShort(3)).isEqualTo(expectedRow.getShort(3));
+                assertThat(row.getInt(4)).isEqualTo(expectedRow.getInt(4));
+                assertThat(row.getLong(5)).isEqualTo(expectedRow.getLong(5));
+                assertThat(row.getFloat(6)).isEqualTo(expectedRow.getFloat(6));
+                
assertThat(row.getDouble(7)).isEqualTo(expectedRow.getDouble(7));
+                assertThat(row.getDecimal(8, 10, 2).toBigDecimal())
+                        .isEqualTo(expectedRow.getDecimal(8, 10, 
2).toBigDecimal());
+                assertThat(row.getString(9).toString())
+                        .isEqualTo(expectedRow.getString(9).toString());
+                assertThat(row.getString(10).toString())
+                        .isEqualTo(expectedRow.getString(10).toString());
+                
assertThat(row.getBinary(11)).isEqualTo(expectedRow.getBytes(11));
+                
assertThat(row.getBinary(12)).isEqualTo(expectedRow.getBinary(12, 5));
+                assertThat(row.getInt(13)).isEqualTo(expectedRow.getInt(13));
+                assertThat(row.getInt(14)).isEqualTo(expectedRow.getInt(14));
+                assertThat(row.getTimestamp(15, 6).getMillisecond())
+                        .isEqualTo(expectedRow.getTimestampNtz(15, 
6).getMillisecond());
+                assertThat(row.getTimestamp(16, 6).getMillisecond())
+                        .isEqualTo(expectedRow.getTimestampLtz(16, 
6).getEpochMillisecond());
+
+                // check snapshot in paimon
+                Map<String, String> properties =
+                        new HashMap<String, String>() {
+                            {
+                                put(
+                                        FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                        String.format(
+                                                
"[{\"partition_id\":%d,\"bucket\":0,\"partition_name\":\"c1=true/c2=1/c3=2/c4=3/c5=4/c6=5_0/c7=6_0/c9=v1/c10=v2/c11=7633/c12=0102030405/c13=2025-10-16/c14=10-10-10_123/c15=2025-10-16-10-10-10_123/c16=2025-10-16-10-10-10_123\",\"offset\":1}]",
+                                                partitionId));
+                            }
+                        };
+                checkSnapshotPropertyInPaimon(t1, properties);
+            }
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
     @Test
     void testTieringForAlterTable() throws Exception {
         TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableAlter");

Reply via email to