caigy commented on code in PR #4769:
URL: https://github.com/apache/rocketmq/pull/4769#discussion_r990716859


##########
acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java:
##########
@@ -393,6 +399,401 @@ public boolean updateAccessConfig(PlainAccessConfig 
plainAccessConfig) {
         }
     }
 
+    public boolean updateAclAccount(PlainAccessConfig plainAccessConfig) {

Review Comment:
   Add `@Override`



##########
acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java:
##########
@@ -393,6 +399,401 @@ public boolean updateAccessConfig(PlainAccessConfig 
plainAccessConfig) {
         }
     }
 
+    public boolean updateAclAccount(PlainAccessConfig plainAccessConfig) {
+        if (plainAccessConfig == null) {
+            log.error("Parameter value plainAccessConfig is null,Please check 
your parameter");
+            throw new AclException("Parameter value plainAccessConfig is null, 
Please check your parameter");
+        }
+        checkPlainAccessConfig(plainAccessConfig);
+
+        //Permission.checkResourcePerms(plainAccessConfig.getTopicPerms());
+        //Permission.checkResourcePerms(plainAccessConfig.getGroupPerms());
+        if (accessKeyTable.containsKey(plainAccessConfig.getAccessKey())) {
+            Map<String, Object> updateAccountMap = null;
+            String aclFileName = 
accessKeyTable.get(plainAccessConfig.getAccessKey());
+            Map<String, Object> aclAccessConfigMap = 
AclUtils.getYamlDataObject(aclFileName, Map.class);
+            List<Map<String, Object>> accounts = (List<Map<String, Object>>) 
aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+            if (null != accounts) {
+                for (Map<String, Object> account : accounts) {
+                    if 
(account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey()))
 {
+                        // Update acl access config elements
+                        accounts.remove(account);
+                        updateAccountMap = createAclAccessConfigMap(account, 
plainAccessConfig);
+                        accounts.add(updateAccountMap);
+                        aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, 
accounts);
+                        break;
+                    }
+                }
+            } else {
+                // Maybe deleted in file, add it back
+                accounts = new LinkedList<>();
+                updateAccountMap = createAclAccessConfigMap(null, 
plainAccessConfig);
+                accounts.add(updateAccountMap);
+                aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            }
+            Map<String, PlainAccessResource> accountMap = 
aclPlainAccessResourceMap.get(aclFileName);
+            PlainAccessResource updatePlainAccessResource = 
updatePlainAccessResource(updateAccountMap);
+            if (accountMap == null) {
+                accountMap = new HashMap<String, PlainAccessResource>(1);
+                accountMap.put(plainAccessConfig.getAccessKey(), 
updatePlainAccessResource);
+            } else if (accountMap.size() == 0) {
+                accountMap.put(plainAccessConfig.getAccessKey(), 
updatePlainAccessResource);
+            } else {
+                for (Map.Entry<String, PlainAccessResource> entry : 
accountMap.entrySet()) {
+                    if 
(entry.getValue().getAccessKey().equals(plainAccessConfig.getAccessKey())) {
+                        accountMap.put(entry.getKey(), 
updatePlainAccessResource);
+                        break;
+                    }
+                }
+            }
+            aclPlainAccessResourceMap.put(aclFileName, accountMap);
+            return AclUtils.writeDataObject(aclFileName, 
updateAclConfigFileVersion(aclFileName, aclAccessConfigMap));
+        } else {
+            String fileName = MixAll.dealFilePath(defaultAclFile);
+            //Create acl access config elements on the default acl file
+            if (aclPlainAccessResourceMap.get(defaultAclFile) == null || 
aclPlainAccessResourceMap.get(defaultAclFile).size() == 0) {
+                try {
+                    File defaultAclFile = new File(fileName);
+                    if (!defaultAclFile.exists()) {
+                        defaultAclFile.createNewFile();
+                    }
+                } catch (IOException e) {
+                    log.warn("create default acl file has exception when 
update accessConfig. ", e);
+                }
+            }
+            Map<String, Object> aclAccessConfigMap = 
AclUtils.getYamlDataObject(defaultAclFile, Map.class);
+            if (aclAccessConfigMap == null) {
+                aclAccessConfigMap = new HashMap<>();
+                aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, new 
ArrayList<>());
+            }
+            List<Map<String, Object>> accounts = (List<Map<String, Object>>) 
aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+            // When no accounts defined
+            if (null == accounts) {
+                accounts = new ArrayList<>();
+            }
+            accounts.add(createAclAccessConfigMap(null, plainAccessConfig));
+            aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            accessKeyTable.put(plainAccessConfig.getAccessKey(), fileName);
+            if (aclPlainAccessResourceMap.get(fileName) == null) {
+                Map<String, PlainAccessResource> plainAccessResourceMap = new 
HashMap<>(1);
+                plainAccessResourceMap.put(plainAccessConfig.getAccessKey(), 
buildPlainAccessResource(plainAccessConfig));
+                aclPlainAccessResourceMap.put(fileName, 
plainAccessResourceMap);
+            } else {
+                Map<String, PlainAccessResource> plainAccessResourceMap = 
aclPlainAccessResourceMap.get(fileName);
+                plainAccessResourceMap.put(plainAccessConfig.getAccessKey(), 
buildPlainAccessResource(plainAccessConfig));
+                aclPlainAccessResourceMap.put(fileName, 
plainAccessResourceMap);
+            }
+            return AclUtils.writeDataObject(defaultAclFile, 
updateAclConfigFileVersion(defaultAclFile, aclAccessConfigMap));
+        }
+    }
+
+    public PlainAccessResource updatePlainAccessResource(Map<String, Object> 
updateAccountMap) {
+        PlainAccessResource updatePlainAccessResource = new 
PlainAccessResource();
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_ACCESS_KEY)) {
+            
updatePlainAccessResource.setAccessKey(updateAccountMap.get(AclConstants.CONFIG_ACCESS_KEY).toString());
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_SECRET_KEY)) {
+            
updatePlainAccessResource.setSecretKey(updateAccountMap.get(AclConstants.CONFIG_SECRET_KEY).toString());
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_WHITE_ADDR)) {
+            
updatePlainAccessResource.setRemoteAddressStrategy(remoteAddressStrategyFactory.
+                
getRemoteAddressStrategy(updateAccountMap.get(AclConstants.CONFIG_WHITE_ADDR).toString()));
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_ADMIN_ROLE)) {
+            
updatePlainAccessResource.setAdmin(Boolean.parseBoolean(updateAccountMap.get(AclConstants.CONFIG_ADMIN_ROLE).toString()));
+        }
+        if 
(updateAccountMap.containsKey(AclConstants.CONFIG_DEFAULT_TOPIC_PERM)) {
+            updatePlainAccessResource.setDefaultTopicPerm(Permission.
+                
parsePermFromString(updateAccountMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM).toString()));
+        }
+        if 
(updateAccountMap.containsKey(AclConstants.CONFIG_DEFAULT_GROUP_PERM)) {
+            updatePlainAccessResource.setDefaultGroupPerm(Permission.
+                
parsePermFromString(updateAccountMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM).toString()));
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_RESOURCE_PERMS)) {
+            List<LinkedHashMap<String, Object>> resourceAndPermList = 
(List<LinkedHashMap<String, 
Object>>)updateAccountMap.get(AclConstants.CONFIG_RESOURCE_PERMS);
+            for (LinkedHashMap resourceAndPerm : resourceAndPermList) {
+                String resource = resourceAndPerm.get("resource").toString();
+                String namespace = resourceAndPerm.get("namespace").toString();
+                String type = resourceAndPerm.get("type").toString();
+                String perm = resourceAndPerm.get("perm").toString();
+                if (namespace == null || namespace.isEmpty()) {
+                    if (type.equals(ResourceType.GROUP.toString())) {

Review Comment:
   Use `java.lang.Enum.name()` instead of `toString()`



##########
acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java:
##########
@@ -393,6 +399,401 @@ public boolean updateAccessConfig(PlainAccessConfig 
plainAccessConfig) {
         }
     }
 
+    public boolean updateAclAccount(PlainAccessConfig plainAccessConfig) {
+        if (plainAccessConfig == null) {
+            log.error("Parameter value plainAccessConfig is null,Please check 
your parameter");
+            throw new AclException("Parameter value plainAccessConfig is null, 
Please check your parameter");
+        }
+        checkPlainAccessConfig(plainAccessConfig);
+
+        //Permission.checkResourcePerms(plainAccessConfig.getTopicPerms());
+        //Permission.checkResourcePerms(plainAccessConfig.getGroupPerms());
+        if (accessKeyTable.containsKey(plainAccessConfig.getAccessKey())) {
+            Map<String, Object> updateAccountMap = null;
+            String aclFileName = 
accessKeyTable.get(plainAccessConfig.getAccessKey());
+            Map<String, Object> aclAccessConfigMap = 
AclUtils.getYamlDataObject(aclFileName, Map.class);
+            List<Map<String, Object>> accounts = (List<Map<String, Object>>) 
aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+            if (null != accounts) {
+                for (Map<String, Object> account : accounts) {
+                    if 
(account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey()))
 {
+                        // Update acl access config elements
+                        accounts.remove(account);
+                        updateAccountMap = createAclAccessConfigMap(account, 
plainAccessConfig);
+                        accounts.add(updateAccountMap);
+                        aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, 
accounts);
+                        break;
+                    }
+                }
+            } else {
+                // Maybe deleted in file, add it back
+                accounts = new LinkedList<>();
+                updateAccountMap = createAclAccessConfigMap(null, 
plainAccessConfig);
+                accounts.add(updateAccountMap);
+                aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            }
+            Map<String, PlainAccessResource> accountMap = 
aclPlainAccessResourceMap.get(aclFileName);
+            PlainAccessResource updatePlainAccessResource = 
updatePlainAccessResource(updateAccountMap);
+            if (accountMap == null) {
+                accountMap = new HashMap<String, PlainAccessResource>(1);

Review Comment:
   Use `Maps.newHashMapWithExpectedSize()` to prevent resizing.



##########
acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java:
##########
@@ -45,13 +45,19 @@
 import org.apache.rocketmq.common.AclConfig;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.NamespaceAndPerm;
+import org.apache.rocketmq.common.OperationType;
 import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.common.ResourceAndPerm;
+import org.apache.rocketmq.common.ResourceType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.srvutil.AclFileWatchService;
 
+import static 
org.apache.rocketmq.common.protocol.NamespaceUtil.NAMESPACE_SEPARATOR;
+
 public class PlainPermissionManager {

Review Comment:
   The methods changed in this class are a bit complicated; please consider 
refactoring them, e.g. extracting common parts and reducing indentation.



##########
acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java:
##########
@@ -393,6 +399,401 @@ public boolean updateAccessConfig(PlainAccessConfig 
plainAccessConfig) {
         }
     }
 
+    public boolean updateAclAccount(PlainAccessConfig plainAccessConfig) {
+        if (plainAccessConfig == null) {
+            log.error("Parameter value plainAccessConfig is null,Please check 
your parameter");
+            throw new AclException("Parameter value plainAccessConfig is null, 
Please check your parameter");
+        }
+        checkPlainAccessConfig(plainAccessConfig);
+
+        //Permission.checkResourcePerms(plainAccessConfig.getTopicPerms());
+        //Permission.checkResourcePerms(plainAccessConfig.getGroupPerms());
+        if (accessKeyTable.containsKey(plainAccessConfig.getAccessKey())) {
+            Map<String, Object> updateAccountMap = null;
+            String aclFileName = 
accessKeyTable.get(plainAccessConfig.getAccessKey());
+            Map<String, Object> aclAccessConfigMap = 
AclUtils.getYamlDataObject(aclFileName, Map.class);
+            List<Map<String, Object>> accounts = (List<Map<String, Object>>) 
aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+            if (null != accounts) {
+                for (Map<String, Object> account : accounts) {
+                    if 
(account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey()))
 {
+                        // Update acl access config elements
+                        accounts.remove(account);
+                        updateAccountMap = createAclAccessConfigMap(account, 
plainAccessConfig);
+                        accounts.add(updateAccountMap);
+                        aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, 
accounts);
+                        break;
+                    }
+                }
+            } else {
+                // Maybe deleted in file, add it back
+                accounts = new LinkedList<>();
+                updateAccountMap = createAclAccessConfigMap(null, 
plainAccessConfig);
+                accounts.add(updateAccountMap);
+                aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            }
+            Map<String, PlainAccessResource> accountMap = 
aclPlainAccessResourceMap.get(aclFileName);
+            PlainAccessResource updatePlainAccessResource = 
updatePlainAccessResource(updateAccountMap);
+            if (accountMap == null) {
+                accountMap = new HashMap<String, PlainAccessResource>(1);
+                accountMap.put(plainAccessConfig.getAccessKey(), 
updatePlainAccessResource);
+            } else if (accountMap.size() == 0) {
+                accountMap.put(plainAccessConfig.getAccessKey(), 
updatePlainAccessResource);
+            } else {
+                for (Map.Entry<String, PlainAccessResource> entry : 
accountMap.entrySet()) {
+                    if 
(entry.getValue().getAccessKey().equals(plainAccessConfig.getAccessKey())) {
+                        accountMap.put(entry.getKey(), 
updatePlainAccessResource);
+                        break;
+                    }
+                }
+            }
+            aclPlainAccessResourceMap.put(aclFileName, accountMap);
+            return AclUtils.writeDataObject(aclFileName, 
updateAclConfigFileVersion(aclFileName, aclAccessConfigMap));
+        } else {
+            String fileName = MixAll.dealFilePath(defaultAclFile);
+            //Create acl access config elements on the default acl file
+            if (aclPlainAccessResourceMap.get(defaultAclFile) == null || 
aclPlainAccessResourceMap.get(defaultAclFile).size() == 0) {
+                try {
+                    File defaultAclFile = new File(fileName);
+                    if (!defaultAclFile.exists()) {
+                        defaultAclFile.createNewFile();
+                    }
+                } catch (IOException e) {
+                    log.warn("create default acl file has exception when 
update accessConfig. ", e);
+                }
+            }
+            Map<String, Object> aclAccessConfigMap = 
AclUtils.getYamlDataObject(defaultAclFile, Map.class);
+            if (aclAccessConfigMap == null) {
+                aclAccessConfigMap = new HashMap<>();
+                aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, new 
ArrayList<>());
+            }
+            List<Map<String, Object>> accounts = (List<Map<String, Object>>) 
aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+            // When no accounts defined
+            if (null == accounts) {
+                accounts = new ArrayList<>();
+            }
+            accounts.add(createAclAccessConfigMap(null, plainAccessConfig));
+            aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            accessKeyTable.put(plainAccessConfig.getAccessKey(), fileName);
+            if (aclPlainAccessResourceMap.get(fileName) == null) {
+                Map<String, PlainAccessResource> plainAccessResourceMap = new 
HashMap<>(1);
+                plainAccessResourceMap.put(plainAccessConfig.getAccessKey(), 
buildPlainAccessResource(plainAccessConfig));
+                aclPlainAccessResourceMap.put(fileName, 
plainAccessResourceMap);
+            } else {
+                Map<String, PlainAccessResource> plainAccessResourceMap = 
aclPlainAccessResourceMap.get(fileName);
+                plainAccessResourceMap.put(plainAccessConfig.getAccessKey(), 
buildPlainAccessResource(plainAccessConfig));
+                aclPlainAccessResourceMap.put(fileName, 
plainAccessResourceMap);
+            }
+            return AclUtils.writeDataObject(defaultAclFile, 
updateAclConfigFileVersion(defaultAclFile, aclAccessConfigMap));
+        }
+    }
+
+    public PlainAccessResource updatePlainAccessResource(Map<String, Object> 
updateAccountMap) {
+        PlainAccessResource updatePlainAccessResource = new 
PlainAccessResource();
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_ACCESS_KEY)) {
+            
updatePlainAccessResource.setAccessKey(updateAccountMap.get(AclConstants.CONFIG_ACCESS_KEY).toString());
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_SECRET_KEY)) {
+            
updatePlainAccessResource.setSecretKey(updateAccountMap.get(AclConstants.CONFIG_SECRET_KEY).toString());
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_WHITE_ADDR)) {
+            
updatePlainAccessResource.setRemoteAddressStrategy(remoteAddressStrategyFactory.
+                
getRemoteAddressStrategy(updateAccountMap.get(AclConstants.CONFIG_WHITE_ADDR).toString()));
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_ADMIN_ROLE)) {
+            
updatePlainAccessResource.setAdmin(Boolean.parseBoolean(updateAccountMap.get(AclConstants.CONFIG_ADMIN_ROLE).toString()));
+        }
+        if 
(updateAccountMap.containsKey(AclConstants.CONFIG_DEFAULT_TOPIC_PERM)) {
+            updatePlainAccessResource.setDefaultTopicPerm(Permission.
+                
parsePermFromString(updateAccountMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM).toString()));
+        }
+        if 
(updateAccountMap.containsKey(AclConstants.CONFIG_DEFAULT_GROUP_PERM)) {
+            updatePlainAccessResource.setDefaultGroupPerm(Permission.
+                
parsePermFromString(updateAccountMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM).toString()));
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_RESOURCE_PERMS)) {
+            List<LinkedHashMap<String, Object>> resourceAndPermList = 
(List<LinkedHashMap<String, 
Object>>)updateAccountMap.get(AclConstants.CONFIG_RESOURCE_PERMS);
+            for (LinkedHashMap resourceAndPerm : resourceAndPermList) {
+                String resource = resourceAndPerm.get("resource").toString();
+                String namespace = resourceAndPerm.get("namespace").toString();
+                String type = resourceAndPerm.get("type").toString();
+                String perm = resourceAndPerm.get("perm").toString();
+                if (namespace == null || namespace.isEmpty()) {
+                    if (type.equals(ResourceType.GROUP.toString())) {
+                        
updatePlainAccessResource.addResourceAndPerm(MixAll.getRetryTopic(resource), 
Permission.parsePermFromString(perm));
+                    } else {
+                        updatePlainAccessResource.addResourceAndPerm(resource, 
Permission.parsePermFromString(perm));
+                    }
+                } else {
+                    if (type.equals(ResourceType.GROUP.toString())) {
+                        updatePlainAccessResource.addResourceAndPerm(
+                            MixAll.RETRY_GROUP_TOPIC_PREFIX + namespace + 
NAMESPACE_SEPARATOR + resource, Permission.parsePermFromString(perm));
+                    } else {
+                        updatePlainAccessResource.addResourceAndPerm(namespace 
+ resource, Permission.parsePermFromString(perm));
+                    }
+                }
+
+            }
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_NAMESPACE_PERMS)) 
{
+            List<LinkedHashMap<String, String>> namespaceAndPermList = 
(List<LinkedHashMap<String, 
String>>)updateAccountMap.get(AclConstants.CONFIG_NAMESPACE_PERMS);
+            Map<String, Map<String, Byte>> namespacePermMap = new HashMap<>();
+            for (LinkedHashMap<String, String> namespaceAndPerm : 
namespaceAndPermList) {
+                String namespace = namespaceAndPerm.get("namespace");
+                String topicPerm = namespaceAndPerm.get("topicPerm");
+                String groupPerm = namespaceAndPerm.get("groupPerm");
+                Map<String, Byte> permMap = new HashMap<>(2);
+                if (topicPerm != null && !topicPerm.isEmpty()) {
+                    permMap.put(AclConstants.CONFIG_TOPIC_PERM, 
Permission.parsePermFromString(topicPerm));
+                }
+                if (groupPerm != null && !groupPerm.isEmpty()) {
+                    permMap.put(AclConstants.CONFIG_GROUP_PERM, 
Permission.parsePermFromString(groupPerm));
+                }
+                namespacePermMap.put(namespace, permMap);
+            }
+            updatePlainAccessResource.setNamespacePermMap(namespacePermMap);
+        }
+        return updatePlainAccessResource;
+    }
+
+    public boolean updateAclResourcePerms(String accesskey, ResourceAndPerm 
resourceAndPerm, OperationType operationType) {
+        if (accesskey == null || resourceAndPerm == null || operationType == 
null) {
+            log.error("Parameter: accesskey or resourceAndPerm or 
operationType is null, Please check your parameter");
+            throw new AclException("The parameter of updateAclResourcePerms 
command: accesskey or resourceAndPerm or operationType is null, Please check 
your parameter");
+        }
+        if (operationType == OperationType.ADD || operationType == 
OperationType.UPDATE) {
+            checkResourceAndPerm(resourceAndPerm);
+        }
+
+        if (accessKeyTable.containsKey(accesskey)) {
+            Map<String, Object> updateAccountMap = null;
+            String aclFileName = accessKeyTable.get(accesskey);
+            Map<String, Object> aclAccessConfigMap = 
AclUtils.getYamlDataObject(aclFileName, Map.class);
+            List<Map<String, Object>> accounts = (List<Map<String, Object>>) 
aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+            if (null != accounts) {
+                for (Map<String, Object> account : accounts) {
+                    if 
(account.get(AclConstants.CONFIG_ACCESS_KEY).equals(accesskey)) {
+                        // Update acl access config elements
+                        accounts.remove(account);
+                        updateAccountMap = updateAclAccessConfigMap(account, 
resourceAndPerm, operationType);
+                        accounts.add(updateAccountMap);
+                        aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, 
accounts);
+                        break;
+                    }
+                }
+            } else {
+                // Maybe deleted in file, add it back
+                accounts = new LinkedList<>();
+                updateAccountMap = updateAclAccessConfigMap(null, 
resourceAndPerm, operationType);
+                updateAccountMap.put(AclConstants.CONFIG_ACCESS_KEY, 
accesskey);
+                accounts.add(updateAccountMap);
+                aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            }
+            Map<String, PlainAccessResource> accountMap = 
aclPlainAccessResourceMap.get(aclFileName);
+            PlainAccessResource updatePlainAccessResource = 
updatePlainAccessResource(updateAccountMap);
+            if (accountMap == null) {
+                accountMap = new HashMap<String, PlainAccessResource>(1);
+                accountMap.put(accesskey, updatePlainAccessResource);
+            } else if (accountMap.size() == 0) {
+                accountMap.put(accesskey, updatePlainAccessResource);
+            } else {
+                for (Map.Entry<String, PlainAccessResource> entry : 
accountMap.entrySet()) {
+                    if (entry.getValue().getAccessKey().equals(accesskey)) {
+                        accountMap.put(entry.getKey(), 
updatePlainAccessResource);
+                        break;
+                    }
+                }
+            }
+            aclPlainAccessResourceMap.put(aclFileName, accountMap);
+            return AclUtils.writeDataObject(aclFileName, 
updateAclConfigFileVersion(aclFileName, aclAccessConfigMap));
+        } else {
+            throw new AclException("The accesskey for update acl resource 
perms is not exist in the acl config.");
+        }
+
+    }
+
+    public Map<String, Object> updateAclAccessConfigMap(Map<String, Object> 
existedAccountMap,
+        ResourceAndPerm resourceAndPerm, OperationType operationType) {
+        Map<String, Object> newAccountsMap = null;
+        if (existedAccountMap == null) {
+            newAccountsMap = new LinkedHashMap<String, Object>();
+        } else {
+            newAccountsMap = existedAccountMap;
+        }
+
+        List<LinkedHashMap> resourceAndPerms = 
(List<LinkedHashMap>)newAccountsMap.get(AclConstants.CONFIG_RESOURCE_PERMS);
+        if (resourceAndPerms == null) {
+            resourceAndPerms = new ArrayList<>();
+        }
+
+        if (operationType == OperationType.ADD) {
+            LinkedHashMap<String, Object> addResourceAndPerm = new 
LinkedHashMap<String, Object>(4);
+            addResourceAndPerm.put("resource", resourceAndPerm.getResource());
+            addResourceAndPerm.put("type", resourceAndPerm.getType());
+            addResourceAndPerm.put("namespace", 
resourceAndPerm.getNamespace());
+            addResourceAndPerm.put("perm", resourceAndPerm.getPerm());
+            resourceAndPerms.add(addResourceAndPerm);
+        } else {
+            Iterator iterator = resourceAndPerms.iterator();
+            while (iterator.hasNext()) {
+                LinkedHashMap<String, Object> curResourceAndPerm = 
(LinkedHashMap)iterator.next();
+                if 
(curResourceAndPerm.get("resource").toString().equals(resourceAndPerm.getResource())
 && 
curResourceAndPerm.get("type").toString().equals(resourceAndPerm.getType().toString()))
 {

Review Comment:
   Is `toString()` necessary here?



##########
common/src/main/java/org/apache/rocketmq/common/PlainAccessConfig.java:
##########
@@ -110,6 +132,8 @@ public String toString() {
             ", defaultGroupPerm='" + defaultGroupPerm + '\'' +
             ", topicPerms=" + topicPerms +
             ", groupPerms=" + groupPerms +
+            ", resourcePerms=" + resourcePerms +
+            ", namespacePerms=" + namespacePerms +

Review Comment:
   Implement `toString()` of `ResourceAndPerm` and `NamespaceAndPerm`.



##########
acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java:
##########
@@ -393,6 +399,401 @@ public boolean updateAccessConfig(PlainAccessConfig 
plainAccessConfig) {
         }
     }
 
+    public boolean updateAclAccount(PlainAccessConfig plainAccessConfig) {
+        if (plainAccessConfig == null) {
+            log.error("Parameter value plainAccessConfig is null,Please check 
your parameter");
+            throw new AclException("Parameter value plainAccessConfig is null, 
Please check your parameter");
+        }
+        checkPlainAccessConfig(plainAccessConfig);
+
+        //Permission.checkResourcePerms(plainAccessConfig.getTopicPerms());
+        //Permission.checkResourcePerms(plainAccessConfig.getGroupPerms());
+        if (accessKeyTable.containsKey(plainAccessConfig.getAccessKey())) {
+            Map<String, Object> updateAccountMap = null;
+            String aclFileName = 
accessKeyTable.get(plainAccessConfig.getAccessKey());
+            Map<String, Object> aclAccessConfigMap = 
AclUtils.getYamlDataObject(aclFileName, Map.class);
+            List<Map<String, Object>> accounts = (List<Map<String, Object>>) 
aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+            if (null != accounts) {
+                for (Map<String, Object> account : accounts) {
+                    if 
(account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey()))
 {
+                        // Update acl access config elements
+                        accounts.remove(account);
+                        updateAccountMap = createAclAccessConfigMap(account, 
plainAccessConfig);
+                        accounts.add(updateAccountMap);
+                        aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, 
accounts);
+                        break;
+                    }
+                }
+            } else {
+                // Maybe deleted in file, add it back
+                accounts = new LinkedList<>();
+                updateAccountMap = createAclAccessConfigMap(null, 
plainAccessConfig);
+                accounts.add(updateAccountMap);
+                aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            }
+            Map<String, PlainAccessResource> accountMap = 
aclPlainAccessResourceMap.get(aclFileName);
+            PlainAccessResource updatePlainAccessResource = 
updatePlainAccessResource(updateAccountMap);
+            if (accountMap == null) {
+                accountMap = new HashMap<String, PlainAccessResource>(1);
+                accountMap.put(plainAccessConfig.getAccessKey(), 
updatePlainAccessResource);
+            } else if (accountMap.size() == 0) {
+                accountMap.put(plainAccessConfig.getAccessKey(), 
updatePlainAccessResource);
+            } else {
+                for (Map.Entry<String, PlainAccessResource> entry : 
accountMap.entrySet()) {
+                    if 
(entry.getValue().getAccessKey().equals(plainAccessConfig.getAccessKey())) {
+                        accountMap.put(entry.getKey(), 
updatePlainAccessResource);
+                        break;
+                    }
+                }
+            }
+            aclPlainAccessResourceMap.put(aclFileName, accountMap);
+            return AclUtils.writeDataObject(aclFileName, 
updateAclConfigFileVersion(aclFileName, aclAccessConfigMap));
+        } else {
+            String fileName = MixAll.dealFilePath(defaultAclFile);
+            //Create acl access config elements on the default acl file
+            if (aclPlainAccessResourceMap.get(defaultAclFile) == null || 
aclPlainAccessResourceMap.get(defaultAclFile).size() == 0) {
+                try {
+                    File defaultAclFile = new File(fileName);
+                    if (!defaultAclFile.exists()) {
+                        defaultAclFile.createNewFile();
+                    }
+                } catch (IOException e) {
+                    log.warn("create default acl file has exception when 
update accessConfig. ", e);
+                }
+            }
+            Map<String, Object> aclAccessConfigMap = 
AclUtils.getYamlDataObject(defaultAclFile, Map.class);
+            if (aclAccessConfigMap == null) {
+                aclAccessConfigMap = new HashMap<>();
+                aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, new 
ArrayList<>());
+            }
+            List<Map<String, Object>> accounts = (List<Map<String, Object>>) 
aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+            // When no accounts defined
+            if (null == accounts) {
+                accounts = new ArrayList<>();
+            }
+            accounts.add(createAclAccessConfigMap(null, plainAccessConfig));
+            aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            accessKeyTable.put(plainAccessConfig.getAccessKey(), fileName);
+            if (aclPlainAccessResourceMap.get(fileName) == null) {
+                Map<String, PlainAccessResource> plainAccessResourceMap = new 
HashMap<>(1);
+                plainAccessResourceMap.put(plainAccessConfig.getAccessKey(), 
buildPlainAccessResource(plainAccessConfig));
+                aclPlainAccessResourceMap.put(fileName, 
plainAccessResourceMap);
+            } else {
+                Map<String, PlainAccessResource> plainAccessResourceMap = 
aclPlainAccessResourceMap.get(fileName);
+                plainAccessResourceMap.put(plainAccessConfig.getAccessKey(), 
buildPlainAccessResource(plainAccessConfig));
+                aclPlainAccessResourceMap.put(fileName, 
plainAccessResourceMap);
+            }
+            return AclUtils.writeDataObject(defaultAclFile, 
updateAclConfigFileVersion(defaultAclFile, aclAccessConfigMap));
+        }
+    }
+
+    public PlainAccessResource updatePlainAccessResource(Map<String, Object> 
updateAccountMap) {

Review Comment:
   In this method, `PlainAccessResource` is **created**, not **updated**. 
The method signature should be more descriptive.
   How about modifying 
`org.apache.rocketmq.acl.plain.PlainPermissionManager#buildPlainAccessResource` 
to support new config format?



##########
acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java:
##########
@@ -393,6 +399,401 @@ public boolean updateAccessConfig(PlainAccessConfig 
plainAccessConfig) {
         }
     }
 
+    public boolean updateAclAccount(PlainAccessConfig plainAccessConfig) {
+        if (plainAccessConfig == null) {
+            log.error("Parameter value plainAccessConfig is null,Please check 
your parameter");
+            throw new AclException("Parameter value plainAccessConfig is null, 
Please check your parameter");
+        }
+        checkPlainAccessConfig(plainAccessConfig);
+
+        //Permission.checkResourcePerms(plainAccessConfig.getTopicPerms());
+        //Permission.checkResourcePerms(plainAccessConfig.getGroupPerms());
+        if (accessKeyTable.containsKey(plainAccessConfig.getAccessKey())) {
+            Map<String, Object> updateAccountMap = null;
+            String aclFileName = 
accessKeyTable.get(plainAccessConfig.getAccessKey());
+            Map<String, Object> aclAccessConfigMap = 
AclUtils.getYamlDataObject(aclFileName, Map.class);
+            List<Map<String, Object>> accounts = (List<Map<String, Object>>) 
aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+            if (null != accounts) {
+                for (Map<String, Object> account : accounts) {
+                    if 
(account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey()))
 {
+                        // Update acl access config elements
+                        accounts.remove(account);
+                        updateAccountMap = createAclAccessConfigMap(account, 
plainAccessConfig);
+                        accounts.add(updateAccountMap);
+                        aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, 
accounts);
+                        break;
+                    }
+                }
+            } else {
+                // Maybe deleted in file, add it back
+                accounts = new LinkedList<>();
+                updateAccountMap = createAclAccessConfigMap(null, 
plainAccessConfig);
+                accounts.add(updateAccountMap);
+                aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            }
+            Map<String, PlainAccessResource> accountMap = 
aclPlainAccessResourceMap.get(aclFileName);
+            PlainAccessResource updatePlainAccessResource = 
updatePlainAccessResource(updateAccountMap);
+            if (accountMap == null) {
+                accountMap = new HashMap<String, PlainAccessResource>(1);
+                accountMap.put(plainAccessConfig.getAccessKey(), 
updatePlainAccessResource);
+            } else if (accountMap.size() == 0) {
+                accountMap.put(plainAccessConfig.getAccessKey(), 
updatePlainAccessResource);
+            } else {
+                for (Map.Entry<String, PlainAccessResource> entry : 
accountMap.entrySet()) {
+                    if 
(entry.getValue().getAccessKey().equals(plainAccessConfig.getAccessKey())) {
+                        accountMap.put(entry.getKey(), 
updatePlainAccessResource);
+                        break;
+                    }
+                }
+            }
+            aclPlainAccessResourceMap.put(aclFileName, accountMap);
+            return AclUtils.writeDataObject(aclFileName, 
updateAclConfigFileVersion(aclFileName, aclAccessConfigMap));
+        } else {
+            String fileName = MixAll.dealFilePath(defaultAclFile);
+            //Create acl access config elements on the default acl file
+            if (aclPlainAccessResourceMap.get(defaultAclFile) == null || 
aclPlainAccessResourceMap.get(defaultAclFile).size() == 0) {
+                try {
+                    File defaultAclFile = new File(fileName);
+                    if (!defaultAclFile.exists()) {
+                        defaultAclFile.createNewFile();
+                    }
+                } catch (IOException e) {
+                    log.warn("create default acl file has exception when 
update accessConfig. ", e);
+                }
+            }
+            Map<String, Object> aclAccessConfigMap = 
AclUtils.getYamlDataObject(defaultAclFile, Map.class);
+            if (aclAccessConfigMap == null) {
+                aclAccessConfigMap = new HashMap<>();
+                aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, new 
ArrayList<>());
+            }
+            List<Map<String, Object>> accounts = (List<Map<String, Object>>) 
aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+            // When no accounts defined
+            if (null == accounts) {
+                accounts = new ArrayList<>();
+            }
+            accounts.add(createAclAccessConfigMap(null, plainAccessConfig));
+            aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            accessKeyTable.put(plainAccessConfig.getAccessKey(), fileName);
+            if (aclPlainAccessResourceMap.get(fileName) == null) {
+                Map<String, PlainAccessResource> plainAccessResourceMap = new 
HashMap<>(1);
+                plainAccessResourceMap.put(plainAccessConfig.getAccessKey(), 
buildPlainAccessResource(plainAccessConfig));
+                aclPlainAccessResourceMap.put(fileName, 
plainAccessResourceMap);
+            } else {
+                Map<String, PlainAccessResource> plainAccessResourceMap = 
aclPlainAccessResourceMap.get(fileName);
+                plainAccessResourceMap.put(plainAccessConfig.getAccessKey(), 
buildPlainAccessResource(plainAccessConfig));
+                aclPlainAccessResourceMap.put(fileName, 
plainAccessResourceMap);
+            }
+            return AclUtils.writeDataObject(defaultAclFile, 
updateAclConfigFileVersion(defaultAclFile, aclAccessConfigMap));
+        }
+    }
+
+    public PlainAccessResource updatePlainAccessResource(Map<String, Object> 
updateAccountMap) {
+        PlainAccessResource updatePlainAccessResource = new 
PlainAccessResource();
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_ACCESS_KEY)) {
+            
updatePlainAccessResource.setAccessKey(updateAccountMap.get(AclConstants.CONFIG_ACCESS_KEY).toString());
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_SECRET_KEY)) {
+            
updatePlainAccessResource.setSecretKey(updateAccountMap.get(AclConstants.CONFIG_SECRET_KEY).toString());
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_WHITE_ADDR)) {
+            
updatePlainAccessResource.setRemoteAddressStrategy(remoteAddressStrategyFactory.
+                
getRemoteAddressStrategy(updateAccountMap.get(AclConstants.CONFIG_WHITE_ADDR).toString()));
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_ADMIN_ROLE)) {
+            
updatePlainAccessResource.setAdmin(Boolean.parseBoolean(updateAccountMap.get(AclConstants.CONFIG_ADMIN_ROLE).toString()));
+        }
+        if 
(updateAccountMap.containsKey(AclConstants.CONFIG_DEFAULT_TOPIC_PERM)) {
+            updatePlainAccessResource.setDefaultTopicPerm(Permission.
+                
parsePermFromString(updateAccountMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM).toString()));
+        }
+        if 
(updateAccountMap.containsKey(AclConstants.CONFIG_DEFAULT_GROUP_PERM)) {
+            updatePlainAccessResource.setDefaultGroupPerm(Permission.
+                
parsePermFromString(updateAccountMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM).toString()));
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_RESOURCE_PERMS)) {
+            List<LinkedHashMap<String, Object>> resourceAndPermList = 
(List<LinkedHashMap<String, 
Object>>)updateAccountMap.get(AclConstants.CONFIG_RESOURCE_PERMS);
+            for (LinkedHashMap resourceAndPerm : resourceAndPermList) {
+                String resource = resourceAndPerm.get("resource").toString();
+                String namespace = resourceAndPerm.get("namespace").toString();
+                String type = resourceAndPerm.get("type").toString();
+                String perm = resourceAndPerm.get("perm").toString();
+                if (namespace == null || namespace.isEmpty()) {
+                    if (type.equals(ResourceType.GROUP.toString())) {
+                        
updatePlainAccessResource.addResourceAndPerm(MixAll.getRetryTopic(resource), 
Permission.parsePermFromString(perm));
+                    } else {
+                        updatePlainAccessResource.addResourceAndPerm(resource, 
Permission.parsePermFromString(perm));
+                    }
+                } else {
+                    if (type.equals(ResourceType.GROUP.toString())) {
+                        updatePlainAccessResource.addResourceAndPerm(
+                            MixAll.RETRY_GROUP_TOPIC_PREFIX + namespace + 
NAMESPACE_SEPARATOR + resource, Permission.parsePermFromString(perm));

Review Comment:
   Use the methods provided in `NamespaceUtil` to construct the resource. 
Do not import the concrete implementation of the namespace format into the ACL module.



##########
acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java:
##########
@@ -393,6 +399,401 @@ public boolean updateAccessConfig(PlainAccessConfig 
plainAccessConfig) {
         }
     }
 
+    public boolean updateAclAccount(PlainAccessConfig plainAccessConfig) {
+        if (plainAccessConfig == null) {
+            log.error("Parameter value plainAccessConfig is null,Please check 
your parameter");
+            throw new AclException("Parameter value plainAccessConfig is null, 
Please check your parameter");
+        }
+        checkPlainAccessConfig(plainAccessConfig);
+
+        //Permission.checkResourcePerms(plainAccessConfig.getTopicPerms());
+        //Permission.checkResourcePerms(plainAccessConfig.getGroupPerms());
+        if (accessKeyTable.containsKey(plainAccessConfig.getAccessKey())) {
+            Map<String, Object> updateAccountMap = null;
+            String aclFileName = 
accessKeyTable.get(plainAccessConfig.getAccessKey());
+            Map<String, Object> aclAccessConfigMap = 
AclUtils.getYamlDataObject(aclFileName, Map.class);
+            List<Map<String, Object>> accounts = (List<Map<String, Object>>) 
aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+            if (null != accounts) {
+                for (Map<String, Object> account : accounts) {
+                    if 
(account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey()))
 {
+                        // Update acl access config elements
+                        accounts.remove(account);
+                        updateAccountMap = createAclAccessConfigMap(account, 
plainAccessConfig);
+                        accounts.add(updateAccountMap);
+                        aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, 
accounts);
+                        break;
+                    }
+                }
+            } else {
+                // Maybe deleted in file, add it back
+                accounts = new LinkedList<>();
+                updateAccountMap = createAclAccessConfigMap(null, 
plainAccessConfig);
+                accounts.add(updateAccountMap);
+                aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            }
+            Map<String, PlainAccessResource> accountMap = 
aclPlainAccessResourceMap.get(aclFileName);
+            PlainAccessResource updatePlainAccessResource = 
updatePlainAccessResource(updateAccountMap);
+            if (accountMap == null) {
+                accountMap = new HashMap<String, PlainAccessResource>(1);
+                accountMap.put(plainAccessConfig.getAccessKey(), 
updatePlainAccessResource);
+            } else if (accountMap.size() == 0) {
+                accountMap.put(plainAccessConfig.getAccessKey(), 
updatePlainAccessResource);
+            } else {
+                for (Map.Entry<String, PlainAccessResource> entry : 
accountMap.entrySet()) {
+                    if 
(entry.getValue().getAccessKey().equals(plainAccessConfig.getAccessKey())) {
+                        accountMap.put(entry.getKey(), 
updatePlainAccessResource);
+                        break;
+                    }
+                }
+            }
+            aclPlainAccessResourceMap.put(aclFileName, accountMap);
+            return AclUtils.writeDataObject(aclFileName, 
updateAclConfigFileVersion(aclFileName, aclAccessConfigMap));
+        } else {
+            String fileName = MixAll.dealFilePath(defaultAclFile);
+            //Create acl access config elements on the default acl file
+            if (aclPlainAccessResourceMap.get(defaultAclFile) == null || 
aclPlainAccessResourceMap.get(defaultAclFile).size() == 0) {
+                try {
+                    File defaultAclFile = new File(fileName);
+                    if (!defaultAclFile.exists()) {
+                        defaultAclFile.createNewFile();
+                    }
+                } catch (IOException e) {
+                    log.warn("create default acl file has exception when 
update accessConfig. ", e);
+                }
+            }
+            Map<String, Object> aclAccessConfigMap = 
AclUtils.getYamlDataObject(defaultAclFile, Map.class);
+            if (aclAccessConfigMap == null) {
+                aclAccessConfigMap = new HashMap<>();
+                aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, new 
ArrayList<>());
+            }
+            List<Map<String, Object>> accounts = (List<Map<String, Object>>) 
aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+            // When no accounts defined
+            if (null == accounts) {
+                accounts = new ArrayList<>();
+            }
+            accounts.add(createAclAccessConfigMap(null, plainAccessConfig));
+            aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            accessKeyTable.put(plainAccessConfig.getAccessKey(), fileName);
+            if (aclPlainAccessResourceMap.get(fileName) == null) {
+                Map<String, PlainAccessResource> plainAccessResourceMap = new 
HashMap<>(1);
+                plainAccessResourceMap.put(plainAccessConfig.getAccessKey(), 
buildPlainAccessResource(plainAccessConfig));
+                aclPlainAccessResourceMap.put(fileName, 
plainAccessResourceMap);
+            } else {
+                Map<String, PlainAccessResource> plainAccessResourceMap = 
aclPlainAccessResourceMap.get(fileName);
+                plainAccessResourceMap.put(plainAccessConfig.getAccessKey(), 
buildPlainAccessResource(plainAccessConfig));
+                aclPlainAccessResourceMap.put(fileName, 
plainAccessResourceMap);
+            }
+            return AclUtils.writeDataObject(defaultAclFile, 
updateAclConfigFileVersion(defaultAclFile, aclAccessConfigMap));
+        }
+    }
+
+    public PlainAccessResource updatePlainAccessResource(Map<String, Object> 
updateAccountMap) {
+        PlainAccessResource updatePlainAccessResource = new 
PlainAccessResource();
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_ACCESS_KEY)) {
+            
updatePlainAccessResource.setAccessKey(updateAccountMap.get(AclConstants.CONFIG_ACCESS_KEY).toString());
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_SECRET_KEY)) {
+            
updatePlainAccessResource.setSecretKey(updateAccountMap.get(AclConstants.CONFIG_SECRET_KEY).toString());
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_WHITE_ADDR)) {
+            
updatePlainAccessResource.setRemoteAddressStrategy(remoteAddressStrategyFactory.
+                
getRemoteAddressStrategy(updateAccountMap.get(AclConstants.CONFIG_WHITE_ADDR).toString()));
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_ADMIN_ROLE)) {
+            
updatePlainAccessResource.setAdmin(Boolean.parseBoolean(updateAccountMap.get(AclConstants.CONFIG_ADMIN_ROLE).toString()));
+        }
+        if 
(updateAccountMap.containsKey(AclConstants.CONFIG_DEFAULT_TOPIC_PERM)) {
+            updatePlainAccessResource.setDefaultTopicPerm(Permission.
+                
parsePermFromString(updateAccountMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM).toString()));
+        }
+        if 
(updateAccountMap.containsKey(AclConstants.CONFIG_DEFAULT_GROUP_PERM)) {
+            updatePlainAccessResource.setDefaultGroupPerm(Permission.
+                
parsePermFromString(updateAccountMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM).toString()));
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_RESOURCE_PERMS)) {
+            List<LinkedHashMap<String, Object>> resourceAndPermList = 
(List<LinkedHashMap<String, 
Object>>)updateAccountMap.get(AclConstants.CONFIG_RESOURCE_PERMS);
+            for (LinkedHashMap resourceAndPerm : resourceAndPermList) {
+                String resource = resourceAndPerm.get("resource").toString();
+                String namespace = resourceAndPerm.get("namespace").toString();
+                String type = resourceAndPerm.get("type").toString();
+                String perm = resourceAndPerm.get("perm").toString();
+                if (namespace == null || namespace.isEmpty()) {
+                    if (type.equals(ResourceType.GROUP.toString())) {
+                        
updatePlainAccessResource.addResourceAndPerm(MixAll.getRetryTopic(resource), 
Permission.parsePermFromString(perm));
+                    } else {
+                        updatePlainAccessResource.addResourceAndPerm(resource, 
Permission.parsePermFromString(perm));
+                    }
+                } else {
+                    if (type.equals(ResourceType.GROUP.toString())) {
+                        updatePlainAccessResource.addResourceAndPerm(
+                            MixAll.RETRY_GROUP_TOPIC_PREFIX + namespace + 
NAMESPACE_SEPARATOR + resource, Permission.parsePermFromString(perm));
+                    } else {
+                        updatePlainAccessResource.addResourceAndPerm(namespace 
+ resource, Permission.parsePermFromString(perm));
+                    }
+                }
+
+            }
+        }
+        if (updateAccountMap.containsKey(AclConstants.CONFIG_NAMESPACE_PERMS)) 
{
+            List<LinkedHashMap<String, String>> namespaceAndPermList = 
(List<LinkedHashMap<String, 
String>>)updateAccountMap.get(AclConstants.CONFIG_NAMESPACE_PERMS);
+            Map<String, Map<String, Byte>> namespacePermMap = new HashMap<>();
+            for (LinkedHashMap<String, String> namespaceAndPerm : 
namespaceAndPermList) {
+                String namespace = namespaceAndPerm.get("namespace");
+                String topicPerm = namespaceAndPerm.get("topicPerm");
+                String groupPerm = namespaceAndPerm.get("groupPerm");
+                Map<String, Byte> permMap = new HashMap<>(2);
+                if (topicPerm != null && !topicPerm.isEmpty()) {
+                    permMap.put(AclConstants.CONFIG_TOPIC_PERM, 
Permission.parsePermFromString(topicPerm));
+                }
+                if (groupPerm != null && !groupPerm.isEmpty()) {
+                    permMap.put(AclConstants.CONFIG_GROUP_PERM, 
Permission.parsePermFromString(groupPerm));
+                }
+                namespacePermMap.put(namespace, permMap);
+            }
+            updatePlainAccessResource.setNamespacePermMap(namespacePermMap);
+        }
+        return updatePlainAccessResource;
+    }
+
+    public boolean updateAclResourcePerms(String accesskey, ResourceAndPerm 
resourceAndPerm, OperationType operationType) {
+        if (accesskey == null || resourceAndPerm == null || operationType == 
null) {
+            log.error("Parameter: accesskey or resourceAndPerm or 
operationType is null, Please check your parameter");
+            throw new AclException("The parameter of updateAclResourcePerms 
command: accesskey or resourceAndPerm or operationType is null, Please check 
your parameter");
+        }
+        if (operationType == OperationType.ADD || operationType == 
OperationType.UPDATE) {
+            checkResourceAndPerm(resourceAndPerm);
+        }
+
+        if (accessKeyTable.containsKey(accesskey)) {
+            Map<String, Object> updateAccountMap = null;
+            String aclFileName = accessKeyTable.get(accesskey);
+            Map<String, Object> aclAccessConfigMap = 
AclUtils.getYamlDataObject(aclFileName, Map.class);
+            List<Map<String, Object>> accounts = (List<Map<String, Object>>) 
aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+            if (null != accounts) {
+                for (Map<String, Object> account : accounts) {
+                    if 
(account.get(AclConstants.CONFIG_ACCESS_KEY).equals(accesskey)) {
+                        // Update acl access config elements
+                        accounts.remove(account);
+                        updateAccountMap = updateAclAccessConfigMap(account, 
resourceAndPerm, operationType);
+                        accounts.add(updateAccountMap);
+                        aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, 
accounts);
+                        break;
+                    }
+                }
+            } else {
+                // Maybe deleted in file, add it back
+                accounts = new LinkedList<>();
+                updateAccountMap = updateAclAccessConfigMap(null, 
resourceAndPerm, operationType);
+                updateAccountMap.put(AclConstants.CONFIG_ACCESS_KEY, 
accesskey);
+                accounts.add(updateAccountMap);
+                aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            }
+            Map<String, PlainAccessResource> accountMap = 
aclPlainAccessResourceMap.get(aclFileName);
+            PlainAccessResource updatePlainAccessResource = 
updatePlainAccessResource(updateAccountMap);
+            if (accountMap == null) {
+                accountMap = new HashMap<String, PlainAccessResource>(1);
+                accountMap.put(accesskey, updatePlainAccessResource);
+            } else if (accountMap.size() == 0) {
+                accountMap.put(accesskey, updatePlainAccessResource);
+            } else {
+                for (Map.Entry<String, PlainAccessResource> entry : 
accountMap.entrySet()) {
+                    if (entry.getValue().getAccessKey().equals(accesskey)) {
+                        accountMap.put(entry.getKey(), 
updatePlainAccessResource);
+                        break;
+                    }
+                }
+            }
+            aclPlainAccessResourceMap.put(aclFileName, accountMap);
+            return AclUtils.writeDataObject(aclFileName, 
updateAclConfigFileVersion(aclFileName, aclAccessConfigMap));
+        } else {
+            throw new AclException("The accesskey for update acl resource 
perms is not exist in the acl config.");
+        }
+
+    }
+
+    public Map<String, Object> updateAclAccessConfigMap(Map<String, Object> 
existedAccountMap,
+        ResourceAndPerm resourceAndPerm, OperationType operationType) {
+        Map<String, Object> newAccountsMap = null;
+        if (existedAccountMap == null) {
+            newAccountsMap = new LinkedHashMap<String, Object>();
+        } else {
+            newAccountsMap = existedAccountMap;
+        }
+
+        List<LinkedHashMap> resourceAndPerms = 
(List<LinkedHashMap>)newAccountsMap.get(AclConstants.CONFIG_RESOURCE_PERMS);
+        if (resourceAndPerms == null) {
+            resourceAndPerms = new ArrayList<>();
+        }
+
+        if (operationType == OperationType.ADD) {
+            LinkedHashMap<String, Object> addResourceAndPerm = new 
LinkedHashMap<String, Object>(4);
+            addResourceAndPerm.put("resource", resourceAndPerm.getResource());
+            addResourceAndPerm.put("type", resourceAndPerm.getType());
+            addResourceAndPerm.put("namespace", 
resourceAndPerm.getNamespace());
+            addResourceAndPerm.put("perm", resourceAndPerm.getPerm());
+            resourceAndPerms.add(addResourceAndPerm);
+        } else {
+            Iterator iterator = resourceAndPerms.iterator();
+            while (iterator.hasNext()) {
+                LinkedHashMap<String, Object> curResourceAndPerm = 
(LinkedHashMap)iterator.next();
+                if 
(curResourceAndPerm.get("resource").toString().equals(resourceAndPerm.getResource())
 && 
curResourceAndPerm.get("type").toString().equals(resourceAndPerm.getType().toString()))
 {
+                    if (curResourceAndPerm.get("namespace") != null && 
resourceAndPerm.getNamespace() != null
+                        && !resourceAndPerm.getNamespace().isEmpty() && 
curResourceAndPerm.get("namespace").toString().equals(resourceAndPerm.getNamespace())
 ||
+                        curResourceAndPerm.get("namespace") == null && 
resourceAndPerm.getNamespace() == null) {
+                        iterator.remove();
+                        break;
+                    }
+                }
+            }
+            if (operationType == OperationType.UPDATE) {
+                LinkedHashMap addResourceAndPerm = new LinkedHashMap<String, 
Object>(4);
+                addResourceAndPerm.put("resource", 
resourceAndPerm.getResource());
+                addResourceAndPerm.put("type", resourceAndPerm.getType());
+                addResourceAndPerm.put("namespace", 
resourceAndPerm.getNamespace());
+                addResourceAndPerm.put("perm", resourceAndPerm.getPerm());
+                resourceAndPerms.add(addResourceAndPerm);
+            }
+        }
+
+        newAccountsMap.put(AclConstants.CONFIG_RESOURCE_PERMS, 
resourceAndPerms);
+
+        return newAccountsMap;
+    }
+
+    public boolean updateAclNamespacePerms(String accesskey, 
List<NamespaceAndPerm> namespaceAndPerms, OperationType operationType) {
+        if (accesskey == null || operationType == null || namespaceAndPerms == 
null) {
+            log.error("Parameter: accesskey or operationType is null, Please 
check your parameter");
+            throw new AclException("The parameter of updateAclResourcePerms 
command: accesskey or operationType is null, Please check your parameter");
+        }
+
+        if (accessKeyTable.containsKey(accesskey)) {
+            Map<String, Object> updateAccountMap = null;
+            String aclFileName = accessKeyTable.get(accesskey);
+            Map<String, Object> aclAccessConfigMap = 
AclUtils.getYamlDataObject(aclFileName, Map.class);
+            List<Map<String, Object>> accounts = (List<Map<String, Object>>) 
aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+            if (null != accounts) {
+                for (Map<String, Object> account : accounts) {
+                    if 
(account.get(AclConstants.CONFIG_ACCESS_KEY).equals(accesskey)) {
+                        // Update acl access config elements
+                        accounts.remove(account);
+                        updateAccountMap = updateAclAccessConfigMap(account, 
namespaceAndPerms, operationType);
+                        accounts.add(updateAccountMap);
+                        aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, 
accounts);
+                        break;
+                    }
+                }
+            } else {
+                // Maybe deleted in file, add it back
+                accounts = new LinkedList<>();
+                updateAccountMap = updateAclAccessConfigMap(null, 
namespaceAndPerms, operationType);
+                updateAccountMap.put(AclConstants.CONFIG_ACCESS_KEY, 
accesskey);
+                accounts.add(updateAccountMap);
+                aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            }
+            Map<String, PlainAccessResource> accountMap = 
aclPlainAccessResourceMap.get(aclFileName);
+            PlainAccessResource updatePlainAccessResource = 
updatePlainAccessResource(updateAccountMap);
+            if (accountMap == null) {
+                accountMap = new HashMap<String, PlainAccessResource>(1);
+                accountMap.put(accesskey, updatePlainAccessResource);
+            } else if (accountMap.size() == 0) {
+                accountMap.put(accesskey, updatePlainAccessResource);
+            } else {
+                for (Map.Entry<String, PlainAccessResource> entry : 
accountMap.entrySet()) {
+                    if (entry.getValue().getAccessKey().equals(accesskey)) {
+                        accountMap.put(entry.getKey(), 
updatePlainAccessResource);
+                        break;
+                    }
+                }
+            }
+            aclPlainAccessResourceMap.put(aclFileName, accountMap);
+            return AclUtils.writeDataObject(aclFileName, 
updateAclConfigFileVersion(aclFileName, aclAccessConfigMap));
+        } else {
+            throw new AclException("The accesskey for update acl namespace 
perms is not exist in the acl config.");
+        }
+
+
+    }
+
+    public Map<String, Object> updateAclAccessConfigMap(Map<String, Object> 
existedAccountMap,
+        List<NamespaceAndPerm> namespaceAndPerms, OperationType operationType) 
{
+        Map<String, Object> newAccountsMap = null;
+        if (existedAccountMap == null) {
+            newAccountsMap = new LinkedHashMap<String, Object>();
+        } else {
+            newAccountsMap = existedAccountMap;
+        }
+
+        List<LinkedHashMap<String, String>> namespaceAndPermList = 
(List<LinkedHashMap<String, 
String>>)newAccountsMap.get(AclConstants.CONFIG_NAMESPACE_PERMS);
+        if (namespaceAndPermList == null) {
+            namespaceAndPermList = new ArrayList<>();
+        }
+
+        if (operationType == OperationType.ADD) {
+            for (NamespaceAndPerm namespaceAndPerm : namespaceAndPerms) {
+                LinkedHashMap<String, String> linkedHashMap = new 
LinkedHashMap<>(3);
+                linkedHashMap.put("namespace", 
namespaceAndPerm.getNamespace());
+                linkedHashMap.put("topicPerm", 
namespaceAndPerm.getTopicPerm());
+                linkedHashMap.put("groupPerm", 
namespaceAndPerm.getGroupPerm());
+                namespaceAndPermList.add(linkedHashMap);
+            }
+            //namespaceAndPermList.addAll(namespaceAndPerms);
+            /*
+            for (NamespaceAndPerm namespaceAndPerm : namespaceAndPerms) {
+                namespaceAndPermList.add(namespaceAndPerm);
+            }
+             */
+        } else {
+            Iterator iterator = namespaceAndPermList.iterator();
+            while (iterator.hasNext()) {
+                LinkedHashMap<String, String> curNamespaceAndPerm = 
(LinkedHashMap<String, String>)iterator.next();
+                for (NamespaceAndPerm namespaceAndPerm : namespaceAndPerms) {
+                    if 
(curNamespaceAndPerm.get("namespace").equals(namespaceAndPerm.getNamespace())) {
+                        iterator.remove();
+                    }
+                }
+            }
+            if (operationType == OperationType.UPDATE) {
+                for (NamespaceAndPerm namespaceAndPerm : namespaceAndPerms) {
+                    LinkedHashMap<String, String> linkedHashMap = new 
LinkedHashMap<>(3);
+                    linkedHashMap.put("namespace", 
namespaceAndPerm.getNamespace());
+                    linkedHashMap.put("topicPerm", 
namespaceAndPerm.getTopicPerm());
+                    linkedHashMap.put("groupPerm", 
namespaceAndPerm.getGroupPerm());
+                    namespaceAndPermList.add(linkedHashMap);
+                }
+            }
+        }
+
+        newAccountsMap.put(AclConstants.CONFIG_NAMESPACE_PERMS, 
namespaceAndPermList);
+
+        return newAccountsMap;
+    }
+
+    public PlainAccessConfig getAccesskeyConfg(String accesskey) {

Review Comment:
   `getConfigByAccessKey` would be more accurate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to