This is an automated email from the ASF dual-hosted git repository.
chengwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 872554b32 [kv/auto increment column] constrain upsert behavior for
auto increment column (#2117)
872554b32 is described below
commit 872554b325c017b1bd21e90c69199650d796c1b9
Author: xx789 <[email protected]>
AuthorDate: Wed Dec 10 19:46:46 2025 +0800
[kv/auto increment column] constrain upsert behavior for auto increment
column (#2117)
* constrain upsert behavior for auto increment column
* auto increment column should not be nullable
---
.../client/table/writer/UpsertWriterImpl.java | 36 +++++++++++++++++++---
.../fluss/client/table/FlussTableITCase.java | 23 ++++++++++++++
.../java/org/apache/fluss/metadata/Schema.java | 6 ++--
.../fluss/utils/json/SchemaJsonSerdeTest.java | 2 +-
4 files changed, 60 insertions(+), 7 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
index 904b85c2a..39f65592c 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
@@ -61,7 +61,11 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
WriterClient writerClient) {
super(tablePath, tableInfo, writerClient);
RowType rowType = tableInfo.getRowType();
- sanityCheck(rowType, tableInfo.getPrimaryKeys(), partialUpdateColumns);
+ sanityCheck(
+ rowType,
+ tableInfo.getPrimaryKeys(),
+ tableInfo.getSchema().getAutoIncrementColumnNames(),
+ partialUpdateColumns);
this.targetColumns = partialUpdateColumns;
DataLakeFormat lakeFormat =
tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
@@ -80,9 +84,20 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
}
private static void sanityCheck(
- RowType rowType, List<String> primaryKeys, @Nullable int[]
targetColumns) {
+ RowType rowType,
+ List<String> primaryKeys,
+ List<String> autoIncrementColumnNames,
+ @Nullable int[] targetColumns) {
// skip check when target columns is null
if (targetColumns == null) {
+ if (!autoIncrementColumnNames.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "This table has auto increment column %s. "
+ + "Explicitly specifying values for an
auto increment column is not allowed. "
+ + "Please specify non-auto-increment
columns as target columns using partialUpdate first.",
+ autoIncrementColumnNames));
+ }
return;
}
BitSet targetColumnsSet = new BitSet();
@@ -103,10 +118,23 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
pkColumnSet.set(pkIndex);
}
+ BitSet autoIncrementColumnSet = new BitSet();
+ // explicitly specifying values for an auto increment column is not
allowed
+ for (String autoIncrementColumnName : autoIncrementColumnNames) {
+ int autoIncrementColumnIndex =
rowType.getFieldIndex(autoIncrementColumnName);
+ if (targetColumnsSet.get(autoIncrementColumnIndex)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Explicitly specifying values for the auto
increment column %s is not allowed.",
+ autoIncrementColumnName));
+ }
+ autoIncrementColumnSet.set(autoIncrementColumnIndex);
+ }
+
// check the columns not in targetColumns should be nullable
for (int i = 0; i < rowType.getFieldCount(); i++) {
- // column not in primary key
- if (!pkColumnSet.get(i)) {
+ // column not in primary key and not in auto increment column
+ if (!pkColumnSet.get(i) && !autoIncrementColumnSet.get(i)) {
// the column should be nullable
if (!rowType.getTypeAt(i).isNullable()) {
throw new IllegalArgumentException(
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index cf2084675..bfd9926df 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -662,6 +662,29 @@ class FlussTableITCase extends ClientToServerITCaseBase {
.hasMessage(
"Invalid target column index: 3 for table
test_db_1.test_pk_table_1. The table only has 3 columns.");
}
+
+ // test invalid auto increment column upsert
+ schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .column("c", DataTypes.INT())
+ .primaryKey("a")
+ .enableAutoIncrement("c")
+ .build();
+ tableDescriptor =
TableDescriptor.builder().schema(schema).distributedBy(3, "a").build();
+ TablePath tablePath =
+ TablePath.of("test_db_1",
"test_invalid_auto_increment_column_upsert");
+ createTable(tablePath, tableDescriptor, true);
+ try (Table table = conn.getTable(tablePath)) {
+ assertThatThrownBy(() -> table.newUpsert().createWriter())
+ .hasMessage(
+ "This table has auto increment column [c].
Explicitly specifying values for an auto increment column is not allowed.
Please specify non-auto-increment columns as target columns using partialUpdate
first.");
+
+ assertThatThrownBy(() -> table.newUpsert().partialUpdate("a",
"c").createWriter())
+ .hasMessage(
+ "Explicitly specifying values for the auto
increment column c is not allowed.");
+ }
}
@Test
diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
index cf7734ace..715fabf7c 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -607,8 +607,10 @@ public final class Schema implements Serializable {
"The data type of auto increment column must be INT or
BIGINT.");
}
- // primary key should not nullable
- if (pkSet.contains(column.getName()) &&
column.getDataType().isNullable()) {
+ // primary key and auto increment column should not be nullable
+ if ((pkSet.contains(column.getName())
+ ||
autoIncrementColumnNames.contains(column.getName()))
+ && column.getDataType().isNullable()) {
newColumns.add(
new Column(
column.getName(),
diff --git
a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
index ee7008dbd..33acb3083 100644
---
a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
@@ -87,7 +87,7 @@ public class SchemaJsonSerdeTest extends
JsonSerdeTestBase<Schema> {
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"BIGINT\"},\"comment\":\"a
is first
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b
is second
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6},\"comment\":\"c
is third column\",\"id\":2}],\"highest_field_id\":2}";
static final String SCHEMA_JSON_4 =
-
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a
is first
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"b
is second
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c
is third
column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
+
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a
is first
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"b
is second
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c
is third
column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
SchemaJsonSerdeTest() {
super(SchemaJsonSerde.INSTANCE);