This is an automated email from the ASF dual-hosted git repository.

ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 776cb8906 Issue 489 parition key more types apache (#1264)
776cb8906 is described below

commit 776cb8906b49564fb610744020648e3df6a032a0
Author: Maggie Cao <[email protected]>
AuthorDate: Mon Sep 15 06:34:16 2025 -0700

    Issue 489 parition key more types apache (#1264)
    
    * [feature] partition key support more data types (#489)
    
    * [fix] update copyright year
    
    * [fix] partition tests for BinaryString keys
    
    * [fix] remove unnecessary test
    
    * update based on copilot suggesion.
    
    * add testPartitionedTable test
    
    * [server] whitespace issues
    
    * [feature] partition key support more data types (#489)
    
    * [fix] partition tests for BinaryString keys
    
    * [docs] change the docs on partition definition
    
    * [fix] delete empty files
    
    ---------
    
    Co-authored-by: ipolyzos <[email protected]>
---
 .../fluss/client/table/getter/PartitionGetter.java |  11 +-
 .../fluss/client/admin/FlussAdminITCase.java       |  23 --
 .../client/table/NewPartitionedTableITCase.java    | 234 +++++++++++++++++++++
 .../fluss/types/PartitionNameConverters.java       | 133 ++++++++++++
 .../org/apache/fluss/utils/PartitionUtils.java     |  86 +++++++-
 .../org/apache/fluss/utils/PartitionUtilsTest.java | 175 +++++++++++++++
 .../org/apache/fluss/rpc/gateway/AdminGateway.java |   2 +-
 .../server/coordinator/TableManagerITCase.java     |  24 ---
 .../table-design/data-distribution/partitioning.md |   2 +-
 9 files changed, 636 insertions(+), 54 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/getter/PartitionGetter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/getter/PartitionGetter.java
index ed567f409..b5d7fdcdb 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/getter/PartitionGetter.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/getter/PartitionGetter.java
@@ -21,6 +21,7 @@ import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.PartitionUtils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -33,12 +34,14 @@ public class PartitionGetter {
 
     private final List<String> partitionKeys;
     private final List<InternalRow.FieldGetter> partitionFieldGetters;
+    private final List<DataType> partitionTypes;
 
     public PartitionGetter(RowType rowType, List<String> partitionKeys) {
         // check the partition column
         List<String> fieldNames = rowType.getFieldNames();
         this.partitionKeys = partitionKeys;
         partitionFieldGetters = new ArrayList<>();
+        partitionTypes = new ArrayList<>();
         for (String partitionKey : partitionKeys) {
             int partitionColumnIndex = fieldNames.indexOf(partitionKey);
             checkArgument(
@@ -49,6 +52,7 @@ public class PartitionGetter {
 
             // check the data type of the partition column
             DataType partitionColumnDataType = 
rowType.getTypeAt(partitionColumnIndex);
+            partitionTypes.add(partitionColumnDataType);
             partitionFieldGetters.add(
                     InternalRow.createFieldGetter(partitionColumnDataType, 
partitionColumnIndex));
         }
@@ -56,10 +60,13 @@ public class PartitionGetter {
 
     public String getPartition(InternalRow row) {
         List<String> partitionValues = new ArrayList<>();
-        for (InternalRow.FieldGetter partitionFieldGetter : 
partitionFieldGetters) {
+        for (int i = 0; i < partitionFieldGetters.size(); i++) {
+            InternalRow.FieldGetter partitionFieldGetter = 
partitionFieldGetters.get(i);
+            DataType dataType = partitionTypes.get(i);
             Object partitionValue = partitionFieldGetter.getFieldOrNull(row);
             checkNotNull(partitionValue, "Partition value shouldn't be null.");
-            partitionValues.add(partitionValue.toString());
+            partitionValues.add(
+                    PartitionUtils.convertValueOfType(partitionValue, 
dataType.getTypeRoot()));
         }
         ResolvedPartitionSpec resolvedPartitionSpec =
                 new ResolvedPartitionSpec(partitionKeys, partitionValues);
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index bf45d5974..0422396c1 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -647,29 +647,6 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
         
assertThat(serverNodes).containsExactlyInAnyOrderElementsOf(expectedNodes);
     }
 
-    @Test
-    void testCreateIllegalPartitionTable() {
-        String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
-        TableDescriptor partitionedTable =
-                TableDescriptor.builder()
-                        .schema(
-                                Schema.newBuilder()
-                                        .column("id", DataTypes.STRING())
-                                        .column("name", DataTypes.STRING())
-                                        .column("dt", DataTypes.DATE())
-                                        .build())
-                        .distributedBy(3, "id")
-                        .partitionedBy("name", "dt")
-                        .build();
-        TablePath tablePath = TablePath.of(dbName, 
"test_create_illegal_partitioned_table_1");
-        assertThatThrownBy(() -> admin.createTable(tablePath, 
partitionedTable, true).get())
-                .cause()
-                .isInstanceOf(InvalidTableException.class)
-                .hasMessageContaining(
-                        "Currently, partitioned table supported partition key 
type are [STRING], "
-                                + "but got partition key 'dt' with data type 
DATE.");
-    }
-
     @Test
     void testAddAndDropPartitions() throws Exception {
         String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/NewPartitionedTableITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/NewPartitionedTableITCase.java
new file mode 100644
index 000000000..230d0ee52
--- /dev/null
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/NewPartitionedTableITCase.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table;
+
+import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.BigIntType;
+import org.apache.fluss.types.BinaryType;
+import org.apache.fluss.types.BooleanType;
+import org.apache.fluss.types.BytesType;
+import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.DateType;
+import org.apache.fluss.types.DoubleType;
+import org.apache.fluss.types.FloatType;
+import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.SmallIntType;
+import org.apache.fluss.types.StringType;
+import org.apache.fluss.types.TimeType;
+import org.apache.fluss.types.TimestampType;
+import org.apache.fluss.types.TinyIntType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.utils.PartitionUtils.convertValueOfType;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for Fluss partitioned table supporting partition key of different 
types. */
+class NewPartitionedTableITCase extends ClientToServerITCaseBase {
+    Schema.Builder schemaBuilder =
+            Schema.newBuilder()
+                    .column("a", new StringType())
+                    .column("char", new CharType())
+                    .column("binary", new BinaryType())
+                    .column("boolean", new BooleanType())
+                    .column("bytes", new BytesType())
+                    .column("tinyInt", new TinyIntType())
+                    .column("smallInt", new SmallIntType())
+                    .column("int", new IntType())
+                    .column("bigInt", new BigIntType())
+                    .column("date", new DateType())
+                    .column("float", new FloatType())
+                    .column("double", new DoubleType())
+                    .column("time", new TimeType())
+                    .column("timeStampNTZ", new TimestampType())
+                    .column("timeStampLTZ", new LocalZonedTimestampType());
+
+    Schema schema = schemaBuilder.build();
+    DataTypeRoot[] allPartitionKeyTypes =
+            new DataTypeRoot[] {
+                DataTypeRoot.STRING,
+                DataTypeRoot.CHAR,
+                DataTypeRoot.BINARY,
+                DataTypeRoot.BOOLEAN,
+                DataTypeRoot.BYTES,
+                DataTypeRoot.TINYINT,
+                DataTypeRoot.SMALLINT,
+                DataTypeRoot.INTEGER,
+                DataTypeRoot.BIGINT,
+                DataTypeRoot.DATE,
+                DataTypeRoot.FLOAT,
+                DataTypeRoot.DOUBLE,
+                DataTypeRoot.TIME_WITHOUT_TIME_ZONE,
+                DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+                DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
+            };
+
+    Object[] allPartitionKeyValues =
+            new Object[] {
+                BinaryString.fromString("a"),
+                BinaryString.fromString("F"),
+                new byte[] {0x10, 0x20, 0x30, 0x40, 0x50, (byte) 0b11111111},
+                true,
+                new byte[] {0x10, 0x20, 0x30, 0x40, 0x50, (byte) 0b11111111},
+                (byte) 100,
+                (short) -32760, // smallint
+                299000, // Integer
+                1748662955428L, // Bigint
+                20235, // Date
+                5.73f, // Float
+                5.73, // Double
+                5402199, // Time
+                TimestampNtz.fromMillis(1748662955428L), // 
TIME_WITHOUT_TIME_ZONE
+                TimestampLtz.fromEpochMillis(1748662955428L) // 
TIMESTAMP_WITH_LOCAL_TIME_ZONE
+            };
+
+    Schema.Column[] extraColumn =
+            new Schema.Column[] {
+                new Schema.Column("a", new StringType()),
+                new Schema.Column("char", new CharType()),
+                new Schema.Column("binary", new BinaryType()),
+                new Schema.Column("boolean", new BooleanType()),
+                new Schema.Column("bytes", new BytesType()),
+                new Schema.Column("tinyInt", new TinyIntType()),
+                new Schema.Column("smallInt", new SmallIntType()),
+                new Schema.Column("int", new IntType()),
+                new Schema.Column("bigInt", new BigIntType()),
+                new Schema.Column("date", new DateType()),
+                new Schema.Column("float", new FloatType()),
+                new Schema.Column("double", new DoubleType()),
+                new Schema.Column("time", new TimeType()),
+                new Schema.Column("timeStampNTZ", new TimestampType()),
+                new Schema.Column("timeStampLTZ", new 
LocalZonedTimestampType())
+            };
+
+    List<String> result =
+            Arrays.asList(
+                    "a",
+                    "F",
+                    "1020304050ff",
+                    "true",
+                    "1020304050ff",
+                    "100",
+                    "-32760",
+                    "299000",
+                    "1748662955428",
+                    "2025-05-27",
+                    "5_73",
+                    "5_73",
+                    "01-30-02_199",
+                    "2025-05-31-03-42-35_428",
+                    "2025-05-31-03-42-35_428");
+
+    @Test
+    public void testPartitionedTable() throws Exception {
+        TablePath tablePath = TablePath.of("fluss", "person");
+
+        TableDescriptor partitionedTable =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("id", DataTypes.INT())
+                                        .column("name", DataTypes.STRING())
+                                        .column("dt", DataTypes.DATE())
+                                        .build())
+                        .distributedBy(3, "name")
+                        .partitionedBy("id", "dt")
+                        .build();
+        admin.createTable(tablePath, partitionedTable, true).get();
+        TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+
+        assertThat(tableInfo).isNotNull();
+        assertThat(tableInfo.getTablePath()).isEqualTo(tablePath);
+
+        List<String> partitionKeys = tableInfo.getPartitionKeys();
+        assertThat(partitionKeys).hasSize(2);
+        assertThat(partitionKeys).containsExactly("id", "dt");
+    }
+
+    @Test
+    public void testMultipleTypedPartitionedTable() throws Exception {
+
+        for (int i = 0; i < allPartitionKeyTypes.length; i++) {
+            String partitionKey = extraColumn[i].getName();
+            TablePath tablePath =
+                    TablePath.of("test_part_db_" + i, 
"test_static_partitioned_pk_table_" + i);
+            createPartitionedTable(tablePath, partitionKey);
+            String partitionValue =
+                    convertValueOfType(allPartitionKeyValues[i], 
allPartitionKeyTypes[i]);
+
+            admin.createPartition(tablePath, newPartitionSpec(partitionKey, 
partitionValue), true)
+                    .get();
+
+            Map<String, Long> partitionIdByNames =
+                    
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath, 1);
+
+            List<PartitionInfo> partitionInfos = 
admin.listPartitionInfos(tablePath).get();
+            List<String> expectedPartitions = new 
ArrayList<>(partitionIdByNames.keySet());
+            assertThat(
+                            partitionInfos.stream()
+                                    .map(PartitionInfo::getPartitionName)
+                                    .collect(Collectors.toList()))
+                    .containsExactlyInAnyOrderElementsOf(expectedPartitions);
+
+            Table table = conn.getTable(tablePath);
+            AppendWriter appendWriter = table.newAppend().createWriter();
+            Map<Long, List<InternalRow>> expectPartitionAppendRows = new 
HashMap<>();
+            for (String partition : partitionIdByNames.keySet()) {
+                for (int j = 0; j < allPartitionKeyValues.length; j++) {
+                    InternalRow row = row(allPartitionKeyValues);
+                    appendWriter.append(row);
+                    expectPartitionAppendRows
+                            .computeIfAbsent(
+                                    partitionIdByNames.get(partition), k -> 
new ArrayList<>())
+                            .add(row);
+                }
+            }
+            appendWriter.flush();
+
+            
assertThat(admin.listPartitionInfos(tablePath).get().get(0).getPartitionName())
+                    .isEqualTo(result.get(i));
+        }
+    }
+
+    private void createPartitionedTable(TablePath tablePath, String 
partitionKey) throws Exception {
+        TableDescriptor partitionTableDescriptor =
+                
TableDescriptor.builder().schema(schema).partitionedBy(partitionKey).build();
+        createTable(tablePath, partitionTableDescriptor, false);
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/types/PartitionNameConverters.java
 
b/fluss-common/src/main/java/org/apache/fluss/types/PartitionNameConverters.java
new file mode 100644
index 000000000..414465003
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/types/PartitionNameConverters.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.types;
+
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+
+import static java.time.temporal.ChronoField.NANO_OF_SECOND;
+
+/**
+ * We replace "." with "_" and replace ":" in date format with "-" so that
+ * TablePath.detectInvalidName validation will accept the partition name.
+ */
+public class PartitionNameConverters {
+
+    private PartitionNameConverters() {}
+
+    public static String hexString(byte[] bytes) {
+        StringBuilder hexString = new StringBuilder();
+        for (byte b : bytes) {
+            String hex = Integer.toHexString(0xFF & b);
+            if (hex.length() == 1) {
+                hexString.append('0');
+            }
+            hexString.append(hex);
+        }
+        return hexString.toString();
+    }
+
+    public static String reformatFloat(Float value) {
+        if (value.isNaN()) {
+            return "NaN";
+        } else if (value.isInfinite()) {
+            return (value > 0) ? "Inf" : "-Inf";
+        } else {
+            return String.valueOf(value).replace(".", "_");
+        }
+    }
+
+    public static String reformatDouble(Double value) {
+        if (value.isNaN()) {
+            return "NaN";
+        } else if (value.isInfinite()) {
+            return (value > 0) ? "Inf" : "-Inf";
+        } else {
+            return String.valueOf(value).replace(".", "_");
+        }
+    }
+
+    private static final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+
+    public static String dayToString(int days) {
+        OffsetDateTime day = EPOCH.plusDays(days);
+        return String.format(
+                Locale.ROOT,
+                "%04d-%02d-%02d",
+                day.getYear(),
+                day.getMonth().getValue(),
+                day.getDayOfMonth());
+    }
+
+    public static String milliToString(int milli) {
+        int millisPerSecond = 1000;
+        int millisPerMinute = 60 * millisPerSecond;
+        int millisPerHour = 60 * millisPerMinute;
+
+        int hour = Math.floorDiv(milli, millisPerHour);
+        int min = Math.floorDiv(Math.floorMod(milli, millisPerHour), 
millisPerMinute);
+        int seconds =
+                Math.floorDiv(
+                        Math.floorMod(Math.floorMod(milli, millisPerHour), 
millisPerMinute),
+                        millisPerSecond);
+
+        return String.format(
+                Locale.ROOT,
+                "%02d-%02d-%02d_%03d",
+                hour,
+                min,
+                seconds,
+                Math.floorMod(milli, millisPerSecond));
+    }
+
+    private static final DateTimeFormatter TimestampFormatter =
+            new DateTimeFormatterBuilder()
+                    .appendPattern("yyyy-[MM]-[dd]")
+                    .optionalStart()
+                    .appendPattern("-[HH]-[mm]-[ss]_")
+                    .appendFraction(NANO_OF_SECOND, 0, 9, false)
+                    .optionalEnd()
+                    .toFormatter();
+
+    /** always add nanoseconds whether TimestampNTZ and TimestampLTZ are 
compact or not. */
+    public static String timestampToString(TimestampNtz timestampNtz) {
+        return ChronoUnit.MILLIS
+                .addTo(EPOCH, timestampNtz.getMillisecond())
+                .plusNanos(timestampNtz.getNanoOfMillisecond())
+                .toLocalDateTime()
+                .atOffset(ZoneOffset.UTC)
+                .format(TimestampFormatter);
+    }
+
+    public static String timestampToString(TimestampLtz timestampLtz) {
+        return ChronoUnit.MILLIS
+                .addTo(EPOCH, timestampLtz.getEpochMillisecond())
+                .plusNanos(timestampLtz.getNanoOfMillisecond())
+                .toLocalDateTime()
+                .atOffset(ZoneOffset.UTC)
+                .format(TimestampFormatter);
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
index 7db28a812..7ba2197fa 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java
@@ -23,12 +23,16 @@ import org.apache.fluss.exception.InvalidPartitionException;
 import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.PartitionNameConverters;
 
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -37,9 +41,23 @@ import static 
org.apache.fluss.metadata.TablePath.detectInvalidName;
 /** Utils for partition. */
 public class PartitionUtils {
 
-    // TODO Support other data types, trace by 
https://github.com/apache/fluss/issues/489
     public static final List<DataTypeRoot> PARTITION_KEY_SUPPORTED_TYPES =
-            Collections.singletonList(DataTypeRoot.STRING);
+            Arrays.asList(
+                    DataTypeRoot.CHAR,
+                    DataTypeRoot.STRING,
+                    DataTypeRoot.BOOLEAN,
+                    DataTypeRoot.BINARY,
+                    DataTypeRoot.BYTES,
+                    DataTypeRoot.TINYINT,
+                    DataTypeRoot.SMALLINT,
+                    DataTypeRoot.INTEGER,
+                    DataTypeRoot.DATE,
+                    DataTypeRoot.TIME_WITHOUT_TIME_ZONE,
+                    DataTypeRoot.BIGINT,
+                    DataTypeRoot.FLOAT,
+                    DataTypeRoot.DOUBLE,
+                    DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+                    DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
 
     private static final String YEAR_FORMAT = "yyyy";
     private static final String QUARTER_FORMAT = "yyyyQ";
@@ -134,4 +152,66 @@ public class PartitionUtils {
     private static String getFormattedTime(ZonedDateTime zonedDateTime, String 
format) {
         return DateTimeFormatter.ofPattern(format).format(zonedDateTime);
     }
+
+    public static String convertValueOfType(Object value, DataTypeRoot type) {
+        String stringPartitionKey = "";
+        switch (type) {
+            case CHAR:
+            case STRING:
+                stringPartitionKey = ((BinaryString) value).toString();
+                break;
+            case BOOLEAN:
+                Boolean booleanValue = (Boolean) value;
+                stringPartitionKey = booleanValue.toString();
+                break;
+            case BINARY:
+            case BYTES:
+                byte[] bytesValue = (byte[]) value;
+                stringPartitionKey = 
PartitionNameConverters.hexString(bytesValue);
+                break;
+            case TINYINT:
+                Byte tinyIntValue = (Byte) value;
+                stringPartitionKey = tinyIntValue.toString();
+                break;
+            case SMALLINT:
+                Short smallIntValue = (Short) value;
+                stringPartitionKey = smallIntValue.toString();
+                break;
+            case INTEGER:
+                Integer intValue = (Integer) value;
+                stringPartitionKey = intValue.toString();
+                break;
+            case DATE:
+                Integer dateValue = (Integer) value;
+                stringPartitionKey = 
PartitionNameConverters.dayToString(dateValue);
+                break;
+            case TIME_WITHOUT_TIME_ZONE:
+                Integer timeValue = (Integer) value;
+                stringPartitionKey = 
PartitionNameConverters.milliToString(timeValue);
+                break;
+            case FLOAT:
+                Float floatValue = (Float) value;
+                stringPartitionKey = 
PartitionNameConverters.reformatFloat(floatValue);
+                break;
+            case DOUBLE:
+                Double doubleValue = (Double) value;
+                stringPartitionKey = 
PartitionNameConverters.reformatDouble(doubleValue);
+                break;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                TimestampLtz timeStampLTZValue = (TimestampLtz) value;
+                stringPartitionKey = 
PartitionNameConverters.timestampToString(timeStampLTZValue);
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                TimestampNtz timeStampNTZValue = (TimestampNtz) value;
+                stringPartitionKey = 
PartitionNameConverters.timestampToString(timeStampNTZValue);
+                break;
+            case BIGINT:
+                Long bigIntValue = (Long) value;
+                stringPartitionKey = bigIntValue.toString();
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported DataTypeRoot: 
" + type);
+        }
+        return stringPartitionKey;
+    }
 }
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java 
b/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java
index e59c77e85..582fac2e2 100644
--- a/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/utils/PartitionUtilsTest.java
@@ -24,6 +24,10 @@ import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataTypeRoot;
 
 import org.junit.jupiter.api.Test;
 
@@ -33,8 +37,10 @@ import java.time.ZonedDateTime;
 import java.util.Arrays;
 import java.util.Collections;
 
+import static org.apache.fluss.metadata.TablePath.detectInvalidName;
 import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
+import static org.apache.fluss.utils.PartitionUtils.convertValueOfType;
 import static org.apache.fluss.utils.PartitionUtils.generateAutoPartition;
 import static org.apache.fluss.utils.PartitionUtils.validatePartitionSpec;
 import static org.apache.fluss.utils.PartitionUtils.validatePartitionValues;
@@ -148,4 +154,173 @@ class PartitionUtilsTest {
             
assertThat(resolvedPartitionSpec.getPartitionName()).isEqualTo(expected[i]);
         }
     }
+
+    @Test
+    void testString() {
+        Object value = BinaryString.fromString("Fluss");
+        DataTypeRoot type = DataTypeRoot.STRING;
+
+        String toStringResult = convertValueOfType(value, type);
+        assertThat(toStringResult).isEqualTo("Fluss");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
+
+    @Test
+    void testCharConvert() {
+        Object value = BinaryString.fromString("F");
+        DataTypeRoot type = DataTypeRoot.CHAR;
+
+        String toStringResult = convertValueOfType(value, type);
+        assertThat(toStringResult).isEqualTo("F");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
+
+    @Test
+    void testBooleanConvert() {
+        Object value = true;
+        DataTypeRoot type = DataTypeRoot.BOOLEAN;
+
+        String toStringResult = convertValueOfType(value, type);
+        assertThat(toStringResult).isEqualTo("true");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
+
+    @Test
+    void testByteConvert() {
+        Object value = new byte[] {0x10, 0x20, 0x30, 0x40, 0x50, (byte) 
0b11111111};
+        DataTypeRoot type = DataTypeRoot.BYTES;
+
+        String toStringResult = convertValueOfType(value, type);
+        assertThat(toStringResult).isEqualTo("1020304050ff");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
+
+    @Test
+    void testBinaryConvert() {
+        Object value = new byte[] {0x10, 0x20, 0x30, 0x40, 0x50, (byte) 
0b11111111};
+        DataTypeRoot type = DataTypeRoot.BINARY;
+
+        String toStringResult = convertValueOfType(value, type);
+        assertThat(toStringResult).isEqualTo("1020304050ff");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
+
+    @Test
+    void testTinyInt() {
+        Object value = (byte) 100;
+        DataTypeRoot type = DataTypeRoot.TINYINT;
+
+        String toStringResult = convertValueOfType(value, type);
+        assertThat(toStringResult).isEqualTo("100");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
+
+    @Test
+    void testSmallInt() {
+        Object value = (short) -32760;
+        DataTypeRoot type = DataTypeRoot.SMALLINT;
+
+        String toStringResult = convertValueOfType(value, type);
+        assertThat(toStringResult).isEqualTo("-32760");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
+
+    @Test
+    void testInt() {
+        Object value = 299000;
+        DataTypeRoot type = DataTypeRoot.INTEGER;
+
+        String toStringResult = convertValueOfType(value, type);
+        assertThat(toStringResult).isEqualTo("299000");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
+
+    @Test
+    void testDate() {
+        Object value = 20235;
+        DataTypeRoot type = DataTypeRoot.DATE;
+
+        String toStringResult = convertValueOfType(value, type);
+        assertThat(toStringResult).isEqualTo("2025-05-27");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
+
+    @Test
+    void testTimeNTZ() {
+        int value = 5402199;
+        DataTypeRoot type = DataTypeRoot.TIME_WITHOUT_TIME_ZONE;
+
+        String toStringResult = convertValueOfType(value, type);
+        assertThat(toStringResult).isEqualTo("01-30-02_199");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
+
+    @Test
+    void testFloat() {
+        Object value = 5.73f;
+        DataTypeRoot type = DataTypeRoot.FLOAT;
+
+        String toStringResult = convertValueOfType(value, type);
+        assertThat(toStringResult).isEqualTo("5_73");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
+
+    @Test
+    void testDouble() {
+        Object value = 5.73;
+        DataTypeRoot type = DataTypeRoot.DOUBLE;
+
+        String toStringResult = convertValueOfType(value, type);
+        assertThat(toStringResult).isEqualTo("5_73");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
+
+    @Test
+    void testTimestampNTZ() {
+        long millis = 1748662955428L;
+        int nanos = 99988;
+        TimestampNtz timeStampNTZValue = TimestampNtz.fromMillis(millis, 
nanos);
+        DataTypeRoot type = DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+
+        String toStringResult = convertValueOfType(timeStampNTZValue, type);
+        assertThat(toStringResult).isEqualTo("2025-05-31-03-42-35_428099988");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
+
+    @Test
+    void testTimestampLTZ() {
+        long millis = 1748662955428L;
+        int nanos = 99988;
+        TimestampLtz timestampLTZ = TimestampLtz.fromEpochMillis(millis, 
nanos);
+        DataTypeRoot type = DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+
+        String toStringResult = convertValueOfType(timestampLTZ, type);
+        assertThat(toStringResult).isEqualTo("2025-05-31-03-42-35_428099988");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
+
+    @Test
+    void testBigInt() {
+        long value = 1748662955428L;
+        DataTypeRoot type = DataTypeRoot.BIGINT;
+
+        String toStringResult = convertValueOfType(value, type);
+        assertThat(toStringResult).isEqualTo("1748662955428");
+        String detectInvalid = detectInvalidName(toStringResult);
+        assertThat(detectInvalid).isEqualTo(null);
+    }
 }
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
index b8cbd2512..bda096dfe 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
@@ -73,7 +73,7 @@ public interface AdminGateway extends AdminReadOnlyGateway {
     CompletableFuture<DropTableResponse> dropTable(DropTableRequest request);
 
     /**
-     * create a new partition for a partitioned table.
+     * Create a new partition for a partitioned table.
      *
      * @param request Create partition request
      */
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
index 253774854..93352b20c 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
@@ -418,30 +418,6 @@ class TableManagerITCase {
         assertThat(zkClient.getPartitions(tablePath)).isEmpty();
     }
 
-    @Test
-    void testCreateInvalidPartitionedTable() throws Exception {
-        AdminGateway adminGateway = getAdminGateway();
-        String db1 = "db1";
-        String tb1 = "tb1";
-        TablePath tablePath = TablePath.of(db1, tb1);
-        // first create a database
-        adminGateway.createDatabase(newCreateDatabaseRequest(db1, 
false)).get();
-
-        TableDescriptor tableWithIntPartKey =
-                newPartitionedTableBuilder(null).partitionedBy("id").build();
-        assertThatThrownBy(
-                        () ->
-                                adminGateway
-                                        .createTable(
-                                                newCreateTableRequest(
-                                                        tablePath, 
tableWithIntPartKey, false))
-                                        .get())
-                .cause()
-                .isInstanceOf(InvalidTableException.class)
-                .hasMessageContaining(
-                        "Currently, partitioned table supported partition key 
type are [STRING], but got partition key 'id' with data type INT NOT NULL.");
-    }
-
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testMetadata(boolean isCoordinatorServer) throws Exception {
diff --git a/website/docs/table-design/data-distribution/partitioning.md 
b/website/docs/table-design/data-distribution/partitioning.md
index 397b2092c..ffeef84f6 100644
--- a/website/docs/table-design/data-distribution/partitioning.md
+++ b/website/docs/table-design/data-distribution/partitioning.md
@@ -27,7 +27,7 @@ For example, in an `Order` primary key table, the partition 
key can be defined a
 - **Scalability:** Partitioning large datasets distributes the data across 
smaller, manageable chunks, improving scalability.
 
 ## Restrictions
-- The type of the partition keys must be STRING.
+- The partition key must be a Fluss native data type and cannot be contained 
in a map or list.
 - For auto partition table, the partition keys can be one or more. If the 
table has only one partition key, it supports automatic creation and automatic 
expiration of partitions. Otherwise, only automatic expiration is allowed.
 - If the table is a primary key table, the partition key must be a subset of 
the primary key.
 - Auto-partitioning rules can only be configured at the time of creating the 
partitioned table; modifying the auto-partitioning rules after table creation 
is not supported.


Reply via email to