This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 4eb3d901 Fluss admin use tablet server for read only operation to 
reduce the cost of coordinator. (#1303)
4eb3d901 is described below

commit 4eb3d90188cc9b289044b0e1519084a097168f38
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Jul 17 13:50:23 2025 +0800

    Fluss admin use tablet server for read only operation to reduce the cost of 
coordinator. (#1303)
    
    * Fluss admin use tablet server for read only operation to reduce the cost 
of coordinator.
    
    * fix test
    
    * modified based on cr
---
 .../com/alibaba/fluss/client/admin/FlussAdmin.java |  43 ++--
 .../fluss/client/admin/FlussAdminITCase.java       |  32 +--
 .../security/acl/FlussAuthorizationITCase.java     | 274 +++++++++++----------
 .../security/acl/FlinkAuthorizationITCase.java     |  17 ++
 .../java/com/alibaba/fluss/server/ServerBase.java  |   7 +
 .../server/coordinator/CoordinatorServer.java      |   5 +
 .../server/testutils/FlussClusterExtension.java    |  64 ++---
 7 files changed, 250 insertions(+), 192 deletions(-)

diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java 
b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
index 414e0c25..e50b5c70 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
@@ -38,6 +38,7 @@ import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.rpc.GatewayClientProxy;
 import com.alibaba.fluss.rpc.RpcClient;
 import com.alibaba.fluss.rpc.gateway.AdminGateway;
+import com.alibaba.fluss.rpc.gateway.AdminReadOnlyGateway;
 import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
 import com.alibaba.fluss.rpc.messages.CreateAclsRequest;
 import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
@@ -99,6 +100,7 @@ import static 
com.alibaba.fluss.utils.Preconditions.checkNotNull;
 public class FlussAdmin implements Admin {
 
     private final AdminGateway gateway;
+    private final AdminReadOnlyGateway readOnlyGateway;
     private final MetadataUpdater metadataUpdater;
     private final RpcClient client;
 
@@ -106,6 +108,9 @@ public class FlussAdmin implements Admin {
         this.gateway =
                 GatewayClientProxy.createGatewayProxy(
                         metadataUpdater::getCoordinatorServer, client, 
AdminGateway.class);
+        this.readOnlyGateway =
+                GatewayClientProxy.createGatewayProxy(
+                        metadataUpdater::getRandomTabletServer, client, 
AdminGateway.class);
         this.metadataUpdater = metadataUpdater;
         this.client = client;
     }
@@ -119,7 +124,7 @@ public class FlussAdmin implements Admin {
                         List<ServerNode> serverNodeList = new ArrayList<>();
                         Cluster cluster =
                                 sendMetadataRequestAndRebuildCluster(
-                                        gateway,
+                                        readOnlyGateway,
                                         false,
                                         metadataUpdater.getCluster(),
                                         null,
@@ -142,7 +147,8 @@ public class FlussAdmin implements Admin {
         request.setTablePath()
                 .setDatabaseName(tablePath.getDatabaseName())
                 .setTableName(tablePath.getTableName());
-        return gateway.getTableSchema(request)
+        return readOnlyGateway
+                .getTableSchema(request)
                 .thenApply(
                         r ->
                                 new SchemaInfo(
@@ -157,7 +163,8 @@ public class FlussAdmin implements Admin {
                 .setTablePath()
                 .setDatabaseName(tablePath.getDatabaseName())
                 .setTableName(tablePath.getTableName());
-        return gateway.getTableSchema(request)
+        return readOnlyGateway
+                .getTableSchema(request)
                 .thenApply(
                         r ->
                                 new SchemaInfo(
@@ -179,7 +186,8 @@ public class FlussAdmin implements Admin {
     public CompletableFuture<DatabaseInfo> getDatabaseInfo(String 
databaseName) {
         GetDatabaseInfoRequest request = new GetDatabaseInfoRequest();
         request.setDatabaseName(databaseName);
-        return gateway.getDatabaseInfo(request)
+        return readOnlyGateway
+                .getDatabaseInfo(request)
                 .thenApply(
                         r ->
                                 new DatabaseInfo(
@@ -204,13 +212,14 @@ public class FlussAdmin implements Admin {
     public CompletableFuture<Boolean> databaseExists(String databaseName) {
         DatabaseExistsRequest request = new DatabaseExistsRequest();
         request.setDatabaseName(databaseName);
-        return 
gateway.databaseExists(request).thenApply(DatabaseExistsResponse::isExists);
+        return 
readOnlyGateway.databaseExists(request).thenApply(DatabaseExistsResponse::isExists);
     }
 
     @Override
     public CompletableFuture<List<String>> listDatabases() {
         ListDatabasesRequest request = new ListDatabasesRequest();
-        return gateway.listDatabases(request)
+        return readOnlyGateway
+                .listDatabases(request)
                 .thenApply(ListDatabasesResponse::getDatabaseNamesList);
     }
 
@@ -233,7 +242,8 @@ public class FlussAdmin implements Admin {
         request.setTablePath()
                 .setDatabaseName(tablePath.getDatabaseName())
                 .setTableName(tablePath.getTableName());
-        return gateway.getTableInfo(request)
+        return readOnlyGateway
+                .getTableInfo(request)
                 .thenApply(
                         r ->
                                 TableInfo.of(
@@ -261,14 +271,14 @@ public class FlussAdmin implements Admin {
         request.setTablePath()
                 .setDatabaseName(tablePath.getDatabaseName())
                 .setTableName(tablePath.getTableName());
-        return 
gateway.tableExists(request).thenApply(TableExistsResponse::isExists);
+        return 
readOnlyGateway.tableExists(request).thenApply(TableExistsResponse::isExists);
     }
 
     @Override
     public CompletableFuture<List<String>> listTables(String databaseName) {
         ListTablesRequest request = new ListTablesRequest();
         request.setDatabaseName(databaseName);
-        return 
gateway.listTables(request).thenApply(ListTablesResponse::getTableNamesList);
+        return 
readOnlyGateway.listTables(request).thenApply(ListTablesResponse::getTableNamesList);
     }
 
     @Override
@@ -289,7 +299,8 @@ public class FlussAdmin implements Admin {
             PbPartitionSpec pbPartitionSpec = 
makePbPartitionSpec(partitionSpec);
             request.setPartialPartitionSpec(pbPartitionSpec);
         }
-        return gateway.listPartitionInfos(request)
+        return readOnlyGateway
+                .listPartitionInfos(request)
                 .thenApply(ClientRpcMessageUtils::toPartitionInfos);
     }
 
@@ -315,7 +326,8 @@ public class FlussAdmin implements Admin {
         request.setTablePath()
                 .setDatabaseName(tablePath.getDatabaseName())
                 .setTableName(tablePath.getTableName());
-        return gateway.getLatestKvSnapshots(request)
+        return readOnlyGateway
+                .getLatestKvSnapshots(request)
                 .thenApply(ClientRpcMessageUtils::toKvSnapshots);
     }
 
@@ -328,7 +340,8 @@ public class FlussAdmin implements Admin {
                 .setDatabaseName(tablePath.getDatabaseName())
                 .setTableName(tablePath.getTableName());
         request.setPartitionName(partitionName);
-        return gateway.getLatestKvSnapshots(request)
+        return readOnlyGateway
+                .getLatestKvSnapshots(request)
                 .thenApply(ClientRpcMessageUtils::toKvSnapshots);
     }
 
@@ -342,7 +355,8 @@ public class FlussAdmin implements Admin {
         request.setTableId(bucket.getTableId())
                 .setBucketId(bucket.getBucket())
                 .setSnapshotId(snapshotId);
-        return gateway.getKvSnapshotMetadata(request)
+        return readOnlyGateway
+                .getKvSnapshotMetadata(request)
                 .thenApply(ClientRpcMessageUtils::toKvSnapshotMetadata);
     }
 
@@ -353,7 +367,8 @@ public class FlussAdmin implements Admin {
                 .setDatabaseName(tablePath.getDatabaseName())
                 .setTableName(tablePath.getTableName());
 
-        return gateway.getLatestLakeSnapshot(request)
+        return readOnlyGateway
+                .getLatestLakeSnapshot(request)
                 .thenApply(ClientRpcMessageUtils::toLakeTableSnapshotInfo);
     }
 
diff --git 
a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
index 62ad48a7..274dfbac 100644
--- 
a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
@@ -130,12 +130,13 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
     void testGetDatabaseInfo() throws Exception {
         long timestampBeforeCreate = System.currentTimeMillis();
         admin.createDatabase(
-                "test_db_2",
-                DatabaseDescriptor.builder()
-                        .comment("test comment")
-                        .customProperty("key1", "value1")
-                        .build(),
-                false);
+                        "test_db_2",
+                        DatabaseDescriptor.builder()
+                                .comment("test comment")
+                                .customProperty("key1", "value1")
+                                .build(),
+                        false)
+                .get();
         DatabaseInfo databaseInfo = admin.getDatabaseInfo("test_db_2").get();
         long timestampAfterCreate = System.currentTimeMillis();
         
assertThat(databaseInfo.getCreatedTime()).isEqualTo(databaseInfo.getModifiedTime());
@@ -181,7 +182,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
         // create and get a new table
         long timestampBeforeCreate = System.currentTimeMillis();
         TablePath tablePath = TablePath.of("test_db", "table_2");
-        admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
+        admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
         tableInfo = admin.getTableInfo(tablePath).get();
         timestampAfterCreate = System.currentTimeMillis();
         
assertThat(tableInfo.getSchemaId()).isEqualTo(schemaInfo.getSchemaId());
@@ -378,16 +379,19 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
 
         // assert the cluster should have tablet server number to be 3
         FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3);
-        FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
 
         // we can create the table now
         admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
-        TableInfo tableInfo = admin.getTableInfo(DEFAULT_TABLE_PATH).get();
-        assertThat(tableInfo.toTableDescriptor())
-                .isEqualTo(
-                        DEFAULT_TABLE_DESCRIPTOR
-                                .withReplicationFactor(3)
-                                .withDataLakeFormat(DataLakeFormat.PAIMON));
+        // recreate the connection because the metadata of tablet server has 
changed
+        try (Connection conn = ConnectionFactory.createConnection(clientConf);
+                Admin admin = conn.getAdmin()) {
+            TableInfo tableInfo = admin.getTableInfo(DEFAULT_TABLE_PATH).get();
+            assertThat(tableInfo.toTableDescriptor())
+                    .isEqualTo(
+                            DEFAULT_TABLE_DESCRIPTOR
+                                    .withReplicationFactor(3)
+                                    
.withDataLakeFormat(DataLakeFormat.PAIMON));
+        }
     }
 
     @Test
diff --git 
a/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
 
b/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
index 009c8f6e..f3a87bc3 100644
--- 
a/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
+++ 
b/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
@@ -33,6 +33,7 @@ import com.alibaba.fluss.metadata.DataLakeFormat;
 import com.alibaba.fluss.metadata.DatabaseDescriptor;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TableInfo;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.row.InternalRow;
 import com.alibaba.fluss.rpc.GatewayClientProxy;
@@ -217,18 +218,16 @@ public class FlussAuthorizationITCase {
     void testAclOperation() throws Exception {
         // Test whether the user has authorization to perform the "list ACLs" 
operation.
         assertThat(guestAdmin.listAcls(AclBindingFilter.ANY).get()).isEmpty();
-        rootAdmin
-                .createAcls(
-                        Collections.singletonList(
-                                new AclBinding(
-                                        Resource.cluster(),
-                                        new AccessControlEntry(
-                                                guestPrincipal,
-                                                WILD_CARD_HOST,
-                                                OperationType.DESCRIBE,
-                                                PermissionType.ALLOW))))
-                .all()
-                .get();
+        List<AclBinding> aclBindings =
+                Collections.singletonList(
+                        new AclBinding(
+                                Resource.cluster(),
+                                new AccessControlEntry(
+                                        guestPrincipal,
+                                        WILD_CARD_HOST,
+                                        OperationType.DESCRIBE,
+                                        PermissionType.ALLOW)));
+        rootAdmin.createAcls(aclBindings).all().get();
         assertThat(guestAdmin.listAcls(AclBindingFilter.ANY).get()).hasSize(1);
 
         // test whether the user have authorization to operate create and drop 
acls.
@@ -238,7 +237,7 @@ public class FlussAuthorizationITCase {
                         Resource.table("test_db", "test_table"),
                         new AccessControlEntry(
                                 user1, "*", OperationType.CREATE, 
PermissionType.ALLOW));
-        List<AclBinding> aclBindings =
+        List<AclBinding> noAuthorizationAclBinding =
                 Arrays.asList(
                         user1AclBinding,
                         new AclBinding(
@@ -255,24 +254,22 @@ public class FlussAuthorizationITCase {
                                         "127.0.0.1",
                                         OperationType.DROP,
                                         PermissionType.ALLOW)));
-        assertThatThrownBy(() -> 
guestAdmin.createAcls(aclBindings).all().get())
+        assertThatThrownBy(() -> 
guestAdmin.createAcls(noAuthorizationAclBinding).all().get())
                 .hasMessageContaining(
                         "Principal %s have no authorization to operate ALTER 
on resource",
                         guestPrincipal);
 
-        rootAdmin
-                .createAcls(
-                        Collections.singletonList(
-                                new AclBinding(
-                                        Resource.cluster(),
-                                        new AccessControlEntry(
-                                                WILD_CARD_PRINCIPAL,
-                                                WILD_CARD_HOST,
-                                                OperationType.ALTER,
-                                                PermissionType.ALLOW))))
-                .all()
-                .get();
-        guestAdmin.createAcls(aclBindings).all().get();
+        aclBindings =
+                Collections.singletonList(
+                        new AclBinding(
+                                Resource.cluster(),
+                                new AccessControlEntry(
+                                        WILD_CARD_PRINCIPAL,
+                                        WILD_CARD_HOST,
+                                        OperationType.ALTER,
+                                        PermissionType.ALLOW)));
+        rootAdmin.createAcls(aclBindings).all().get();
+        guestAdmin.createAcls(noAuthorizationAclBinding).all().get();
 
         assertThat(
                         guestAdmin
@@ -305,18 +302,16 @@ public class FlussAuthorizationITCase {
                         String.format(
                                 "Principal %s have no authorization to operate 
CREATE on resource Resource{type=CLUSTER, name='fluss-cluster'}",
                                 guestPrincipal));
-        rootAdmin
-                .createAcls(
-                        Collections.singletonList(
-                                new AclBinding(
-                                        Resource.cluster(),
-                                        new AccessControlEntry(
-                                                guestPrincipal,
-                                                "*",
-                                                OperationType.CREATE,
-                                                PermissionType.ALLOW))))
-                .all()
-                .get();
+        List<AclBinding> aclBindings =
+                Collections.singletonList(
+                        new AclBinding(
+                                Resource.cluster(),
+                                new AccessControlEntry(
+                                        guestPrincipal,
+                                        "*",
+                                        OperationType.CREATE,
+                                        PermissionType.ALLOW)));
+        rootAdmin.createAcls(aclBindings).all().get();
         guestAdmin.createDatabase("test-database2", DatabaseDescriptor.EMPTY, 
false).get();
         assertThat(rootAdmin.databaseExists("test-database1").get()).isFalse();
         assertThat(rootAdmin.databaseExists("test-database2").get()).isTrue();
@@ -330,32 +325,30 @@ public class FlussAuthorizationITCase {
                 .containsExactlyInAnyOrderElementsOf(
                         Lists.newArrayList("fluss", 
DATA1_TABLE_PATH_PK.getDatabaseName()));
 
-        rootAdmin
-                .createAcls(
-                        Collections.singletonList(
-                                new AclBinding(
-                                        Resource.database("fluss"),
-                                        new AccessControlEntry(
-                                                guestPrincipal,
-                                                "*",
-                                                OperationType.DESCRIBE,
-                                                PermissionType.ALLOW))))
-                .all()
-                .get();
+        List<AclBinding> aclBindings =
+                Collections.singletonList(
+                        new AclBinding(
+                                Resource.database("fluss"),
+                                new AccessControlEntry(
+                                        guestPrincipal,
+                                        "*",
+                                        OperationType.DESCRIBE,
+                                        PermissionType.ALLOW)));
+        rootAdmin.createAcls(aclBindings).all().get();
+        FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
         
assertThat(guestAdmin.listDatabases().get()).isEqualTo(Collections.singletonList("fluss"));
 
-        rootAdmin
-                .createAcls(
-                        Collections.singletonList(
-                                new AclBinding(
-                                        Resource.cluster(),
-                                        new AccessControlEntry(
-                                                guestPrincipal,
-                                                "*",
-                                                OperationType.ALL,
-                                                PermissionType.ALLOW))))
-                .all()
-                .get();
+        aclBindings =
+                Collections.singletonList(
+                        new AclBinding(
+                                Resource.cluster(),
+                                new AccessControlEntry(
+                                        guestPrincipal,
+                                        "*",
+                                        OperationType.ALL,
+                                        PermissionType.ALLOW)));
+        rootAdmin.createAcls(aclBindings).all().get();
+        FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
         assertThat(guestAdmin.listDatabases().get())
                 .containsExactlyInAnyOrderElementsOf(
                         Lists.newArrayList("fluss", 
DATA1_TABLE_PATH_PK.getDatabaseName()));
@@ -374,18 +367,17 @@ public class FlussAuthorizationITCase {
                                 "Principal %s have no authorization to operate 
CREATE on resource Resource{type=DATABASE, name='test_db_1'}",
                                 guestPrincipal));
         assertThat(rootAdmin.tableExists(DATA1_TABLE_PATH).get()).isFalse();
-        rootAdmin
-                .createAcls(
-                        Collections.singletonList(
-                                new AclBinding(
-                                        
Resource.database(DATA1_TABLE_PATH.getDatabaseName()),
-                                        new AccessControlEntry(
-                                                guestPrincipal,
-                                                "*",
-                                                OperationType.CREATE,
-                                                PermissionType.ALLOW))))
-                .all()
-                .get();
+
+        List<AclBinding> aclBindings =
+                Collections.singletonList(
+                        new AclBinding(
+                                
Resource.database(DATA1_TABLE_PATH.getDatabaseName()),
+                                new AccessControlEntry(
+                                        guestPrincipal,
+                                        "*",
+                                        OperationType.CREATE,
+                                        PermissionType.ALLOW)));
+        rootAdmin.createAcls(aclBindings).all().get();
         guestAdmin.createTable(DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR, 
false).get();
         assertThat(rootAdmin.tableExists(DATA1_TABLE_PATH).get()).isTrue();
     }
@@ -394,18 +386,18 @@ public class FlussAuthorizationITCase {
     void testListTables() throws Exception {
         
assertThat(guestAdmin.listTables(DATA1_TABLE_PATH_PK.getDatabaseName()).get())
                 .isEqualTo(Collections.emptyList());
-        rootAdmin
-                .createAcls(
-                        Collections.singletonList(
-                                new AclBinding(
-                                        
Resource.database(DATA1_TABLE_PATH_PK.getDatabaseName()),
-                                        new AccessControlEntry(
-                                                guestPrincipal,
-                                                "*",
-                                                OperationType.DESCRIBE,
-                                                PermissionType.ALLOW))))
-                .all()
-                .get();
+
+        List<AclBinding> aclBindings =
+                Collections.singletonList(
+                        new AclBinding(
+                                
Resource.database(DATA1_TABLE_PATH_PK.getDatabaseName()),
+                                new AccessControlEntry(
+                                        guestPrincipal,
+                                        "*",
+                                        OperationType.DESCRIBE,
+                                        PermissionType.ALLOW)));
+        rootAdmin.createAcls(aclBindings).all().get();
+        FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
         
assertThat(guestAdmin.listTables(DATA1_TABLE_PATH_PK.getDatabaseName()).get())
                 
.isEqualTo(Collections.singletonList(DATA1_TABLE_PATH_PK.getTableName()));
     }
@@ -428,19 +420,17 @@ public class FlussAuthorizationITCase {
                     .isEmpty();
 
             // if add acl to allow guest read any resource, it will allow to 
get metadata.
-            rootAdmin
-                    .createAcls(
-                            Collections.singletonList(
-                                    new AclBinding(
-                                            
Resource.table(DATA1_TABLE_PATH_PK),
-                                            new AccessControlEntry(
-                                                    guestPrincipal,
-                                                    "*",
-                                                    OperationType.DESCRIBE,
-                                                    PermissionType.ALLOW))))
-                    .all()
-                    .get();
-
+            List<AclBinding> aclBindings =
+                    Collections.singletonList(
+                            new AclBinding(
+                                    Resource.table(DATA1_TABLE_PATH_PK),
+                                    new AccessControlEntry(
+                                            guestPrincipal,
+                                            "*",
+                                            OperationType.DESCRIBE,
+                                            PermissionType.ALLOW)));
+            rootAdmin.createAcls(aclBindings).all().get();
+            FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, 
true);
             
assertThat(guestGateway.metadata(metadataRequest).get().getTableMetadatasList())
                     .hasSize(1);
         }
@@ -454,16 +444,20 @@ public class FlussAuthorizationITCase {
         TableDescriptor descriptor =
                 
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
         rootAdmin.createTable(writeAclTable, descriptor, false).get();
+        TableInfo tableInfo = rootAdmin.getTableInfo(writeAclTable).get();
+        FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableInfo.getTableId());
         // create acl to allow guest write.
-        AclBinding aclBinding =
-                new AclBinding(
-                        Resource.table(writeAclTable),
-                        new AccessControlEntry(
-                                guestPrincipal, "*", OperationType.WRITE, 
PermissionType.ALLOW));
-        
rootAdmin.createAcls(Collections.singletonList(aclBinding)).all().get();
-
-        FLUSS_CLUSTER_EXTENSION.waitUtilTableReadyWithAuthorization(
-                rootAdmin.getTableInfo(writeAclTable).get().getTableId(), 
aclBinding);
+        List<AclBinding> aclBindings =
+                Collections.singletonList(
+                        new AclBinding(
+                                Resource.table(writeAclTable),
+                                new AccessControlEntry(
+                                        guestPrincipal,
+                                        "*",
+                                        OperationType.WRITE,
+                                        PermissionType.ALLOW)));
+        rootAdmin.createAcls(aclBindings).all().get();
+        FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
 
         FlussConnection flussConnection = (FlussConnection) guestConn;
         TabletServerGateway tabletServerGateway =
@@ -512,26 +506,31 @@ public class FlussAuthorizationITCase {
                 
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
         rootAdmin.createTable(writeAclTable, descriptor, false).get();
         rootAdmin.createTable(noWriteAclTable, descriptor, false).get();
+        FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
+                rootAdmin.getTableInfo(writeAclTable).get().getTableId());
+        FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
+                rootAdmin.getTableInfo(noWriteAclTable).get().getTableId());
 
         // create acl to allow guest write for writeAclTable.
-        AclBinding aclBindingOfWriteAclTable =
-                new AclBinding(
-                        Resource.table(writeAclTable),
-                        new AccessControlEntry(
-                                guestPrincipal, "*", OperationType.WRITE, 
PermissionType.ALLOW));
-        AclBinding aclBindingOfNoWriteAclTable =
-                new AclBinding(
-                        Resource.table(noWriteAclTable),
-                        new AccessControlEntry(guestPrincipal, "*", READ, 
PermissionType.ALLOW));
-        
rootAdmin.createAcls(Collections.singletonList(aclBindingOfWriteAclTable)).all().get();
-        
rootAdmin.createAcls(Collections.singletonList(aclBindingOfNoWriteAclTable)).all().get();
-
-        FLUSS_CLUSTER_EXTENSION.waitUtilTableReadyWithAuthorization(
-                rootAdmin.getTableInfo(writeAclTable).get().getTableId(),
-                aclBindingOfWriteAclTable);
-        FLUSS_CLUSTER_EXTENSION.waitUtilTableReadyWithAuthorization(
-                rootAdmin.getTableInfo(noWriteAclTable).get().getTableId(),
-                aclBindingOfNoWriteAclTable);
+        List<AclBinding> aclBindingOfWriteAclTables =
+                Collections.singletonList(
+                        new AclBinding(
+                                Resource.table(writeAclTable),
+                                new AccessControlEntry(
+                                        guestPrincipal,
+                                        "*",
+                                        OperationType.WRITE,
+                                        PermissionType.ALLOW)));
+        List<AclBinding> aclBindingOfNoWriteAclTables =
+                Collections.singletonList(
+                        new AclBinding(
+                                Resource.table(noWriteAclTable),
+                                new AccessControlEntry(
+                                        guestPrincipal, "*", READ, 
PermissionType.ALLOW)));
+        rootAdmin.createAcls(aclBindingOfWriteAclTables).all().get();
+        
FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindingOfWriteAclTables, 
true);
+        rootAdmin.createAcls(aclBindingOfNoWriteAclTables).all().get();
+        
FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindingOfNoWriteAclTables,
 true);
 
         // 1. Try to write data to noWriteAclTable. It should throw 
AuthorizationException because
         // of request writeId failed.
@@ -577,15 +576,20 @@ public class FlussAuthorizationITCase {
         TableDescriptor descriptor =
                 
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
         rootAdmin.createTable(DATA1_TABLE_PATH, descriptor, false).get();
+        FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(
+                rootAdmin.getTableInfo(DATA1_TABLE_PATH).get().getTableId());
         // create acl to allow guest write.
-        AclBinding aclBinding =
-                new AclBinding(
-                        Resource.table(DATA1_TABLE_PATH),
-                        new AccessControlEntry(
-                                guestPrincipal, "*", OperationType.WRITE, 
PermissionType.ALLOW));
-        
rootAdmin.createAcls(Collections.singletonList(aclBinding)).all().get();
-        FLUSS_CLUSTER_EXTENSION.waitUtilTableReadyWithAuthorization(
-                rootAdmin.getTableInfo(DATA1_TABLE_PATH).get().getTableId(), 
aclBinding);
+        List<AclBinding> aclBindings =
+                Collections.singletonList(
+                        new AclBinding(
+                                Resource.table(DATA1_TABLE_PATH),
+                                new AccessControlEntry(
+                                        guestPrincipal,
+                                        "*",
+                                        OperationType.WRITE,
+                                        PermissionType.ALLOW)));
+        rootAdmin.createAcls(aclBindings).all().get();
+        FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(aclBindings, true);
         try (Table table = guestConn.getTable(DATA1_TABLE_PATH)) {
             AppendWriter appendWriter = table.newAppend().createWriter();
             appendWriter.append(row(1, "a")).get();
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
index 3d0e5bec..24497100 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
@@ -24,8 +24,11 @@ import com.alibaba.fluss.exception.AuthorizationException;
 import com.alibaba.fluss.flink.catalog.FlinkCatalogOptions;
 import com.alibaba.fluss.metadata.DataLakeFormat;
 import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.security.acl.AccessControlEntry;
+import com.alibaba.fluss.security.acl.AclBinding;
 import com.alibaba.fluss.security.acl.FlussPrincipal;
 import com.alibaba.fluss.security.acl.OperationType;
+import com.alibaba.fluss.security.acl.PermissionType;
 import com.alibaba.fluss.security.acl.Resource;
 import com.alibaba.fluss.security.auth.sasl.jaas.LoginManager;
 import com.alibaba.fluss.server.testutils.FlussClusterExtension;
@@ -421,6 +424,13 @@ abstract class FlinkAuthorizationITCase extends 
AbstractTestBase {
                                 String.format("%s:%s", guest.getType(), 
guest.getName()),
                                 operationType.name()))
                 .await();
+        FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(
+                Collections.singletonList(
+                        new AclBinding(
+                                resource,
+                                new AccessControlEntry(
+                                        guest, "*", operationType, 
PermissionType.ALLOW))),
+                true);
     }
 
     void dropAcl(Resource resource, OperationType operationType)
@@ -433,6 +443,13 @@ abstract class FlinkAuthorizationITCase extends 
AbstractTestBase {
                                 String.format("%s:%s", guest.getType(), 
guest.getName()),
                                 operationType.name()))
                 .await();
+        FLUSS_CLUSTER_EXTENSION.waitUtilAuthenticationSync(
+                Collections.singletonList(
+                        new AclBinding(
+                                resource,
+                                new AccessControlEntry(
+                                        guest, "*", operationType, 
PermissionType.ALLOW))),
+                false);
     }
 
     private static Configuration initConfig() {
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
index d2f6fc4d..9e18be14 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
@@ -17,6 +17,7 @@
 
 package com.alibaba.fluss.server;
 
+import com.alibaba.fluss.annotation.VisibleForTesting;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.exception.FlussException;
@@ -24,6 +25,7 @@ import com.alibaba.fluss.fs.FileSystem;
 import com.alibaba.fluss.fs.FsPath;
 import com.alibaba.fluss.plugin.PluginManager;
 import com.alibaba.fluss.plugin.PluginUtils;
+import com.alibaba.fluss.server.authorizer.Authorizer;
 import com.alibaba.fluss.server.coordinator.CoordinatorServer;
 import com.alibaba.fluss.server.exception.FlussParseException;
 import com.alibaba.fluss.server.tablet.TabletServer;
@@ -37,6 +39,8 @@ import com.alibaba.fluss.utils.concurrent.FutureUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.lang.reflect.UndeclaredThrowableException;
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
@@ -177,6 +181,9 @@ public abstract class ServerBase implements 
AutoCloseableAsync, FatalErrorHandle
 
     protected abstract String getServerName();
 
+    @VisibleForTesting
+    public abstract @Nullable Authorizer getAuthorizer();
+
     /** Result for run {@link ServerBase}. */
     public enum Result {
         SUCCESS(SUCCESS_EXIT_CODE),
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
index e54fc9ee..e8388f10 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
@@ -492,6 +492,11 @@ public class CoordinatorServer extends ServerBase {
         return metadataCache;
     }
 
+    @VisibleForTesting
+    public @Nullable Authorizer getAuthorizer() {
+        return authorizer;
+    }
+
     private static void validateConfigs(Configuration conf) {
         if (conf.get(ConfigOptions.DEFAULT_REPLICATION_FACTOR) < 1) {
             throw new IllegalConfigurationException(
diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java
 
b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java
index 87a7f38e..0d92bb0a 100644
--- 
a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java
+++ 
b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java
@@ -41,6 +41,7 @@ import com.alibaba.fluss.rpc.messages.StopReplicaRequest;
 import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
 import com.alibaba.fluss.security.acl.AccessControlEntry;
 import com.alibaba.fluss.security.acl.AclBinding;
+import com.alibaba.fluss.server.ServerBase;
 import com.alibaba.fluss.server.authorizer.Authorizer;
 import com.alibaba.fluss.server.authorizer.DefaultAuthorizer;
 import com.alibaba.fluss.server.coordinator.CoordinatorServer;
@@ -81,6 +82,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -519,13 +521,6 @@ public final class FlussClusterExtension
 
     /** Wait until all the table assignments buckets are ready for table. */
     public void waitUtilTableReady(long tableId) {
-        waitUtilTableReadyWithAuthorization(tableId, null);
-    }
-
-    /**
-     * Wait until all the table assignments buckets and required authorization 
are ready for table.
-     */
-    public void waitUtilTableReadyWithAuthorization(long tableId, @Nullable 
AclBinding aclBinding) {
         ZooKeeperClient zkClient = getZooKeeperClient();
         retry(
                 Duration.ofMinutes(1),
@@ -534,29 +529,40 @@ public final class FlussClusterExtension
                             zkClient.getTableAssignment(tableId);
                     assertThat(tableAssignmentOpt).isPresent();
                     waitReplicaInAssignmentReady(zkClient, 
tableAssignmentOpt.get(), tableId, null);
+                });
+    }
 
-                    if (aclBinding != null) {
-                        getTabletServers()
-                                .forEach(
-                                        ts -> {
-                                            Authorizer authorizer = 
ts.getAuthorizer();
-                                            assertThat(authorizer).isNotNull();
-                                            AccessControlEntry 
accessControlEntry =
-                                                    
aclBinding.getAccessControlEntry();
-                                            assertThat(
-                                                            
((DefaultAuthorizer) authorizer)
-                                                                    
.aclsAllowAccess(
-                                                                            
aclBinding
-                                                                               
     .getResource(),
-                                                                            
accessControlEntry
-                                                                               
     .getPrincipal(),
-                                                                            
accessControlEntry
-                                                                               
     .getOperationType(),
-                                                                            
accessControlEntry
-                                                                               
     .getHost()))
-                                                    .isTrue();
-                                        });
-                    }
+    /**
+     * Wait until ACL changes are synchronized to the coordinator server and all tablet servers.
+     *
+     * @param aclBindings the ACL bindings whose synchronization is awaited.
+     * @param exist whether each aclBinding is expected to allow access on every server.
+     */
+    public void waitUtilAuthenticationSync(Collection<AclBinding> aclBindings, 
boolean exist) {
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    Set<ServerBase> servers = new 
HashSet<>(getTabletServers());
+                    servers.add(getCoordinatorServer());
+                    servers.forEach(
+                            ts -> {
+                                Authorizer authorizer = ts.getAuthorizer();
+                                assertThat(authorizer).isNotNull();
+                                for (AclBinding aclBinding : aclBindings) {
+                                    AccessControlEntry accessControlEntry =
+                                            aclBinding.getAccessControlEntry();
+                                    assertThat(
+                                                    ((DefaultAuthorizer) 
authorizer)
+                                                            .aclsAllowAccess(
+                                                                    
aclBinding.getResource(),
+                                                                    
accessControlEntry
+                                                                            
.getPrincipal(),
+                                                                    
accessControlEntry
+                                                                            
.getOperationType(),
+                                                                    
accessControlEntry.getHost()))
+                                            .isEqualTo(exist);
+                                }
+                            });
                 });
     }
 


Reply via email to