This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 4eb3d901 Fluss admin use tablet server for read only operation to
reduce the cost of coordinator. (#1303)
4eb3d901 is described below
commit 4eb3d90188cc9b289044b0e1519084a097168f38
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Jul 17 13:50:23 2025 +0800
Fluss admin use tablet server for read only operation to reduce the cost of
coordinator. (#1303)
* Fluss admin use tablet server for read only operation to reduce the cost
of coordinator.
* fix test
* modified based on cr
---
.../com/alibaba/fluss/client/admin/FlussAdmin.java | 43 ++--
.../fluss/client/admin/FlussAdminITCase.java | 32 +--
.../security/acl/FlussAuthorizationITCase.java | 274 +++++++++++----------
.../security/acl/FlinkAuthorizationITCase.java | 17 ++
.../java/com/alibaba/fluss/server/ServerBase.java | 7 +
.../server/coordinator/CoordinatorServer.java | 5 +
.../server/testutils/FlussClusterExtension.java | 64 ++---
7 files changed, 250 insertions(+), 192 deletions(-)
diff --git
a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
index 414e0c25..e50b5c70 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
@@ -38,6 +38,7 @@ import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.rpc.GatewayClientProxy;
import com.alibaba.fluss.rpc.RpcClient;
import com.alibaba.fluss.rpc.gateway.AdminGateway;
+import com.alibaba.fluss.rpc.gateway.AdminReadOnlyGateway;
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
import com.alibaba.fluss.rpc.messages.CreateAclsRequest;
import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
@@ -99,6 +100,7 @@ import static
com.alibaba.fluss.utils.Preconditions.checkNotNull;
public class FlussAdmin implements Admin {
private final AdminGateway gateway;
+ private final AdminReadOnlyGateway readOnlyGateway;
private final MetadataUpdater metadataUpdater;
private final RpcClient client;
@@ -106,6 +108,9 @@ public class FlussAdmin implements Admin {
this.gateway =
GatewayClientProxy.createGatewayProxy(
metadataUpdater::getCoordinatorServer, client,
AdminGateway.class);
+ this.readOnlyGateway =
+ GatewayClientProxy.createGatewayProxy(
+ metadataUpdater::getRandomTabletServer, client,
AdminGateway.class);
this.metadataUpdater = metadataUpdater;
this.client = client;
}
@@ -119,7 +124,7 @@ public class FlussAdmin implements Admin {
List<ServerNode> serverNodeList = new ArrayList<>();
Cluster cluster =
sendMetadataRequestAndRebuildCluster(
- gateway,
+ readOnlyGateway,
false,
metadataUpdater.getCluster(),
null,
@@ -142,7 +147,8 @@ public class FlussAdmin implements Admin {
request.setTablePath()
.setDatabaseName(tablePath.getDatabaseName())
.setTableName(tablePath.getTableName());
- return gateway.getTableSchema(request)
+ return readOnlyGateway
+ .getTableSchema(request)
.thenApply(
r ->
new SchemaInfo(
@@ -157,7 +163,8 @@ public class FlussAdmin implements Admin {
.setTablePath()
.setDatabaseName(tablePath.getDatabaseName())
.setTableName(tablePath.getTableName());
- return gateway.getTableSchema(request)
+ return readOnlyGateway
+ .getTableSchema(request)
.thenApply(
r ->
new SchemaInfo(
@@ -179,7 +186,8 @@ public class FlussAdmin implements Admin {
public CompletableFuture<DatabaseInfo> getDatabaseInfo(String
databaseName) {
GetDatabaseInfoRequest request = new GetDatabaseInfoRequest();
request.setDatabaseName(databaseName);
- return gateway.getDatabaseInfo(request)
+ return readOnlyGateway
+ .getDatabaseInfo(request)
.thenApply(
r ->
new DatabaseInfo(
@@ -204,13 +212,14 @@ public class FlussAdmin implements Admin {
public CompletableFuture<Boolean> databaseExists(String databaseName) {
DatabaseExistsRequest request = new DatabaseExistsRequest();
request.setDatabaseName(databaseName);
- return
gateway.databaseExists(request).thenApply(DatabaseExistsResponse::isExists);
+ return
readOnlyGateway.databaseExists(request).thenApply(DatabaseExistsResponse::isExists);
}
@Override
public CompletableFuture<List<String>> listDatabases() {
ListDatabasesRequest request = new ListDatabasesRequest();
- return gateway.listDatabases(request)
+ return readOnlyGateway
+ .listDatabases(request)
.thenApply(ListDatabasesResponse::getDatabaseNamesList);
}
@@ -233,7 +242,8 @@ public class FlussAdmin implements Admin {
request.setTablePath()
.setDatabaseName(tablePath.getDatabaseName())
.setTableName(tablePath.getTableName());
- return gateway.getTableInfo(request)
+ return readOnlyGateway
+ .getTableInfo(request)
.thenApply(
r ->
TableInfo.of(
@@ -261,14 +271,14 @@ public class FlussAdmin implements Admin {
request.setTablePath()
.setDatabaseName(tablePath.getDatabaseName())
.setTableName(tablePath.getTableName());
- return
gateway.tableExists(request).thenApply(TableExistsResponse::isExists);
+ return
readOnlyGateway.tableExists(request).thenApply(TableExistsResponse::isExists);
}
@Override
public CompletableFuture<List<String>> listTables(String databaseName) {
ListTablesRequest request = new ListTablesRequest();
request.setDatabaseName(databaseName);
- return
gateway.listTables(request).thenApply(ListTablesResponse::getTableNamesList);
+ return
readOnlyGateway.listTables(request).thenApply(ListTablesResponse::getTableNamesList);
}
@Override
@@ -289,7 +299,8 @@ public class FlussAdmin implements Admin {
PbPartitionSpec pbPartitionSpec =
makePbPartitionSpec(partitionSpec);
request.setPartialPartitionSpec(pbPartitionSpec);
}
- return gateway.listPartitionInfos(request)
+ return readOnlyGateway
+ .listPartitionInfos(request)
.thenApply(ClientRpcMessageUtils::toPartitionInfos);
}
@@ -315,7 +326,8 @@ public class FlussAdmin implements Admin {
request.setTablePath()
.setDatabaseName(tablePath.getDatabaseName())
.setTableName(tablePath.getTableName());
- return gateway.getLatestKvSnapshots(request)
+ return readOnlyGateway
+ .getLatestKvSnapshots(request)
.thenApply(ClientRpcMessageUtils::toKvSnapshots);
}
@@ -328,7 +340,8 @@ public class FlussAdmin implements Admin {
.setDatabaseName(tablePath.getDatabaseName())
.setTableName(tablePath.getTableName());
request.setPartitionName(partitionName);
- return gateway.getLatestKvSnapshots(request)
+ return readOnlyGateway
+ .getLatestKvSnapshots(request)
.thenApply(ClientRpcMessageUtils::toKvSnapshots);
}
@@ -342,7 +355,8 @@ public class FlussAdmin implements Admin {
request.setTableId(bucket.getTableId())
.setBucketId(bucket.getBucket())
.setSnapshotId(snapshotId);
- return gateway.getKvSnapshotMetadata(request)
+ return readOnlyGateway
+ .getKvSnapshotMetadata(request)
.thenApply(ClientRpcMessageUtils::toKvSnapshotMetadata);
}
@@ -353,7 +367,8 @@ public class FlussAdmin implements Admin {
.setDatabaseName(tablePath.getDatabaseName())
.setTableName(tablePath.getTableName());
- return gateway.getLatestLakeSnapshot(request)
+ return readOnlyGateway
+ .getLatestLakeSnapshot(request)
.thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
}
diff --git
a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
index 62ad48a7..274dfbac 100644
---
a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
+++
b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
@@ -130,12 +130,13 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
void testGetDatabaseInfo() throws Exception {
long timestampBeforeCreate = System.currentTimeMillis();
admin.createDatabase(
- "test_db_2",
- DatabaseDescriptor.builder()
- .comment("test comment")
- .customProperty("key1", "value1")
- .build(),
- false);
+ "test_db_2",
+ DatabaseDescriptor.builder()
+ .comment("test comment")
+ .customProperty("key1", "value1")
+ .build(),
+ false)
+ .get();
DatabaseInfo databaseInfo = admin.getDatabaseInfo("test_db_2").get();
long timestampAfterCreate = System.currentTimeMillis();
assertThat(databaseInfo.getCreatedTime()).isEqualTo(databaseInfo.getModifiedTime());
@@ -181,7 +182,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
// create and get a new table
long timestampBeforeCreate = System.currentTimeMillis();
TablePath tablePath = TablePath.of("test_db", "table_2");
- admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
+ admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
tableInfo = admin.getTableInfo(tablePath).get();
timestampAfterCreate = System.currentTimeMillis();
assertThat(tableInfo.getSchemaId()).isEqualTo(schemaInfo.getSchemaId());
@@ -378,16 +379,19 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
// assert the cluster should have tablet server number to be 3
FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3);
- FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
// we can create the table now
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
- TableInfo tableInfo = admin.getTableInfo(DEFAULT_TABLE_PATH).get();
- assertThat(tableInfo.toTableDescriptor())
- .isEqualTo(
- DEFAULT_TABLE_DESCRIPTOR
- .withReplicationFactor(3)
- .withDataLakeFormat(DataLakeFormat.PAIMON));
+ // recreate the connection because the metadata of tablet server has
changed
+ try (Connection conn = ConnectionFactory.createConnection(clientConf);
+ Admin admin = conn.getAdmin()) {
+ TableInfo tableInfo = admin.getTableInfo(DEFAULT_TABLE_PATH).get();
+ assertThat(tableInfo.toTableDescriptor())
+ .isEqualTo(
+ DEFAULT_TABLE_DESCRIPTOR
+ .withReplicationFactor(3)
+
.withDataLakeFormat(DataLakeFormat.PAIMON));
+ }
}
@Test
diff --git
a/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
b/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
index 009c8f6e..f3a87bc3 100644
---
a/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
+++
b/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
@@ -33,6 +33,7 @@ import com.alibaba.fluss.metadata.DataLakeFormat;
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.rpc.GatewayClientProxy;
@@ -217,18 +218,16 @@ public class FlussAuthorizationITCase {
void testAclOperation() throws Exception {
// Test whether the user has authorization to perform the "list ACLs"
operation.
assertThat(guestAdmin.listAcls(AclBindingFilter.ANY).get()).isEmpty();
- rootAdmin
- .createAcls(
- Collections.singletonList(
- new AclBinding(
- Resource.cluster(),
- new AccessControlEntry(
- guestPrincipal,
- WILD_CARD_HOST,
- OperationType.DESCRIBE,
- PermissionType.ALLOW))))
- .all()
- .get();
+ List<AclBinding> aclBindings =
+ Collections.singletonList(
+ new AclBinding(
+ Resource.cluster(),
+ new AccessControlEntry(
+ guestPrincipal,
+ WILD_CARD_HOST,
+ OperationType.DESCRIBE,
+ PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindings).all().get();
assertThat(guestAdmin.listAcls(AclBindingFilter.ANY).get()).hasSize(1);
// test whether the user have authorization to operate create and drop
acls.
@@ -238,7 +237,7 @@ public class FlussAuthorizationITCase {
Resource.table("test_db", "test_table"),
new AccessControlEntry(
user1, "*", OperationType.CREATE,
PermissionType.ALLOW));
- List<AclBinding> aclBindings =
+ List<AclBinding> noAuthorizationAclBinding =
Arrays.asList(
user1AclBinding,
new AclBinding(
@@ -255,24 +254,22 @@ public class FlussAuthorizationITCase {
"127.0.0.1",
OperationType.DROP,
PermissionType.ALLOW)));
- assertThatThrownBy(() ->
guestAdmin.createAcls(aclBindings).all().get())
+ assertThatThrownBy(() ->
guestAdmin.createAcls(noAuthorizationAclBinding).all().get())
.hasMessageContaining(
"Principal %s have no authorization to operate ALTER
on resource",
guestPrincipal);
- rootAdmin
- .createAcls(
- Collections.singletonList(
- new AclBinding(
- Resource.cluster(),
- new AccessControlEntry(
- WILD_CARD_PRINCIPAL,
- WILD_CARD_HOST,
- OperationType.ALTER,
- PermissionType.ALLOW))))
- .all()
- .get();
- guestAdmin.createAcls(aclBindings).all().get();
+ aclBindings =
+ Collections.singletonList(
+ new AclBinding(
+ Resource.cluster(),
+ new AccessControlEntry(
+ WILD_CARD_PRINCIPAL,
+ WILD_CARD_HOST,
+ OperationType.ALTER,
+ PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindings).all().get();
+ guestAdmin.createAcls(noAuthorizationAclBinding).all().get();
assertThat(
guestAdmin
@@ -305,18 +302,16 @@ public class FlussAuthorizationITCase {
String.format(
"Principal %s have no authorization to operate
CREATE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
guestPrincipal));
- rootAdmin
- .createAcls(
- Collections.singletonList(
- new AclBinding(
- Resource.cluster(),
- new AccessControlEntry(
- guestPrincipal,
- "*",
- OperationType.CREATE,
- PermissionType.ALLOW))))
- .all()
- .get();
+ List<AclBinding> aclBindings =
+ Collections.singletonList(
+ new AclBinding(
+ Resource.cluster(),
+ new AccessControlEntry(
+ guestPrincipal,
+ "*",
+ OperationType.CREATE,
+ PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindings).all().get();
guestAdmin.createDatabase("test-database2", DatabaseDescriptor.EMPTY,
false).get();
assertThat(rootAdmin.databaseExists("test-database1").get()).isFalse();
assertThat(rootAdmin.databaseExists("test-database2").get()).isTrue();
@@ -330,32 +325,30 @@ public class FlussAuthorizationITCase {
.containsExactlyInAnyOrderElementsOf(
Lists.newArrayList("fluss",
DATA1_TABLE_PATH_PK.getDatabaseName()));
- rootAdmin
- .createAcls(
- Collections.singletonList(
- new AclBinding(
- Resource.database("fluss"),
- new AccessControlEntry(
- guestPrincipal,
- "*",
- OperationType.DESCRIBE,
- PermissionType.ALLOW))))
- .all()
- .get();
+ List<AclBinding> aclBindings =
+ Collections.singletonList(
+ new AclBinding(
+ Resource.database("fluss"),
+ new AccessControlEntry(
+ guestPrincipal,
+ "*",
+ OperationType.DESCRIBE,
+ PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindings).all().get();
+ FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
assertThat(guestAdmin.listDatabases().get()).isEqualTo(Collections.singletonList("fluss"));
- rootAdmin
- .createAcls(
- Collections.singletonList(
- new AclBinding(
- Resource.cluster(),
- new AccessControlEntry(
- guestPrincipal,
- "*",
- OperationType.ALL,
- PermissionType.ALLOW))))
- .all()
- .get();
+ aclBindings =
+ Collections.singletonList(
+ new AclBinding(
+ Resource.cluster(),
+ new AccessControlEntry(
+ guestPrincipal,
+ "*",
+ OperationType.ALL,
+ PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindings).all().get();
+ FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
assertThat(guestAdmin.listDatabases().get())
.containsExactlyInAnyOrderElementsOf(
Lists.newArrayList("fluss",
DATA1_TABLE_PATH_PK.getDatabaseName()));
@@ -374,18 +367,17 @@ public class FlussAuthorizationITCase {
"Principal %s have no authorization to operate
CREATE on resource Resource{type=DATABASE, name='test_db_1'}",
guestPrincipal));
assertThat(rootAdmin.tableExists(DATA1_TABLE_PATH).get()).isFalse();
- rootAdmin
- .createAcls(
- Collections.singletonList(
- new AclBinding(
-
Resource.database(DATA1_TABLE_PATH.getDatabaseName()),
- new AccessControlEntry(
- guestPrincipal,
- "*",
- OperationType.CREATE,
- PermissionType.ALLOW))))
- .all()
- .get();
+
+ List<AclBinding> aclBindings =
+ Collections.singletonList(
+ new AclBinding(
+
Resource.database(DATA1_TABLE_PATH.getDatabaseName()),
+ new AccessControlEntry(
+ guestPrincipal,
+ "*",
+ OperationType.CREATE,
+ PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindings).all().get();
guestAdmin.createTable(DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR,
false).get();
assertThat(rootAdmin.tableExists(DATA1_TABLE_PATH).get()).isTrue();
}
@@ -394,18 +386,18 @@ public class FlussAuthorizationITCase {
void testListTables() throws Exception {
assertThat(guestAdmin.listTables(DATA1_TABLE_PATH_PK.getDatabaseName()).get())
.isEqualTo(Collections.emptyList());
- rootAdmin
- .createAcls(
- Collections.singletonList(
- new AclBinding(
-
Resource.database(DATA1_TABLE_PATH_PK.getDatabaseName()),
- new AccessControlEntry(
- guestPrincipal,
- "*",
- OperationType.DESCRIBE,
- PermissionType.ALLOW))))
- .all()
- .get();
+
+ List<AclBinding> aclBindings =
+ Collections.singletonList(
+ new AclBinding(
+
Resource.database(DATA1_TABLE_PATH_PK.getDatabaseName()),
+ new AccessControlEntry(
+ guestPrincipal,
+ "*",
+ OperationType.DESCRIBE,
+ PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindings).all().get();
+ FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
assertThat(guestAdmin.listTables(DATA1_TABLE_PATH_PK.getDatabaseName()).get())
.isEqualTo(Collections.singletonList(DATA1_TABLE_PATH_PK.getTableName()));
}
@@ -428,19 +420,17 @@ public class FlussAuthorizationITCase {
.isEmpty();
// if add acl to allow guest read any resource, it will allow to
get metadata.
- rootAdmin
- .createAcls(
- Collections.singletonList(
- new AclBinding(
-
Resource.table(DATA1_TABLE_PATH_PK),
- new AccessControlEntry(
- guestPrincipal,
- "*",
- OperationType.DESCRIBE,
- PermissionType.ALLOW))))
- .all()
- .get();
-
+ List<AclBinding> aclBindings =
+ Collections.singletonList(
+ new AclBinding(
+ Resource.table(DATA1_TABLE_PATH_PK),
+ new AccessControlEntry(
+ guestPrincipal,
+ "*",
+ OperationType.DESCRIBE,
+ PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindings).all().get();
+ FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings,
true);
assertThat(guestGateway.metadata(metadataRequest).get().getTableMetadatasList())
.hasSize(1);
}
@@ -454,16 +444,20 @@ public class FlussAuthorizationITCase {
TableDescriptor descriptor =
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
rootAdmin.createTable(writeAclTable, descriptor, false).get();
+ TableInfo tableInfo = rootAdmin.getTableInfo(writeAclTable).get();
+ FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableInfo.getTableId());
// create acl to allow guest write.
- AclBinding aclBinding =
- new AclBinding(
- Resource.table(writeAclTable),
- new AccessControlEntry(
- guestPrincipal, "*", OperationType.WRITE,
PermissionType.ALLOW));
-
rootAdmin.createAcls(Collections.singletonList(aclBinding)).all().get();
-
- FLUSS_CLUSTER_EXTENSION.waitUtilTableReadyWithAuthorization(
- rootAdmin.getTableInfo(writeAclTable).get().getTableId(),
aclBinding);
+ List<AclBinding> aclBindings =
+ Collections.singletonList(
+ new AclBinding(
+ Resource.table(writeAclTable),
+ new AccessControlEntry(
+ guestPrincipal,
+ "*",
+ OperationType.WRITE,
+ PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindings).all().get();
+ FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
FlussConnection flussConnection = (FlussConnection) guestConn;
TabletServerGateway tabletServerGateway =
@@ -512,26 +506,31 @@ public class FlussAuthorizationITCase {
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
rootAdmin.createTable(writeAclTable, descriptor, false).get();
rootAdmin.createTable(noWriteAclTable, descriptor, false).get();
+ FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
+ rootAdmin.getTableInfo(writeAclTable).get().getTableId());
+ FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
+ rootAdmin.getTableInfo(noWriteAclTable).get().getTableId());
// create acl to allow guest write for writeAclTable.
- AclBinding aclBindingOfWriteAclTable =
- new AclBinding(
- Resource.table(writeAclTable),
- new AccessControlEntry(
- guestPrincipal, "*", OperationType.WRITE,
PermissionType.ALLOW));
- AclBinding aclBindingOfNoWriteAclTable =
- new AclBinding(
- Resource.table(noWriteAclTable),
- new AccessControlEntry(guestPrincipal, "*", READ,
PermissionType.ALLOW));
-
rootAdmin.createAcls(Collections.singletonList(aclBindingOfWriteAclTable)).all().get();
-
rootAdmin.createAcls(Collections.singletonList(aclBindingOfNoWriteAclTable)).all().get();
-
- FLUSS_CLUSTER_EXTENSION.waitUtilTableReadyWithAuthorization(
- rootAdmin.getTableInfo(writeAclTable).get().getTableId(),
- aclBindingOfWriteAclTable);
- FLUSS_CLUSTER_EXTENSION.waitUtilTableReadyWithAuthorization(
- rootAdmin.getTableInfo(noWriteAclTable).get().getTableId(),
- aclBindingOfNoWriteAclTable);
+ List<AclBinding> aclBindingOfWriteAclTables =
+ Collections.singletonList(
+ new AclBinding(
+ Resource.table(writeAclTable),
+ new AccessControlEntry(
+ guestPrincipal,
+ "*",
+ OperationType.WRITE,
+ PermissionType.ALLOW)));
+ List<AclBinding> aclBindingOfNoWriteAclTables =
+ Collections.singletonList(
+ new AclBinding(
+ Resource.table(noWriteAclTable),
+ new AccessControlEntry(
+ guestPrincipal, "*", READ,
PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindingOfWriteAclTables).all().get();
+
FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindingOfWriteAclTables,
true);
+ rootAdmin.createAcls(aclBindingOfNoWriteAclTables).all().get();
+
FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindingOfNoWriteAclTables,
true);
// 1. Try to write data to noWriteAclTable. It should throw
AuthorizationException because
// of request writeId failed.
@@ -577,15 +576,20 @@ public class FlussAuthorizationITCase {
TableDescriptor descriptor =
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
rootAdmin.createTable(DATA1_TABLE_PATH, descriptor, false).get();
+ FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
+ rootAdmin.getTableInfo(DATA1_TABLE_PATH).get().getTableId());
// create acl to allow guest write.
- AclBinding aclBinding =
- new AclBinding(
- Resource.table(DATA1_TABLE_PATH),
- new AccessControlEntry(
- guestPrincipal, "*", OperationType.WRITE,
PermissionType.ALLOW));
-
rootAdmin.createAcls(Collections.singletonList(aclBinding)).all().get();
- FLUSS_CLUSTER_EXTENSION.waitUtilTableReadyWithAuthorization(
- rootAdmin.getTableInfo(DATA1_TABLE_PATH).get().getTableId(),
aclBinding);
+ List<AclBinding> aclBindings =
+ Collections.singletonList(
+ new AclBinding(
+ Resource.table(DATA1_TABLE_PATH),
+ new AccessControlEntry(
+ guestPrincipal,
+ "*",
+ OperationType.WRITE,
+ PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindings).all().get();
+ FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
try (Table table = guestConn.getTable(DATA1_TABLE_PATH)) {
AppendWriter appendWriter = table.newAppend().createWriter();
appendWriter.append(row(1, "a")).get();
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
index 3d0e5bec..24497100 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
@@ -24,8 +24,11 @@ import com.alibaba.fluss.exception.AuthorizationException;
import com.alibaba.fluss.flink.catalog.FlinkCatalogOptions;
import com.alibaba.fluss.metadata.DataLakeFormat;
import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.security.acl.AccessControlEntry;
+import com.alibaba.fluss.security.acl.AclBinding;
import com.alibaba.fluss.security.acl.FlussPrincipal;
import com.alibaba.fluss.security.acl.OperationType;
+import com.alibaba.fluss.security.acl.PermissionType;
import com.alibaba.fluss.security.acl.Resource;
import com.alibaba.fluss.security.auth.sasl.jaas.LoginManager;
import com.alibaba.fluss.server.testutils.FlussClusterExtension;
@@ -421,6 +424,13 @@ abstract class FlinkAuthorizationITCase extends
AbstractTestBase {
String.format("%s:%s", guest.getType(),
guest.getName()),
operationType.name()))
.await();
+ FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(
+ Collections.singletonList(
+ new AclBinding(
+ resource,
+ new AccessControlEntry(
+ guest, "*", operationType,
PermissionType.ALLOW))),
+ true);
}
void dropAcl(Resource resource, OperationType operationType)
@@ -433,6 +443,13 @@ abstract class FlinkAuthorizationITCase extends
AbstractTestBase {
String.format("%s:%s", guest.getType(),
guest.getName()),
operationType.name()))
.await();
+ FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(
+ Collections.singletonList(
+ new AclBinding(
+ resource,
+ new AccessControlEntry(
+ guest, "*", operationType,
PermissionType.ALLOW))),
+ false);
}
private static Configuration initConfig() {
diff --git
a/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
b/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
index d2f6fc4d..9e18be14 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
@@ -17,6 +17,7 @@
package com.alibaba.fluss.server;
+import com.alibaba.fluss.annotation.VisibleForTesting;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.FlussException;
@@ -24,6 +25,7 @@ import com.alibaba.fluss.fs.FileSystem;
import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.plugin.PluginManager;
import com.alibaba.fluss.plugin.PluginUtils;
+import com.alibaba.fluss.server.authorizer.Authorizer;
import com.alibaba.fluss.server.coordinator.CoordinatorServer;
import com.alibaba.fluss.server.exception.FlussParseException;
import com.alibaba.fluss.server.tablet.TabletServer;
@@ -37,6 +39,8 @@ import com.alibaba.fluss.utils.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.lang.reflect.UndeclaredThrowableException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
@@ -177,6 +181,9 @@ public abstract class ServerBase implements
AutoCloseableAsync, FatalErrorHandle
protected abstract String getServerName();
+ @VisibleForTesting
+ public abstract @Nullable Authorizer getAuthorizer();
+
/** Result for run {@link ServerBase}. */
public enum Result {
SUCCESS(SUCCESS_EXIT_CODE),
diff --git
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
index e54fc9ee..e8388f10 100644
---
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
+++
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
@@ -492,6 +492,11 @@ public class CoordinatorServer extends ServerBase {
return metadataCache;
}
+ @VisibleForTesting
+ public @Nullable Authorizer getAuthorizer() {
+ return authorizer;
+ }
+
private static void validateConfigs(Configuration conf) {
if (conf.get(ConfigOptions.DEFAULT_REPLICATION_FACTOR) < 1) {
throw new IllegalConfigurationException(
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java
index 87a7f38e..0d92bb0a 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java
@@ -41,6 +41,7 @@ import com.alibaba.fluss.rpc.messages.StopReplicaRequest;
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
import com.alibaba.fluss.security.acl.AccessControlEntry;
import com.alibaba.fluss.security.acl.AclBinding;
+import com.alibaba.fluss.server.ServerBase;
import com.alibaba.fluss.server.authorizer.Authorizer;
import com.alibaba.fluss.server.authorizer.DefaultAuthorizer;
import com.alibaba.fluss.server.coordinator.CoordinatorServer;
@@ -81,6 +82,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -519,13 +521,6 @@ public final class FlussClusterExtension
/** Wait until all the table assignments buckets are ready for table. */
public void waitUtilTableReady(long tableId) {
- waitUtilTableReadyWithAuthorization(tableId, null);
- }
-
- /**
- * Wait until all the table assignments buckets and required authorization
are ready for table.
- */
- public void waitUtilTableReadyWithAuthorization(long tableId, @Nullable
AclBinding aclBinding) {
ZooKeeperClient zkClient = getZooKeeperClient();
retry(
Duration.ofMinutes(1),
@@ -534,29 +529,40 @@ public final class FlussClusterExtension
zkClient.getTableAssignment(tableId);
assertThat(tableAssignmentOpt).isPresent();
waitReplicaInAssignmentReady(zkClient,
tableAssignmentOpt.get(), tableId, null);
+ });
+ }
- if (aclBinding != null) {
- getTabletServers()
- .forEach(
- ts -> {
- Authorizer authorizer =
ts.getAuthorizer();
- assertThat(authorizer).isNotNull();
- AccessControlEntry
accessControlEntry =
-
aclBinding.getAccessControlEntry();
- assertThat(
-
((DefaultAuthorizer) authorizer)
-
.aclsAllowAccess(
-
aclBinding
-
.getResource(),
-
accessControlEntry
-
.getPrincipal(),
-
accessControlEntry
-
.getOperationType(),
-
accessControlEntry
-
.getHost()))
- .isTrue();
- });
- }
+ /**
+ * Wait until all authorization are synchronized to all tablet servers.
+ *
+ * @param aclBindings aclBindings to be synchronized.
+ * @param exist whether aclBinding exist.
+ */
+ public void waitUtilAuthenticationSync(Collection<AclBinding> aclBindings,
boolean exist) {
+ retry(
+ Duration.ofMinutes(1),
+ () -> {
+ Set<ServerBase> servers = new
HashSet<>(getTabletServers());
+ servers.add(getCoordinatorServer());
+ servers.forEach(
+ ts -> {
+ Authorizer authorizer = ts.getAuthorizer();
+ assertThat(authorizer).isNotNull();
+ for (AclBinding aclBinding : aclBindings) {
+ AccessControlEntry accessControlEntry =
+ aclBinding.getAccessControlEntry();
+ assertThat(
+ ((DefaultAuthorizer)
authorizer)
+ .aclsAllowAccess(
+
aclBinding.getResource(),
+
accessControlEntry
+
.getPrincipal(),
+
accessControlEntry
+
.getOperationType(),
+
accessControlEntry.getHost()))
+ .isEqualTo(exist);
+ }
+ });
});
}