This is an automated email from the ASF dual-hosted git repository.

chengwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 453d64b40 [kv/auto increment column] add auto increment column in 
Fluss schema (#1887)
453d64b40 is described below

commit 453d64b401434062fe1313f91205f328333722b4
Author: xx789 <[email protected]>
AuthorDate: Sun Dec 7 16:38:33 2025 +0800

    [kv/auto increment column] add auto increment column in Fluss schema (#1887)
    
    * add auto increment column in Fluss schema
    
    * support auto increment column in Flink SQL syntax
    
    ---------
    
    Co-authored-by: Jark Wu <[email protected]>
---
 .../org/apache/fluss/config/ConfigOptions.java     |  11 ++
 .../java/org/apache/fluss/metadata/Schema.java     |  62 ++++++++++-
 .../apache/fluss/utils/json/SchemaJsonSerde.java   |  17 +++
 .../org/apache/fluss/metadata/TableSchemaTest.java | 124 +++++++++++++++++++++
 .../fluss/utils/json/SchemaJsonSerdeTest.java      |  21 +++-
 .../apache/fluss/flink/utils/FlinkConversions.java |  41 ++++---
 6 files changed, 252 insertions(+), 24 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java 
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index bcfeb061b..38c36c4c4 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1412,6 +1412,17 @@ public class ConfigOptions {
                                     + "The `disable` behavior rejects delete 
requests with a clear error message. "
                                     + "For tables with FIRST_ROW or VERSIONED 
merge engines, this option defaults to `ignore`.");
 
+    public static final ConfigOption<String> TABLE_AUTO_INCREMENT_FIELDS =
+            key("table.auto-increment.fields")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the auto increment columns. "
+                                    + "With an auto increment column in the table, whenever a new row is inserted into the table, the new row will be assigned with the next available value from the auto-increment sequence. "
+                                    + "The auto increment column can only be used in primary-key table. "
+                                    + "The data type of the auto increment column must be INT or BIGINT. "
+                                    + "Currently a table can have only one auto-increment column.");
+
     // ------------------------------------------------------------------------
     //  ConfigOptions for Kv
     // ------------------------------------------------------------------------
diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
index 0ae0c8b97..cf7734ace 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.annotation.PublicStable;
 import org.apache.fluss.types.DataField;
 import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
 import org.apache.fluss.types.RowType;
 import org.apache.fluss.utils.EncodingUtils;
 import org.apache.fluss.utils.StringUtils;
@@ -60,6 +61,7 @@ public final class Schema implements Serializable {
 
     private final List<Column> columns;
     private final @Nullable PrimaryKey primaryKey;
+    private final List<String> autoIncrementColumnNames;
     private final RowType rowType;
 
     /**
@@ -68,9 +70,14 @@ public final class Schema implements Serializable {
      */
     private final int highestFieldId;
 
-    private Schema(List<Column> columns, @Nullable PrimaryKey primaryKey, int 
highestFieldId) {
-        this.columns = normalizeColumns(columns, primaryKey);
+    private Schema(
+            List<Column> columns,
+            @Nullable PrimaryKey primaryKey,
+            int highestFieldId,
+            List<String> autoIncrementColumnNames) {
+        this.columns = normalizeColumns(columns, primaryKey, 
autoIncrementColumnNames);
         this.primaryKey = primaryKey;
+        this.autoIncrementColumnNames = autoIncrementColumnNames;
         // pre-create the row type as it is the most frequently used part of 
the schema
         this.rowType =
                 new RowType(
@@ -91,6 +98,10 @@ public final class Schema implements Serializable {
         return Optional.ofNullable(primaryKey);
     }
 
+    public List<String> getAutoIncrementColumnNames() {
+        return Collections.unmodifiableList(autoIncrementColumnNames); // read-only view
+    }
+
     public RowType getRowType() {
         return rowType;
     }
@@ -205,10 +216,12 @@ public final class Schema implements Serializable {
     public static final class Builder {
         private final List<Column> columns;
         private @Nullable PrimaryKey primaryKey;
+        private final List<String> autoIncrementColumnNames;
         private AtomicInteger highestFieldId;
 
         private Builder() {
             columns = new ArrayList<>();
+            autoIncrementColumnNames = new ArrayList<>();
             highestFieldId = new AtomicInteger(-1);
         }
 
@@ -360,6 +373,23 @@ public final class Schema implements Serializable {
             return this;
         }
 
+        /**
+         * Declares a column to be auto-incremented. With an auto-increment 
column in the table,
+         * whenever a new row is inserted into the table, the new row will be 
assigned with the next
+         * available value from the auto-increment sequence. A table can have 
at most one auto
+         * increment column.
+         *
+         * @param columnName the auto increment column name
+         */
+        public Builder enableAutoIncrement(String columnName) {
+            checkState(
+                    autoIncrementColumnNames.isEmpty(),
+                    "Multiple auto increment columns are not supported yet.");
+            checkArgument(columnName != null, "Auto increment column name must 
not be null.");
+            autoIncrementColumnNames.add(columnName);
+            return this;
+        }
+
         /** Returns an instance of an {@link Schema}. */
         public Schema build() {
             Integer maximumColumnId =
@@ -372,7 +402,7 @@ public final class Schema implements Serializable {
             checkState(
                     
columns.stream().map(Column::getColumnId).distinct().count() == columns.size(),
                     "Column ids must be unique.");
-            return new Schema(columns, primaryKey, highestFieldId.get());
+            return new Schema(columns, primaryKey, highestFieldId.get(), 
autoIncrementColumnNames);
         }
     }
 
@@ -522,7 +552,9 @@ public final class Schema implements Serializable {
 
     /** Normalize columns and primary key. */
     private static List<Column> normalizeColumns(
-            List<Column> columns, @Nullable PrimaryKey primaryKey) {
+            List<Column> columns,
+            @Nullable PrimaryKey primaryKey,
+            List<String> autoIncrementColumnNames) {
 
         List<String> columnNames =
                 
columns.stream().map(Column::getName).collect(Collectors.toList());
@@ -536,6 +568,9 @@ public final class Schema implements Serializable {
         Set<String> allFields = new HashSet<>(columnNames);
 
         if (primaryKey == null) {
+            checkState(
+                    autoIncrementColumnNames.isEmpty(),
+                    "Auto increment column can only be used in primary-key 
table.");
             return Collections.unmodifiableList(columns);
         }
 
@@ -552,10 +587,27 @@ public final class Schema implements Serializable {
                 columnNames,
                 primaryKeyNames);
 
-        // primary key should not nullable
         Set<String> pkSet = new HashSet<>(primaryKeyNames);
+        for (String autoIncrementColumn : autoIncrementColumnNames) {
+            checkState(
+                    allFields.contains(autoIncrementColumn),
+                    "Auto increment column %s does not exist in table columns 
%s.",
+                    autoIncrementColumn,
+                    columnNames);
+            checkState(
+                    !pkSet.contains(autoIncrementColumn),
+                    "Auto increment column can not be used as the primary 
key.");
+        }
         List<Column> newColumns = new ArrayList<>();
         for (Column column : columns) {
+            if (autoIncrementColumnNames.contains(column.getName())) {
+                checkState(
+                        column.getDataType().is(DataTypeRoot.INTEGER)
+                                || 
column.getDataType().is(DataTypeRoot.BIGINT),
+                        "The data type of auto increment column must be INT or 
BIGINT.");
+            }
+
+            // primary key should not nullable
             if (pkSet.contains(column.getName()) && 
column.getDataType().isNullable()) {
                 newColumns.add(
                         new Column(
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/utils/json/SchemaJsonSerde.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/json/SchemaJsonSerde.java
index 527247b6e..6fd1ddb75 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/utils/json/SchemaJsonSerde.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/utils/json/SchemaJsonSerde.java
@@ -36,6 +36,7 @@ public class SchemaJsonSerde implements 
JsonSerializer<Schema>, JsonDeserializer
 
     private static final String COLUMNS_NAME = "columns";
     private static final String PRIMARY_KEY_NAME = "primary_key";
+    private static final String AUTO_INCREMENT_COLUMN_NAME = 
"auto_increment_column";
     private static final String VERSION_KEY = "version";
     private static final String HIGHEST_FIELD_ID = "highest_field_id";
     private static final int VERSION = 1;
@@ -62,6 +63,14 @@ public class SchemaJsonSerde implements 
JsonSerializer<Schema>, JsonDeserializer
             }
             generator.writeEndArray();
         }
+        List<String> autoIncrementColumnNames = 
schema.getAutoIncrementColumnNames();
+        if (!autoIncrementColumnNames.isEmpty()) {
+            generator.writeArrayFieldStart(AUTO_INCREMENT_COLUMN_NAME);
+            for (String columnName : autoIncrementColumnNames) {
+                generator.writeString(columnName);
+            }
+            generator.writeEndArray();
+        }
 
         generator.writeNumberField(HIGHEST_FIELD_ID, 
schema.getHighestFieldId());
 
@@ -86,6 +95,14 @@ public class SchemaJsonSerde implements 
JsonSerializer<Schema>, JsonDeserializer
             builder.primaryKey(primaryKeys);
         }
 
+        if (node.has(AUTO_INCREMENT_COLUMN_NAME)) {
+            Iterator<JsonNode> autoIncrementColumnJsons =
+                    node.get(AUTO_INCREMENT_COLUMN_NAME).elements();
+            while (autoIncrementColumnJsons.hasNext()) {
+                
builder.enableAutoIncrement(autoIncrementColumnJsons.next().asText());
+            }
+        }
+
         if (node.has(HIGHEST_FIELD_ID)) {
             builder.highestFieldId(node.get(HIGHEST_FIELD_ID).asInt());
         }
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java 
b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
new file mode 100644
index 000000000..9f35dd602
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link org.apache.fluss.metadata.Schema}. */
+class TableSchemaTest {
+
+    @Test
+    void testAutoIncrementColumnSchema() {
+        assertThatThrownBy(
+                        () ->
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.STRING())
+                                        .column("f1", DataTypes.BIGINT())
+                                        .column("f3", DataTypes.STRING())
+                                        .primaryKey("f0")
+                                        .primaryKey("f0")
+                                        .build())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Multiple primary keys are not supported.");
+
+        assertThat(
+                        Schema.newBuilder()
+                                .column("f0", DataTypes.STRING())
+                                .column("f1", DataTypes.BIGINT())
+                                .column("f3", DataTypes.STRING())
+                                .enableAutoIncrement("f1")
+                                .primaryKey("f0")
+                                .build()
+                                .getAutoIncrementColumnNames())
+                .isEqualTo(Collections.singletonList("f1"));
+        assertThat(
+                        Schema.newBuilder()
+                                .column("f0", DataTypes.STRING())
+                                .column("f1", DataTypes.BIGINT())
+                                .column("f3", DataTypes.STRING())
+                                .primaryKey("f0")
+                                .build()
+                                .getAutoIncrementColumnNames())
+                .isEmpty();
+
+        assertThatThrownBy(
+                        () ->
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.STRING())
+                                        .column("f1", DataTypes.BIGINT())
+                                        .column("f3", DataTypes.STRING())
+                                        .enableAutoIncrement("f0")
+                                        .primaryKey("f0")
+                                        .build())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Auto increment column can not be used as the 
primary key.");
+
+        assertThatThrownBy(
+                        () ->
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.STRING())
+                                        .column("f1", DataTypes.BIGINT())
+                                        .column("f3", DataTypes.STRING())
+                                        .enableAutoIncrement("f1")
+                                        .enableAutoIncrement("f1")
+                                        .primaryKey("f0")
+                                        .build())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Multiple auto increment columns are not supported 
yet.");
+        assertThatThrownBy(
+                        () ->
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.STRING())
+                                        .column("f1", DataTypes.BIGINT())
+                                        .column("f3", DataTypes.STRING())
+                                        .enableAutoIncrement("f3")
+                                        .primaryKey("f0")
+                                        .build())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("The data type of auto increment column must be 
INT or BIGINT.");
+        assertThatThrownBy(
+                        () ->
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.STRING())
+                                        .column("f1", DataTypes.BIGINT())
+                                        .column("f3", DataTypes.STRING())
+                                        .enableAutoIncrement("f4")
+                                        .primaryKey("f0")
+                                        .build())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        "Auto increment column f4 does not exist in table 
columns [f0, f1, f3].");
+        assertThatThrownBy(
+                        () ->
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.STRING())
+                                        .column("f1", DataTypes.BIGINT())
+                                        .column("f3", DataTypes.STRING())
+                                        .enableAutoIncrement("f1")
+                                        .build())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Auto increment column can only be used in 
primary-key table.");
+    }
+}
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
index a7b6dca62..ee7008dbd 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
@@ -66,6 +66,18 @@ public class SchemaJsonSerdeTest extends 
JsonSerdeTestBase<Schema> {
                     .withComment("c is third column")
                     .build();
 
+    static final Schema SCHEMA_4 =
+            Schema.newBuilder()
+                    .column("a", DataTypes.INT())
+                    .withComment("a is first column")
+                    .column("b", DataTypes.INT())
+                    .withComment("b is second column")
+                    .column("c", DataTypes.CHAR(10))
+                    .withComment("c is third column")
+                    .primaryKey("a", "c")
+                    .enableAutoIncrement("b")
+                    .build();
+
     static final String SCHEMA_JSON_0 =
             
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a
 is first 
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b
 is second 
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c
 is third 
column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"highest_field_id\":2}";
     static final String SCHEMA_JSON_1 =
@@ -74,18 +86,23 @@ public class SchemaJsonSerdeTest extends 
JsonSerdeTestBase<Schema> {
     static final String SCHEMA_JSON_3 =
             
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"BIGINT\"},\"comment\":\"a
 is first 
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b
 is second 
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6},\"comment\":\"c
 is third column\",\"id\":2}],\"highest_field_id\":2}";
 
+    static final String SCHEMA_JSON_4 =
+            
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a
 is first 
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"b
 is second 
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c
 is third 
column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
+
     SchemaJsonSerdeTest() {
         super(SchemaJsonSerde.INSTANCE);
     }
 
     @Override
     protected Schema[] createObjects() {
-        return new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2, SCHEMA_3};
+        return new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2, SCHEMA_3, SCHEMA_4};
     }
 
     @Override
     protected String[] expectedJsons() {
-        return new String[] {SCHEMA_JSON_0, SCHEMA_JSON_1, SCHEMA_JSON_1, 
SCHEMA_JSON_3};
+        return new String[] {
+            SCHEMA_JSON_0, SCHEMA_JSON_1, SCHEMA_JSON_1, SCHEMA_JSON_3, 
SCHEMA_JSON_4
+        };
     }
 
     @Test
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index 2553b0c18..f8a05573c 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -66,6 +66,7 @@ import static 
org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes;
 import static org.apache.flink.table.utils.EncodingUtils.encodeBytesToBase64;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
+import static 
org.apache.fluss.config.ConfigOptions.TABLE_AUTO_INCREMENT_FIELDS;
 import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER;
@@ -196,20 +197,29 @@ public class FlinkConversions {
         }
 
         // first build schema with physical columns
-        Schema schema =
-                schemBuilder
-                        .fromColumns(
-                                resolvedSchema.getColumns().stream()
-                                        .filter(Column::isPhysical)
-                                        .map(
-                                                column ->
-                                                        new Schema.Column(
-                                                                
column.getName(),
-                                                                
FlinkConversions.toFlussType(
-                                                                        
column.getDataType()),
-                                                                
column.getComment().orElse(null)))
-                                        .collect(Collectors.toList()))
-                        .build();
+        schemBuilder.fromColumns(
+                resolvedSchema.getColumns().stream()
+                        .filter(Column::isPhysical)
+                        .map(
+                                column ->
+                                        new Schema.Column(
+                                                column.getName(),
+                                                
FlinkConversions.toFlussType(column.getDataType()),
+                                                
column.getComment().orElse(null)))
+                        .collect(Collectors.toList()));
+
+        // convert some flink options to fluss table configs.
+        Map<String, String> properties = 
convertFlinkOptionsToFlussTableProperties(flinkTableConf);
+
+        if (properties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) {
+            for (String autoIncrementColumn :
+                    
properties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) {
+                schemBuilder.enableAutoIncrement(autoIncrementColumn);
+            }
+        }
+
+        Schema schema = schemBuilder.build();
+
         resolvedSchema.getColumns().stream()
                 .filter(col -> col instanceof Column.MetadataColumn)
                 .findAny()
@@ -245,9 +255,6 @@ public class FlinkConversions {
 
         String comment = catalogBaseTable.getComment();
 
-        // convert some flink options to fluss table configs.
-        Map<String, String> properties = 
convertFlinkOptionsToFlussTableProperties(flinkTableConf);
-
         // then set distributed by information
         List<String> bucketKey;
         if (flinkTableConf.containsKey(BUCKET_KEY.key())) {

Reply via email to