This is an automated email from the ASF dual-hosted git repository.
chengwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 453d64b40 [kv/auto increment column] add auto increment column in
Fluss schema (#1887)
453d64b40 is described below
commit 453d64b401434062fe1313f91205f328333722b4
Author: xx789 <[email protected]>
AuthorDate: Sun Dec 7 16:38:33 2025 +0800
[kv/auto increment column] add auto increment column in Fluss schema (#1887)
* add auto increment column in Fluss schema
* support auto increment column in Flink SQL syntax
---------
Co-authored-by: Jark Wu <[email protected]>
---
.../org/apache/fluss/config/ConfigOptions.java | 11 ++
.../java/org/apache/fluss/metadata/Schema.java | 62 ++++++++++-
.../apache/fluss/utils/json/SchemaJsonSerde.java | 17 +++
.../org/apache/fluss/metadata/TableSchemaTest.java | 124 +++++++++++++++++++++
.../fluss/utils/json/SchemaJsonSerdeTest.java | 21 +++-
.../apache/fluss/flink/utils/FlinkConversions.java | 41 ++++---
6 files changed, 252 insertions(+), 24 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index bcfeb061b..38c36c4c4 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1412,6 +1412,17 @@ public class ConfigOptions {
+ "The `disable` behavior rejects delete
requests with a clear error message. "
+ "For tables with FIRST_ROW or VERSIONED
merge engines, this option defaults to `ignore`.");
+ // Table option naming the auto increment column(s) of a primary-key table.
+ // Every description sentence ends with a trailing space so the concatenated text reads correctly.
+ public static final ConfigOption<String> TABLE_AUTO_INCREMENT_FIELDS =
+         key("table.auto-increment.fields")
+                 .stringType()
+                 .noDefaultValue()
+                 .withDescription(
+                         "Defines the auto increment columns. "
+                                 + "The auto increment column can only be used in primary-key table. "
+                                 + "With an auto increment column in the table, whenever a new row is inserted into the table, the new row will be assigned with the next available value from the auto-increment sequence. "
+                                 + "The data type of the auto increment column must be INT or BIGINT. "
+                                 + "Currently a table can have only one auto-increment column.");
+
// ------------------------------------------------------------------------
// ConfigOptions for Kv
// ------------------------------------------------------------------------
diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
index 0ae0c8b97..cf7734ace 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.annotation.PublicStable;
import org.apache.fluss.types.DataField;
import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.EncodingUtils;
import org.apache.fluss.utils.StringUtils;
@@ -60,6 +61,7 @@ public final class Schema implements Serializable {
private final List<Column> columns;
private final @Nullable PrimaryKey primaryKey;
+ private final List<String> autoIncrementColumnNames;
private final RowType rowType;
/**
@@ -68,9 +70,14 @@ public final class Schema implements Serializable {
*/
private final int highestFieldId;
- private Schema(List<Column> columns, @Nullable PrimaryKey primaryKey, int
highestFieldId) {
- this.columns = normalizeColumns(columns, primaryKey);
+ private Schema(
+ List<Column> columns,
+ @Nullable PrimaryKey primaryKey,
+ int highestFieldId,
+ List<String> autoIncrementColumnNames) {
+ this.columns = normalizeColumns(columns, primaryKey,
autoIncrementColumnNames);
this.primaryKey = primaryKey;
+ this.autoIncrementColumnNames = autoIncrementColumnNames;
// pre-create the row type as it is the most frequently used part of
the schema
this.rowType =
new RowType(
@@ -91,6 +98,10 @@ public final class Schema implements Serializable {
return Optional.ofNullable(primaryKey);
}
+ /**
+  * Returns the names of the columns declared as auto increment.
+  *
+  * <p>The result is a read-only view so that callers cannot alter the schema through the
+  * returned list.
+  *
+  * @return an unmodifiable list of auto increment column names, empty if none was declared
+  */
+ public List<String> getAutoIncrementColumnNames() {
+     return Collections.unmodifiableList(autoIncrementColumnNames);
+ }
+
public RowType getRowType() {
return rowType;
}
@@ -205,10 +216,12 @@ public final class Schema implements Serializable {
public static final class Builder {
private final List<Column> columns;
private @Nullable PrimaryKey primaryKey;
+ private final List<String> autoIncrementColumnNames;
private AtomicInteger highestFieldId;
private Builder() {
columns = new ArrayList<>();
+ autoIncrementColumnNames = new ArrayList<>();
highestFieldId = new AtomicInteger(-1);
}
@@ -360,6 +373,23 @@ public final class Schema implements Serializable {
return this;
}
+ /**
+  * Marks the given column as auto-incremented: each newly inserted row receives the next
+  * available value of the auto-increment sequence for that column.
+  *
+  * <p>At most one auto increment column may be declared per table; a second call on the
+  * same builder is rejected.
+  *
+  * @param columnName the auto increment column name, must not be null
+  * @return this builder, to allow call chaining
+  */
+ public Builder enableAutoIncrement(String columnName) {
+     final boolean noAutoIncrementColumnYet = autoIncrementColumnNames.isEmpty();
+     checkState(
+             noAutoIncrementColumnYet,
+             "Multiple auto increment columns are not supported yet.");
+
+     final boolean nameProvided = columnName != null;
+     checkArgument(nameProvided, "Auto increment column name must not be null.");
+
+     autoIncrementColumnNames.add(columnName);
+     return this;
+ }
+
/** Returns an instance of an {@link Schema}. */
public Schema build() {
Integer maximumColumnId =
@@ -372,7 +402,7 @@ public final class Schema implements Serializable {
checkState(
columns.stream().map(Column::getColumnId).distinct().count() == columns.size(),
"Column ids must be unique.");
- return new Schema(columns, primaryKey, highestFieldId.get());
+ return new Schema(columns, primaryKey, highestFieldId.get(),
autoIncrementColumnNames);
}
}
@@ -522,7 +552,9 @@ public final class Schema implements Serializable {
/** Normalize columns and primary key. */
private static List<Column> normalizeColumns(
- List<Column> columns, @Nullable PrimaryKey primaryKey) {
+ List<Column> columns,
+ @Nullable PrimaryKey primaryKey,
+ List<String> autoIncrementColumnNames) {
List<String> columnNames =
columns.stream().map(Column::getName).collect(Collectors.toList());
@@ -536,6 +568,9 @@ public final class Schema implements Serializable {
Set<String> allFields = new HashSet<>(columnNames);
if (primaryKey == null) {
+ checkState(
+ autoIncrementColumnNames.isEmpty(),
+ "Auto increment column can only be used in primary-key
table.");
return Collections.unmodifiableList(columns);
}
@@ -552,10 +587,27 @@ public final class Schema implements Serializable {
columnNames,
primaryKeyNames);
- // primary key should not nullable
Set<String> pkSet = new HashSet<>(primaryKeyNames);
+ for (String autoIncrementColumn : autoIncrementColumnNames) {
+ checkState(
+ allFields.contains(autoIncrementColumn),
+ "Auto increment column %s does not exist in table columns
%s.",
+ autoIncrementColumn,
+ columnNames);
+ checkState(
+ !pkSet.contains(autoIncrementColumn),
+ "Auto increment column can not be used as the primary
key.");
+ }
List<Column> newColumns = new ArrayList<>();
for (Column column : columns) {
+ if (autoIncrementColumnNames.contains(column.getName())) {
+ checkState(
+ column.getDataType().is(DataTypeRoot.INTEGER)
+ ||
column.getDataType().is(DataTypeRoot.BIGINT),
+ "The data type of auto increment column must be INT or
BIGINT.");
+ }
+
+ // primary key should not nullable
if (pkSet.contains(column.getName()) &&
column.getDataType().isNullable()) {
newColumns.add(
new Column(
diff --git
a/fluss-common/src/main/java/org/apache/fluss/utils/json/SchemaJsonSerde.java
b/fluss-common/src/main/java/org/apache/fluss/utils/json/SchemaJsonSerde.java
index 527247b6e..6fd1ddb75 100644
---
a/fluss-common/src/main/java/org/apache/fluss/utils/json/SchemaJsonSerde.java
+++
b/fluss-common/src/main/java/org/apache/fluss/utils/json/SchemaJsonSerde.java
@@ -36,6 +36,7 @@ public class SchemaJsonSerde implements
JsonSerializer<Schema>, JsonDeserializer
private static final String COLUMNS_NAME = "columns";
private static final String PRIMARY_KEY_NAME = "primary_key";
+ private static final String AUTO_INCREMENT_COLUMN_NAME =
"auto_increment_column";
private static final String VERSION_KEY = "version";
private static final String HIGHEST_FIELD_ID = "highest_field_id";
private static final int VERSION = 1;
@@ -62,6 +63,14 @@ public class SchemaJsonSerde implements
JsonSerializer<Schema>, JsonDeserializer
}
generator.writeEndArray();
}
+ List<String> autoIncrementColumnNames =
schema.getAutoIncrementColumnNames();
+ if (!autoIncrementColumnNames.isEmpty()) {
+ generator.writeArrayFieldStart(AUTO_INCREMENT_COLUMN_NAME);
+ for (String columnName : autoIncrementColumnNames) {
+ generator.writeString(columnName);
+ }
+ generator.writeEndArray();
+ }
generator.writeNumberField(HIGHEST_FIELD_ID,
schema.getHighestFieldId());
@@ -86,6 +95,14 @@ public class SchemaJsonSerde implements
JsonSerializer<Schema>, JsonDeserializer
builder.primaryKey(primaryKeys);
}
+ if (node.has(AUTO_INCREMENT_COLUMN_NAME)) {
+ Iterator<JsonNode> autoIncrementColumnJsons =
+ node.get(AUTO_INCREMENT_COLUMN_NAME).elements();
+ while (autoIncrementColumnJsons.hasNext()) {
+
builder.enableAutoIncrement(autoIncrementColumnJsons.next().asText());
+ }
+ }
+
if (node.has(HIGHEST_FIELD_ID)) {
builder.highestFieldId(node.get(HIGHEST_FIELD_ID).asInt());
}
diff --git
a/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
new file mode 100644
index 000000000..9f35dd602
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/metadata/TableSchemaTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link org.apache.fluss.metadata.Schema}, focused on the validation of auto increment
+ * column declarations.
+ *
+ * <p>Each rule is checked in its own test method so that one failing rule does not hide the
+ * results of the others.
+ */
+class TableSchemaTest {
+
+    /**
+     * Creates a fresh builder holding the columns shared by every test: {@code f0 STRING},
+     * {@code f1 BIGINT} and {@code f3 STRING}.
+     */
+    private static Schema.Builder threeColumnBuilder() {
+        return Schema.newBuilder()
+                .column("f0", DataTypes.STRING())
+                .column("f1", DataTypes.BIGINT())
+                .column("f3", DataTypes.STRING());
+    }
+
+    @Test
+    void testMultiplePrimaryKeysAreRejected() {
+        assertThatThrownBy(
+                        () -> threeColumnBuilder().primaryKey("f0").primaryKey("f0").build())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Multiple primary keys are not supported.");
+    }
+
+    @Test
+    void testAutoIncrementColumnIsExposedBySchema() {
+        assertThat(
+                        threeColumnBuilder()
+                                .enableAutoIncrement("f1")
+                                .primaryKey("f0")
+                                .build()
+                                .getAutoIncrementColumnNames())
+                .isEqualTo(Collections.singletonList("f1"));
+    }
+
+    @Test
+    void testNoAutoIncrementColumnByDefault() {
+        assertThat(threeColumnBuilder().primaryKey("f0").build().getAutoIncrementColumnNames())
+                .isEmpty();
+    }
+
+    @Test
+    void testAutoIncrementColumnMustNotBePrimaryKey() {
+        assertThatThrownBy(
+                        () ->
+                                threeColumnBuilder()
+                                        .enableAutoIncrement("f0")
+                                        .primaryKey("f0")
+                                        .build())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Auto increment column can not be used as the primary key.");
+    }
+
+    @Test
+    void testMultipleAutoIncrementColumnsAreRejected() {
+        assertThatThrownBy(
+                        () ->
+                                threeColumnBuilder()
+                                        .enableAutoIncrement("f1")
+                                        .enableAutoIncrement("f1")
+                                        .primaryKey("f0")
+                                        .build())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Multiple auto increment columns are not supported yet.");
+    }
+
+    @Test
+    void testAutoIncrementColumnMustBeIntOrBigint() {
+        // f3 is a STRING column, which is not an allowed auto increment type
+        assertThatThrownBy(
+                        () ->
+                                threeColumnBuilder()
+                                        .enableAutoIncrement("f3")
+                                        .primaryKey("f0")
+                                        .build())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("The data type of auto increment column must be INT or BIGINT.");
+    }
+
+    @Test
+    void testAutoIncrementColumnMustExist() {
+        assertThatThrownBy(
+                        () ->
+                                threeColumnBuilder()
+                                        .enableAutoIncrement("f4")
+                                        .primaryKey("f0")
+                                        .build())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        "Auto increment column f4 does not exist in table columns [f0, f1, f3].");
+    }
+
+    @Test
+    void testAutoIncrementColumnRequiresPrimaryKeyTable() {
+        assertThatThrownBy(() -> threeColumnBuilder().enableAutoIncrement("f1").build())
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Auto increment column can only be used in primary-key table.");
+    }
+}
diff --git
a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
index a7b6dca62..ee7008dbd 100644
---
a/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/utils/json/SchemaJsonSerdeTest.java
@@ -66,6 +66,18 @@ public class SchemaJsonSerdeTest extends
JsonSerdeTestBase<Schema> {
.withComment("c is third column")
.build();
+ static final Schema SCHEMA_4 =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .withComment("a is first column")
+ .column("b", DataTypes.INT())
+ .withComment("b is second column")
+ .column("c", DataTypes.CHAR(10))
+ .withComment("c is third column")
+ .primaryKey("a", "c")
+ .enableAutoIncrement("b")
+ .build();
+
static final String SCHEMA_JSON_0 =
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a
is first
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b
is second
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c
is third
column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"highest_field_id\":2}";
static final String SCHEMA_JSON_1 =
@@ -74,18 +86,23 @@ public class SchemaJsonSerdeTest extends
JsonSerdeTestBase<Schema> {
static final String SCHEMA_JSON_3 =
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"BIGINT\"},\"comment\":\"a
is first
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"STRING\"},\"comment\":\"b
is second
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"TIMESTAMP_WITHOUT_TIME_ZONE\",\"precision\":6},\"comment\":\"c
is third column\",\"id\":2}],\"highest_field_id\":2}";
+ static final String SCHEMA_JSON_4 =
+
"{\"version\":1,\"columns\":[{\"name\":\"a\",\"data_type\":{\"type\":\"INTEGER\",\"nullable\":false},\"comment\":\"a
is first
column\",\"id\":0},{\"name\":\"b\",\"data_type\":{\"type\":\"INTEGER\"},\"comment\":\"b
is second
column\",\"id\":1},{\"name\":\"c\",\"data_type\":{\"type\":\"CHAR\",\"nullable\":false,\"length\":10},\"comment\":\"c
is third
column\",\"id\":2}],\"primary_key\":[\"a\",\"c\"],\"auto_increment_column\":[\"b\"],\"highest_field_id\":2}";
+
SchemaJsonSerdeTest() {
super(SchemaJsonSerde.INSTANCE);
}
@Override
protected Schema[] createObjects() {
- return new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2, SCHEMA_3};
+ return new Schema[] {SCHEMA_0, SCHEMA_1, SCHEMA_2, SCHEMA_3, SCHEMA_4};
}
@Override
protected String[] expectedJsons() {
- return new String[] {SCHEMA_JSON_0, SCHEMA_JSON_1, SCHEMA_JSON_1,
SCHEMA_JSON_3};
+ return new String[] {
+ SCHEMA_JSON_0, SCHEMA_JSON_1, SCHEMA_JSON_1, SCHEMA_JSON_3,
SCHEMA_JSON_4
+ };
}
@Test
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index 2553b0c18..f8a05573c 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -66,6 +66,7 @@ import static
org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes;
import static org.apache.flink.table.utils.EncodingUtils.encodeBytesToBase64;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
+import static
org.apache.fluss.config.ConfigOptions.TABLE_AUTO_INCREMENT_FIELDS;
import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER;
@@ -196,20 +197,29 @@ public class FlinkConversions {
}
// first build schema with physical columns
- Schema schema =
- schemBuilder
- .fromColumns(
- resolvedSchema.getColumns().stream()
- .filter(Column::isPhysical)
- .map(
- column ->
- new Schema.Column(
-
column.getName(),
-
FlinkConversions.toFlussType(
-
column.getDataType()),
-
column.getComment().orElse(null)))
- .collect(Collectors.toList()))
- .build();
+ schemBuilder.fromColumns(
+ resolvedSchema.getColumns().stream()
+ .filter(Column::isPhysical)
+ .map(
+ column ->
+ new Schema.Column(
+ column.getName(),
+
FlinkConversions.toFlussType(column.getDataType()),
+
column.getComment().orElse(null)))
+ .collect(Collectors.toList()));
+
+ // convert some flink options to fluss table configs.
+ Map<String, String> properties =
convertFlinkOptionsToFlussTableProperties(flinkTableConf);
+
+ if (properties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) {
+ for (String autoIncrementColumn :
+
properties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) {
+ schemBuilder.enableAutoIncrement(autoIncrementColumn);
+ }
+ }
+
+ Schema schema = schemBuilder.build();
+
resolvedSchema.getColumns().stream()
.filter(col -> col instanceof Column.MetadataColumn)
.findAny()
@@ -245,9 +255,6 @@ public class FlinkConversions {
String comment = catalogBaseTable.getComment();
- // convert some flink options to fluss table configs.
- Map<String, String> properties =
convertFlinkOptionsToFlussTableProperties(flinkTableConf);
-
// then set distributed by information
List<String> bucketKey;
if (flinkTableConf.containsKey(BUCKET_KEY.key())) {