This is an automated email from the ASF dual-hosted git repository.

chengwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 872554b32 [kv/auto increment column] constrain upsert behavior for auto increment column (#2117)
872554b32 is described below

commit 872554b325c017b1bd21e90c69199650d796c1b9
Author: xx789 <[email protected]>
AuthorDate: Wed Dec 10 19:46:46 2025 +0800

    [kv/auto increment column] constrain upsert behavior for auto increment column (#2117)
    
    * constrain upsert behavior for auto increment column
    
    * auto increment column should not be nullable
---
 .../client/table/writer/UpsertWriterImpl.java      | 36 +++++++++++++++++++---
 .../fluss/client/table/FlussTableITCase.java       | 23 ++++++++++++++
 .../java/org/apache/fluss/metadata/Schema.java     |  6 ++--
 .../fluss/utils/json/SchemaJsonSerdeTest.java      |  2 +-
 4 files changed, 60 insertions(+), 7 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
index 904b85c2a..39f65592c 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
@@ -61,7 +61,11 @@ class UpsertWriterImpl extends AbstractTableWriter 
implements UpsertWriter {
             WriterClient writerClient) {
         super(tablePath, tableInfo, writerClient);
         RowType rowType = tableInfo.getRowType();
-        sanityCheck(rowType, tableInfo.getPrimaryKeys(), partialUpdateColumns);
+        sanityCheck(
+                rowType,
+                tableInfo.getPrimaryKeys(),
+                tableInfo.getSchema().getAutoIncrementColumnNames(),
+                partialUpdateColumns);
 
         this.targetColumns = partialUpdateColumns;
         DataLakeFormat lakeFormat = 
tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
@@ -80,9 +84,20 @@ class UpsertWriterImpl extends AbstractTableWriter 
implements UpsertWriter {
     }
 
     private static void sanityCheck(
-            RowType rowType, List<String> primaryKeys, @Nullable int[] 
targetColumns) {
+            RowType rowType,
+            List<String> primaryKeys,
+            List<String> autoIncrementColumnNames,
+            @Nullable int[] targetColumns) {
         // skip check when target columns is null
         if (targetColumns == null) {
+            if (!autoIncrementColumnNames.isEmpty()) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "This table has auto increment column %s. "
+                                        + "Explicitly specifying values for an 
auto increment column is not allowed. "
+                                        + "Please specify non-auto-increment 
columns as target columns using partialUpdate first.",
+                                autoIncrementColumnNames));
+            }
             return;
         }
         BitSet targetColumnsSet = new BitSet();
@@ -103,10 +118,23 @@ class UpsertWriterImpl extends AbstractTableWriter 
implements UpsertWriter {
             pkColumnSet.set(pkIndex);
         }
 
+        BitSet autoIncrementColumnSet = new BitSet();
+        // explicitly specifying values for an auto increment column is not 
allowed
+        for (String autoIncrementColumnName : autoIncrementColumnNames) {
+            int autoIncrementColumnIndex = 
rowType.getFieldIndex(autoIncrementColumnName);
+            if (targetColumnsSet.get(autoIncrementColumnIndex)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Explicitly specifying values for the auto 
increment column %s is not allowed.",
+                                autoIncrementColumnName));
+            }
+            autoIncrementColumnSet.set(autoIncrementColumnIndex);
+        }
+
         // check the columns not in targetColumns should be nullable
         for (int i = 0; i < rowType.getFieldCount(); i++) {
-            // column not in primary key
-            if (!pkColumnSet.get(i)) {
+            // column not in primary key and not in auto increment column
+            if (!pkColumnSet.get(i) && !autoIncrementColumnSet.get(i)) {
                 // the column should be nullable
                 if (!rowType.getTypeAt(i).isNullable()) {
                     throw new IllegalArgumentException(
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index cf2084675..bfd9926df 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -662,6 +662,29 @@ class FlussTableITCase extends ClientToServerITCaseBase {
                     .hasMessage(
                             "Invalid target column index: 3 for table 
test_db_1.test_pk_table_1. The table only has 3 columns.");
         }
+
+        // test invalid auto increment column upsert
+        schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.INT())
+                        .primaryKey("a")
+                        .enableAutoIncrement("c")
+                        .build();
+        tableDescriptor = 
TableDescriptor.builder().schema(schema).distributedBy(3, "a").build();
+        TablePath tablePath =
+                TablePath.of("test_db_1", 
"test_invalid_auto_increment_column_upsert");
+        createTable(tablePath, tableDescriptor, true);
+        try (Table table = conn.getTable(tablePath)) {
+            assertThatThrownBy(() -> table.newUpsert().createWriter())
+                    .hasMessage(
+                            "This table has auto increment column [c]. 
Explicitly specifying values for an auto increment column is not allowed. 
Please specify non-auto-increment columns as target columns using partialUpdate 
first.");
+
+            assertThatThrownBy(() -> table.newUpsert().partialUpdate("a", 
"c").createWriter())
+                    .hasMessage(
+                            "Explicitly specifying values for the auto 
increment column c is not allowed.");
+        }
     }
 
     @Test
diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
index cf7734ace..715fabf7c 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -607,8 +607,10 @@ public final class Schema implements Serializable {
                         "The data type of auto increment column must be INT or 
BIGINT.");
             }
 
-            // primary key should not nullable
-            if (pkSet.contains(column.getName()) && 
column.getDataType().isNullable()) {
+            // primary key and auto increment column should not nullable
+            if ((pkSet.contains(column.getName())
+                            || 
autoIncrementColumnNames.contains(column.getName()))
+                    && column.getDataType().isNullable()) {
                 newColumns.add(
                         new Column(
                                 column.getName(),
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
index ee7008dbd..33acb3083 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
@@ -87,7 +87,7 @@ public class SchemaJsonSerdeTest extends 
JsonSerdeTestBase<Schema> {
             
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"BIGINT\"},\"comment\":\"a
 is first 
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b
 is second 
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6},\"comment\":\"c
 is third column\",\"id\":2}],\"highest_field_id\":2}";
 
     static final String SCHEMA_JSON_4 =
-            
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a
 is first 
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"b
 is second 
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c
 is third 
column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
+            
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a
 is first 
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"b
 is second 
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c
 is third 
column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
 
     SchemaJsonSerdeTest() {
         super(SchemaJsonSerde.INSTANCE);

Reply via email to