This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new df77b5cd1 [server] Add configuration options to disable PK or Log table creation (#1503)
df77b5cd1 is described below
commit df77b5cd122bfebf51189245c7a68493e2a15fc6
Author: Yang Wang <[email protected]>
AuthorDate: Mon Sep 8 16:35:15 2025 +0800
[server] Add configuration options to disable PK or Log table creation (#1503)
Co-authored-by: ocean.wy <[email protected]>
Co-authored-by: Jark Wu <[email protected]>
---
.../org/apache/fluss/config/ConfigOptions.java | 18 +++
.../server/coordinator/CoordinatorService.java | 33 +++++
.../server/coordinator/TableManagerITCase.java | 165 ++++++++++++++++++++-
website/docs/maintenance/configuration.md | 2 +
4 files changed, 210 insertions(+), 8 deletions(-)
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 34effd3af..6e738ce89 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -122,6 +122,24 @@ public class ConfigOptions {
"The interval of auto partition check. "
+ "The default value is 10 minutes.");
+ public static final ConfigOption<Boolean> LOG_TABLE_ALLOW_CREATION =
+ key("allow.create.log.tables")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to allow creation of log tables. When set
to false, "
+ + "attempts to create log tables (tables
without primary key) will be rejected. "
+ + "The default value is true.");
+
+ public static final ConfigOption<Boolean> KV_TABLE_ALLOW_CREATION =
+ key("allow.create.kv.tables")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to allow creation of kv tables (primary
key tables). When set to false, "
+ + "attempts to create kv tables (tables
with primary key) will be rejected. "
+ + "The default value is true.");
+
public static final ConfigOption<Integer> MAX_PARTITION_NUM =
key("max.partition.num")
.intType()
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index f1b9a8e2f..5f87937a2 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -135,6 +135,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
private final int defaultBucketNumber;
private final int defaultReplicationFactor;
+ private final boolean logTableAllowCreation;
+ private final boolean kvTableAllowCreation;
private final Supplier<EventManager> eventManagerSupplier;
private final Supplier<Integer> coordinatorEpochSupplier;
private final ServerMetadataCache metadataCache;
@@ -157,6 +159,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
super(remoteFileSystem, ServerType.COORDINATOR, zkClient,
metadataManager, authorizer);
this.defaultBucketNumber =
conf.getInt(ConfigOptions.DEFAULT_BUCKET_NUMBER);
this.defaultReplicationFactor =
conf.getInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR);
+ this.logTableAllowCreation =
conf.getBoolean(ConfigOptions.LOG_TABLE_ALLOW_CREATION);
+ this.kvTableAllowCreation =
conf.getBoolean(ConfigOptions.KV_TABLE_ALLOW_CREATION);
this.eventManagerSupplier =
() ->
coordinatorEventProcessorSupplier.get().getCoordinatorEventManager();
this.coordinatorEpochSupplier =
@@ -245,6 +249,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
}
}
+ // Check table creation permissions based on table type
+ validateTableCreationPermission(tableDescriptor, tablePath);
+
// apply system defaults if the config is not set
tableDescriptor = applySystemDefaults(tableDescriptor);
@@ -651,4 +658,30 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
});
return bucketMetadataList;
}
+
+ /**
+ * Validates whether the table creation is allowed based on the table type
and configuration.
+ *
+ * @param tableDescriptor the table descriptor to validate
+ * @param tablePath the table path for error reporting
+ * @throws InvalidTableException if table creation is not allowed
+ */
+ private void validateTableCreationPermission(
+ TableDescriptor tableDescriptor, TablePath tablePath) {
+ boolean hasPrimaryKey = tableDescriptor.hasPrimaryKey();
+
+ if (hasPrimaryKey) {
+ // This is a KV table (Primary Key Table)
+ if (!kvTableAllowCreation) {
+ throw new InvalidTableException(
+ "Creation of Primary Key Tables is disallowed in the
cluster.");
+ }
+ } else {
+ // This is a Log table
+ if (!logTableAllowCreation) {
+ throw new InvalidTableException(
+ "Creation of Log Tables is disallowed in the
cluster.");
+ }
+ }
+ }
}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
index 85c18a5f0..253774854 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
@@ -157,7 +157,7 @@ class TableManagerITCase {
.createTable(
newCreateTableRequest(
new TablePath("db",
"=invalid_table!"),
- newTable(),
+ newPkTable(),
true))
.get())
.cause()
@@ -170,7 +170,7 @@ class TableManagerITCase {
.createTable(
newCreateTableRequest(
new TablePath("",
"=invalid_table!"),
- newTable(),
+ newPkTable(),
true))
.get())
.cause()
@@ -272,7 +272,7 @@ class TableManagerITCase {
adminGateway.dropTable(newDropTableRequest(db1, tb1, true)).get();
// then create a table
- TableDescriptor tableDescriptor = newTable();
+ TableDescriptor tableDescriptor = newPkTable();
adminGateway.createTable(newCreateTableRequest(tablePath,
tableDescriptor, false)).get();
// the table should exist then
@@ -453,7 +453,7 @@ class TableManagerITCase {
TablePath tablePath = TablePath.of(db1, tb1);
// first create a database
adminGateway.createDatabase(newCreateDatabaseRequest(db1,
false)).get();
- TableDescriptor tableDescriptor = newTable();
+ TableDescriptor tableDescriptor = newPkTable();
adminGateway.createTable(newCreateTableRequest(tablePath,
tableDescriptor, false)).get();
GetTableInfoResponse response =
gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
@@ -715,7 +715,7 @@ class TableManagerITCase {
}
private static TableDescriptor newTableWithoutSettingDistribution() {
- return TableDescriptor.builder().schema(newSchema()).comment("first
table").build();
+ return TableDescriptor.builder().schema(newPkSchema()).comment("first
table").build();
}
private static TableDescriptor newPartitionedTable() {
@@ -749,15 +749,15 @@ class TableManagerITCase {
.property(ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE, 1);
}
- private static TableDescriptor newTable() {
+ private static TableDescriptor newPkTable() {
return TableDescriptor.builder()
- .schema(newSchema())
+ .schema(newPkSchema())
.comment("first table")
.distributedBy(3, "a")
.build();
}
- private static Schema newSchema() {
+ private static Schema newPkSchema() {
return Schema.newBuilder()
.column("a", DataTypes.INT())
.withComment("a comment")
@@ -798,4 +798,153 @@ class TableManagerITCase {
});
return updateMetadataRequest;
}
+
+ // Test methods for table creation restrictions
+
+ @Test
+ void testLogTableCreationRestriction() throws Exception {
+ // Test with cluster that disallows log table creation
+ FlussClusterExtension kvCluster =
+ FlussClusterExtension.builder()
+ .setNumOfTabletServers(3)
+ .setCoordinatorServerListeners(
+ String.format(
+ "%s://localhost:0, %s://localhost:0",
+ DEFAULT_LISTENER_NAME,
CLIENT_LISTENER))
+ .setTabletServerListeners(
+ String.format(
+ "%s://localhost:0, %s://localhost:0",
+ DEFAULT_LISTENER_NAME,
CLIENT_LISTENER))
+ .setClusterConf(initLogRestrictedConf())
+ .build();
+
+ try {
+ kvCluster.start();
+
+ AdminGateway kvClusterGateway = kvCluster.newCoordinatorClient();
+
+ String tb1 = "log_table";
+ TablePath tablePath = TablePath.of("fluss", tb1);
+
+ // Try to create a log table (table without primary key), should
fail
+ TableDescriptor logTableDescriptor = newLogTable();
+ assertThatThrownBy(
+ () ->
+ kvClusterGateway
+ .createTable(
+ newCreateTableRequest(
+ tablePath,
logTableDescriptor, false))
+ .get())
+ .cause()
+ .isInstanceOf(InvalidTableException.class)
+ .hasMessageContaining("Creation of Log Tables is
disallowed in the cluster.");
+
+ // Try to create a kv table (table with primary key), should
succeed
+ String tb2 = "kv_table";
+ TablePath kvTablePath = TablePath.of("fluss", tb2);
+ TableDescriptor kvTableDescriptor = newPkTable();
+ kvClusterGateway
+ .createTable(newCreateTableRequest(kvTablePath,
kvTableDescriptor, false))
+ .get();
+
+ // Verify the kv table was created successfully
+ assertThat(
+ kvClusterGateway
+
.tableExists(newTableExistsRequest(kvTablePath))
+ .get()
+ .isExists())
+ .isTrue();
+ } finally {
+ kvCluster.close();
+ }
+ }
+
+ @Test
+ void testKvTableCreationRestriction() throws Exception {
+ // Test with cluster that disallows kv table creation
+ FlussClusterExtension logCluster =
+ FlussClusterExtension.builder()
+ .setNumOfTabletServers(3)
+ .setCoordinatorServerListeners(
+ String.format(
+ "%s://localhost:0, %s://localhost:0",
+ DEFAULT_LISTENER_NAME,
CLIENT_LISTENER))
+ .setTabletServerListeners(
+ String.format(
+ "%s://localhost:0, %s://localhost:0",
+ DEFAULT_LISTENER_NAME,
CLIENT_LISTENER))
+ .setClusterConf(initKvRestrictedConf())
+ .build();
+
+ try {
+ logCluster.start();
+ AdminGateway logClusterGateway = logCluster.newCoordinatorClient();
+
+ String tb1 = "kv_table";
+ TablePath tablePath = TablePath.of("fluss", tb1);
+
+ // Try to create a kv table (table with primary key), should fail
+ TableDescriptor kvTableDescriptor = newPkTable();
+ assertThatThrownBy(
+ () ->
+ logClusterGateway
+ .createTable(
+ newCreateTableRequest(
+ tablePath,
kvTableDescriptor, false))
+ .get())
+ .cause()
+ .isInstanceOf(InvalidTableException.class)
+ .hasMessageContaining(
+ "Creation of Primary Key Tables is disallowed in
the cluster.");
+
+ // Try to create a log table (table without primary key), should
succeed
+ String tb2 = "log_table";
+ TablePath logTablePath = TablePath.of("fluss", tb2);
+ TableDescriptor logTableDescriptor = newLogTable();
+ logClusterGateway
+ .createTable(newCreateTableRequest(logTablePath,
logTableDescriptor, false))
+ .get();
+
+ // Verify the log table was created successfully
+ assertThat(
+ logClusterGateway
+
.tableExists(newTableExistsRequest(logTablePath))
+ .get()
+ .isExists())
+ .isTrue();
+ } finally {
+ logCluster.close();
+ }
+ }
+
+ private static Configuration initLogRestrictedConf() {
+ Configuration conf = new Configuration();
+ conf.set(ConfigOptions.AUTO_PARTITION_CHECK_INTERVAL,
Duration.ofSeconds(1));
+ conf.set(ConfigOptions.LOG_TABLE_ALLOW_CREATION, false);
+ return conf;
+ }
+
+ private static Configuration initKvRestrictedConf() {
+ Configuration conf = new Configuration();
+ conf.set(ConfigOptions.AUTO_PARTITION_CHECK_INTERVAL,
Duration.ofSeconds(1));
+ conf.set(ConfigOptions.KV_TABLE_ALLOW_CREATION, false);
+ return conf;
+ }
+
+ // Helper methods for creating different table types
+ private static TableDescriptor newLogTable() {
+ return TableDescriptor.builder()
+ .schema(newLogSchema())
+ .comment("log table without primary key")
+ .distributedBy(3, "a")
+ .build();
+ }
+
+ private static Schema newLogSchema() {
+ return Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .withComment("a comment")
+ .column("b", DataTypes.STRING())
+ .build();
+ }
}
diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md
index a38284b45..c2b7cb959 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -39,6 +39,8 @@ during the Fluss cluster working.
| remote.fs.write-buffer-size | MemorySize | 4kb
| The
default size of the write buffer for writing the local files to remote file
systems.
[...]
| plugin.classloader.parent-first-patterns.default | String |
java.,<br/>org.apache.fluss.,<br/>javax.annotation.,<br/>org.slf4j,<br/>org.apache.log4j,<br/>org.apache.logging,<br/>org.apache.commons.logging,<br/>ch.qos.logback
| A (semicolon-separated) list of patterns that specifies which classes should
always be resolved through the plugin parent ClassLoader first. A pattern is a
simple prefix that is checked against the fully qualified class name. This
setting should generally not [...]
| auto-partition.check.interval | Duration | 10min
| The
interval of auto partition check. The default value is 10 minutes.
[...]
+| allow.create.log.tables | Boolean | true | Whether to allow creation of log tables. When set to false, attempts to create log tables (tables without primary key) will be rejected. The default value is true. |
+| allow.create.kv.tables | Boolean | true | Whether to allow creation of kv tables (primary key tables). When set to false, attempts to create kv tables (tables with primary key) will be rejected. The default value is true. |
| max.partition.num | Integer | 1000
|
Limits the maximum number of partitions that can be created for a partitioned
table to avoid creating too many partitions.
[...]
| max.bucket.num | Integer | 128000
| The
maximum number of buckets that can be created for a table. The default value is
128000.
[...]
| acl.notification.expiration-time | Duration | 15min
| The
duration for which ACL notifications are valid before they expire. This
configuration determines the time window during which an ACL notification is
considered active. After this duration, the notification will no longer be
valid and will be discarded. T [...]