xunliu commented on code in PR #6100:
URL: https://github.com/apache/gravitino/pull/6100#discussion_r1904918703


##########
core/src/main/java/org/apache/gravitino/authorization/AuthorizationUtils.java:
##########
@@ -364,4 +376,123 @@ private static void checkCatalogType(
           catalogIdent, catalog.type(), privilege);
     }
   }
+
+  public static List<String> getMetadataObjectLocation(
+      NameIdentifier ident, Entity.EntityType type) {
+    List<String> locations = new ArrayList<>();
+    MetadataObject metadataObject;
+    try {
+      metadataObject = NameIdentifierUtil.toMetadataObject(ident, type);
+    } catch (IllegalArgumentException e) {
+      LOG.warn("Illegal argument exception for metadata object %s type %s", 
ident, type, e);
+      return locations;
+    }
+
+    String metalake =
+        (type == Entity.EntityType.METALAKE ? ident.name() : 
ident.namespace().level(0));
+    try {
+      switch (metadataObject.type()) {
+        case METALAKE:

Review Comment:
   I will consider adding other locations in the next PR.
   Currently, we only need the catalog location.



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHadoopSQLPlugin.java:
##########
@@ -76,6 +85,297 @@ public Map<Privilege.Name, Set<AuthorizationPrivilege>> 
privilegesMappingRule()
         ImmutableSet.of(RangerHadoopSQLPrivilege.READ, 
RangerHadoopSQLPrivilege.SELECT));
   }
 
+  /**
+   * Find the managed policy for the ranger securable object.
+   *
+   * @param authzMetadataObject The ranger securable object to find the 
managed policy.
+   * @return The managed policy for the metadata object.
+   */
+  @Override
+  public RangerPolicy findManagedPolicy(AuthorizationMetadataObject 
authzMetadataObject)
+      throws AuthorizationPluginException {
+    List<String> nsMetadataObj = authzMetadataObject.names();
+    Map<String, String> preciseFilters = new HashMap<>();
+    for (int i = 0; i < nsMetadataObj.size() && i < 
policyResourceDefinesRule().size(); i++) {
+      preciseFilters.put(policyResourceDefinesRule().get(i), 
nsMetadataObj.get(i));
+    }
+    return preciseFindPolicy(authzMetadataObject, preciseFilters);
+  }
+
+  /** Wildcard search the Ranger policies in the different Ranger service. */
+  @Override
+  protected List<RangerPolicy> wildcardSearchPolies(
+      AuthorizationMetadataObject authzMetadataObject) {
+    List<String> resourceDefines = policyResourceDefinesRule();
+    Map<String, String> searchFilters = new HashMap<>();
+    searchFilters.put(SearchFilter.SERVICE_NAME, rangerServiceName);
+    for (int i = 0; i < authzMetadataObject.names().size() && i < 
resourceDefines.size(); i++) {
+      searchFilters.put(
+          SearchFilter.RESOURCE_PREFIX + resourceDefines.get(i),
+          authzMetadataObject.names().get(i));
+    }
+
+    try {
+      return rangerClient.findPolicies(searchFilters);
+    } catch (RangerServiceException e) {
+      throw new AuthorizationPluginException(e, "Failed to find the policies 
in the Ranger");
+    }
+  }
+
+  /**
+   * If rename the SCHEMA, Need to rename these the relevant policies, 
`{schema}`, `{schema}.*`,
+   * `{schema}.*.*` <br>
+   * If rename the TABLE, Need to rename these the relevant policies, 
`{schema}.*`, `{schema}.*.*`
+   * <br>
+   * If rename the COLUMN, Only need to rename `{schema}.*.*` <br>
+   */
+  @Override
+  protected void doRenameMetadataObject(
+      AuthorizationMetadataObject authzMetadataObject,
+      AuthorizationMetadataObject newAuthzMetadataObject) {
+    List<Map<String, String>> mappingOldAndNewMetadata = new ArrayList<>();
+    if (newAuthzMetadataObject.type().equals(SCHEMA)) {
+      // Rename the SCHEMA, Need to rename these the relevant policies, 
`{schema}`, `{schema}.*`,
+      // * `{schema}.*.*`
+      mappingOldAndNewMetadata =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(TABLE)) {
+      // Rename the TABLE, Need to rename these the relevant policies, 
`{schema}.*`, `{schema}.*.*`
+      mappingOldAndNewMetadata =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(COLUMN)) {
+      // Rename the COLUMN, Only need to rename `{schema}.*.*`
+      mappingOldAndNewMetadata =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(2), 
newAuthzMetadataObject.names().get(2)));
+    } else if (newAuthzMetadataObject.type().equals(PATH)) {
+      // do nothing when fileset is renamed
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported metadata object type: " + authzMetadataObject.type());
+    }
+
+    List<String> oldMetadataNames = new ArrayList<>();
+    List<String> newMetadataNames = new ArrayList<>();
+    for (int index = 0; index < mappingOldAndNewMetadata.size(); index++) {
+      
oldMetadataNames.add(mappingOldAndNewMetadata.get(index).keySet().stream().findFirst().get());
+      
newMetadataNames.add(mappingOldAndNewMetadata.get(index).values().stream().findFirst().get());
+
+      AuthorizationMetadataObject.Type type;
+      if (index == 0) {
+        type = RangerHadoopSQLMetadataObject.Type.SCHEMA;
+      } else if (index == 1) {
+        type = RangerHadoopSQLMetadataObject.Type.TABLE;
+      } else {
+        type = RangerHadoopSQLMetadataObject.Type.COLUMN;
+      }
+      AuthorizationMetadataObject oldHadoopSQLMetadataObject =
+          new RangerHadoopSQLMetadataObject(
+              AuthorizationMetadataObject.getParentFullName(oldMetadataNames),
+              AuthorizationMetadataObject.getLastName(oldMetadataNames),
+              type);
+      AuthorizationMetadataObject newHadoopSQLMetadataObject =
+          new RangerHadoopSQLMetadataObject(
+              AuthorizationMetadataObject.getParentFullName(newMetadataNames),
+              AuthorizationMetadataObject.getLastName(newMetadataNames),
+              type);
+      updatePolicyByMetadataObject(
+          type.metadataObjectType(), oldHadoopSQLMetadataObject, 
newHadoopSQLMetadataObject);
+    }
+  }
+
+  @Override
+  protected void updatePolicyByMetadataObject(
+      MetadataObject.Type operationType,
+      AuthorizationMetadataObject oldAuthzMetaObject,
+      AuthorizationMetadataObject newAuthzMetaObject) {
+    List<RangerPolicy> oldPolicies = wildcardSearchPolies(oldAuthzMetaObject);
+    List<RangerPolicy> existNewPolicies = 
wildcardSearchPolies(newAuthzMetaObject);
+    if (oldPolicies.isEmpty()) {
+      LOG.warn("Cannot find the Ranger policy for the metadata object({})!", 
oldAuthzMetaObject);
+      return;
+    }
+    if (!existNewPolicies.isEmpty()) {
+      LOG.warn("The Ranger policy for the metadata object({}) already 
exists!", newAuthzMetaObject);
+    }
+    Map<MetadataObject.Type, Integer> operationTypeIndex =
+        ImmutableMap.of(
+            MetadataObject.Type.SCHEMA, 0,
+            MetadataObject.Type.TABLE, 1,
+            MetadataObject.Type.COLUMN, 2);
+    oldPolicies.stream()
+        .forEach(
+            policy -> {
+              try {
+                String policyName = policy.getName();
+                int index = operationTypeIndex.get(operationType);
+
+                // Update the policy name is following Gravitino's spec
+                if (policy
+                    .getName()
+                    .equals(
+                        
AuthorizationSecurableObject.DOT_JOINER.join(oldAuthzMetaObject.names()))) {
+                  List<String> policyNames =
+                      Lists.newArrayList(
+                          
AuthorizationSecurableObject.DOT_SPLITTER.splitToList(policyName));
+                  Preconditions.checkArgument(
+                      policyNames.size() >= oldAuthzMetaObject.names().size(),
+                      String.format("The policy name(%s) is invalid!", 
policyName));
+                  if 
(policyNames.get(index).equals(RangerHelper.RESOURCE_ALL)) {
+                    // Doesn't need to rename the policy `*`
+                    return;
+                  }
+                  policyNames.set(index, 
newAuthzMetaObject.names().get(index));
+                  
policy.setName(AuthorizationSecurableObject.DOT_JOINER.join(policyNames));
+                }
+                // Update the policy resource name to new name
+                policy
+                    .getResources()
+                    .put(
+                        policyResourceDefinesRule().get(index),
+                        new RangerPolicy.RangerPolicyResource(
+                            newAuthzMetaObject.names().get(index)));
+
+                boolean alreadyExist =
+                    existNewPolicies.stream()
+                        .anyMatch(
+                            existNewPolicy ->
+                                
existNewPolicy.getName().equals(policy.getName())
+                                    || 
existNewPolicy.getResources().equals(policy.getResources()));
+                if (alreadyExist) {
+                  LOG.warn(
+                      "The Ranger policy for the metadata object({}) already 
exists!",
+                      newAuthzMetaObject);
+                  return;
+                }
+
+                // Update the policy
+                rangerClient.updatePolicy(policy.getId(), policy);
+              } catch (RangerServiceException e) {
+                LOG.error("Failed to rename the policy {}!", policy);
+                throw new RuntimeException(e);
+              }
+            });
+  }
+
+  /**
+   * If remove the SCHEMA, need to remove these the relevant policies, 
`{schema}`, `{schema}.*`,
+   * `{schema}.*.*` <br>
+   * If remove the TABLE, need to remove these the relevant policies, 
`{schema}.*`, `{schema}.*.*`
+   * <br>
+   * If remove the COLUMN, Only need to remove `{schema}.*.*` <br>
+   */
+  @Override
+  protected void doRemoveMetadataObject(AuthorizationMetadataObject 
authzMetadataObject) {
+    AuthorizationMetadataObject.Type type = authzMetadataObject.type();
+    if (type.equals(SCHEMA)) {
+      doRemoveSchemaMetadataObject(authzMetadataObject);
+    } else if (type.equals(TABLE)) {
+      doRemoveTableMetadataObject(authzMetadataObject);
+    } else if (type.equals(COLUMN)) {
+      removePolicyByMetadataObject(authzMetadataObject);
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported metadata object type: " + authzMetadataObject.type());
+    }
+  }
+
+  /**
+   * Remove the SCHEMA, Need to remove these the relevant policies, 
`{schema}`, `{schema}.*`,
+   * `{schema}.*.*` permissions.
+   */
+  private void doRemoveSchemaMetadataObject(AuthorizationMetadataObject 
authMetadataObject) {
+    Preconditions.checkArgument(
+        authMetadataObject.type() == SCHEMA, "The metadata object type must be 
SCHEMA");
+    Preconditions.checkArgument(
+        authMetadataObject.names().size() == 1, "The metadata object names 
must be 1");
+    if (RangerHelper.RESOURCE_ALL.equals(authMetadataObject.name())) {
+      // Delete metalake or catalog policies in this Ranger service
+      try {
+        List<RangerPolicy> policies = 
rangerClient.getPoliciesInService(rangerServiceName);
+        policies.stream()
+            .filter(RangerHelper::hasGravitinoManagedPolicyItem)
+            .forEach(rangerHelper::removeAllGravitinoManagedPolicyItem);
+      } catch (RangerServiceException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      List<List<String>> loop =
+          ImmutableList.of(
+              ImmutableList.of(authMetadataObject.name())
+              /** SCHEMA permission */
+              ,
+              ImmutableList.of(authMetadataObject.name(), 
RangerHelper.RESOURCE_ALL)
+              /** TABLE permission */
+              ,
+              ImmutableList.of(
+                  authMetadataObject.name(), RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL)
+              /** COLUMN permission */
+              );
+
+      for (int index = 0; index < loop.size(); index++) {
+        AuthorizationMetadataObject.Type type =
+            (index == 0
+                ? RangerHadoopSQLMetadataObject.Type.SCHEMA
+                : (index == 1
+                    ? RangerHadoopSQLMetadataObject.Type.TABLE
+                    : RangerHadoopSQLMetadataObject.Type.COLUMN));
+        AuthorizationMetadataObject authzMetadataObject1 =
+            new RangerHadoopSQLMetadataObject(
+                AuthorizationMetadataObject.getParentFullName(loop.get(index)),
+                AuthorizationMetadataObject.getLastName(loop.get(index)),
+                type);
+        removePolicyByMetadataObject(authzMetadataObject1);
+      }
+    }
+  }
+
+  /**
+   * Remove the TABLE, Need to remove these the relevant policies, 
`*.{table}`, `*.{table}.{column}`
+   * permissions.
+   */
+  private void doRemoveTableMetadataObject(AuthorizationMetadataObject 
authzMetadataObject) {
+    List<List<String>> loop =
+        ImmutableList.of(
+            authzMetadataObject.names()
+            /** TABLE permission */
+            ,
+            Stream.concat(
+                    authzMetadataObject.names().stream(), 
Stream.of(RangerHelper.RESOURCE_ALL))
+                .collect(Collectors.toList())
+            /** COLUMN permission */
+            );
+
+    for (int index = 0; index < loop.size(); index++) {
+      AuthorizationMetadataObject.Type type =

Review Comment:
   OK, I have refactored this code.



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHadoopSQLPlugin.java:
##########
@@ -76,6 +85,297 @@ public Map<Privilege.Name, Set<AuthorizationPrivilege>> 
privilegesMappingRule()
         ImmutableSet.of(RangerHadoopSQLPrivilege.READ, 
RangerHadoopSQLPrivilege.SELECT));
   }
 
+  /**
+   * Find the managed policy for the ranger securable object.
+   *
+   * @param authzMetadataObject The ranger securable object to find the 
managed policy.
+   * @return The managed policy for the metadata object.
+   */
+  @Override
+  public RangerPolicy findManagedPolicy(AuthorizationMetadataObject 
authzMetadataObject)
+      throws AuthorizationPluginException {
+    List<String> nsMetadataObj = authzMetadataObject.names();
+    Map<String, String> preciseFilters = new HashMap<>();
+    for (int i = 0; i < nsMetadataObj.size() && i < 
policyResourceDefinesRule().size(); i++) {
+      preciseFilters.put(policyResourceDefinesRule().get(i), 
nsMetadataObj.get(i));
+    }
+    return preciseFindPolicy(authzMetadataObject, preciseFilters);
+  }
+
+  /** Wildcard search the Ranger policies in the different Ranger service. */
+  @Override
+  protected List<RangerPolicy> wildcardSearchPolies(
+      AuthorizationMetadataObject authzMetadataObject) {
+    List<String> resourceDefines = policyResourceDefinesRule();
+    Map<String, String> searchFilters = new HashMap<>();
+    searchFilters.put(SearchFilter.SERVICE_NAME, rangerServiceName);
+    for (int i = 0; i < authzMetadataObject.names().size() && i < 
resourceDefines.size(); i++) {
+      searchFilters.put(
+          SearchFilter.RESOURCE_PREFIX + resourceDefines.get(i),
+          authzMetadataObject.names().get(i));
+    }
+
+    try {
+      return rangerClient.findPolicies(searchFilters);
+    } catch (RangerServiceException e) {
+      throw new AuthorizationPluginException(e, "Failed to find the policies 
in the Ranger");
+    }
+  }
+
+  /**
+   * If rename the SCHEMA, Need to rename these the relevant policies, 
`{schema}`, `{schema}.*`,
+   * `{schema}.*.*` <br>
+   * If rename the TABLE, Need to rename these the relevant policies, 
`{schema}.*`, `{schema}.*.*`
+   * <br>
+   * If rename the COLUMN, Only need to rename `{schema}.*.*` <br>
+   */
+  @Override
+  protected void doRenameMetadataObject(
+      AuthorizationMetadataObject authzMetadataObject,
+      AuthorizationMetadataObject newAuthzMetadataObject) {
+    List<Map<String, String>> mappingOldAndNewMetadata = new ArrayList<>();
+    if (newAuthzMetadataObject.type().equals(SCHEMA)) {
+      // Rename the SCHEMA, Need to rename these the relevant policies, 
`{schema}`, `{schema}.*`,
+      // * `{schema}.*.*`
+      mappingOldAndNewMetadata =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(TABLE)) {
+      // Rename the TABLE, Need to rename these the relevant policies, 
`{schema}.*`, `{schema}.*.*`
+      mappingOldAndNewMetadata =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(COLUMN)) {
+      // Rename the COLUMN, Only need to rename `{schema}.*.*`
+      mappingOldAndNewMetadata =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(2), 
newAuthzMetadataObject.names().get(2)));
+    } else if (newAuthzMetadataObject.type().equals(PATH)) {

Review Comment:
   I fixed it.



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHadoopSQLPlugin.java:
##########
@@ -76,6 +85,297 @@ public Map<Privilege.Name, Set<AuthorizationPrivilege>> 
privilegesMappingRule()
         ImmutableSet.of(RangerHadoopSQLPrivilege.READ, 
RangerHadoopSQLPrivilege.SELECT));
   }
 
+  /**
+   * Find the managed policy for the ranger securable object.
+   *
+   * @param authzMetadataObject The ranger securable object to find the 
managed policy.
+   * @return The managed policy for the metadata object.
+   */
+  @Override
+  public RangerPolicy findManagedPolicy(AuthorizationMetadataObject 
authzMetadataObject)
+      throws AuthorizationPluginException {
+    List<String> nsMetadataObj = authzMetadataObject.names();
+    Map<String, String> preciseFilters = new HashMap<>();
+    for (int i = 0; i < nsMetadataObj.size() && i < 
policyResourceDefinesRule().size(); i++) {
+      preciseFilters.put(policyResourceDefinesRule().get(i), 
nsMetadataObj.get(i));
+    }
+    return preciseFindPolicy(authzMetadataObject, preciseFilters);
+  }
+
+  /** Wildcard search the Ranger policies in the different Ranger service. */
+  @Override
+  protected List<RangerPolicy> wildcardSearchPolies(
+      AuthorizationMetadataObject authzMetadataObject) {
+    List<String> resourceDefines = policyResourceDefinesRule();
+    Map<String, String> searchFilters = new HashMap<>();
+    searchFilters.put(SearchFilter.SERVICE_NAME, rangerServiceName);
+    for (int i = 0; i < authzMetadataObject.names().size() && i < 
resourceDefines.size(); i++) {
+      searchFilters.put(
+          SearchFilter.RESOURCE_PREFIX + resourceDefines.get(i),
+          authzMetadataObject.names().get(i));
+    }
+
+    try {
+      return rangerClient.findPolicies(searchFilters);
+    } catch (RangerServiceException e) {
+      throw new AuthorizationPluginException(e, "Failed to find the policies 
in the Ranger");
+    }
+  }
+
+  /**
+   * If rename the SCHEMA, Need to rename these the relevant policies, 
`{schema}`, `{schema}.*`,
+   * `{schema}.*.*` <br>
+   * If rename the TABLE, Need to rename these the relevant policies, 
`{schema}.*`, `{schema}.*.*`
+   * <br>
+   * If rename the COLUMN, Only need to rename `{schema}.*.*` <br>
+   */
+  @Override
+  protected void doRenameMetadataObject(
+      AuthorizationMetadataObject authzMetadataObject,
+      AuthorizationMetadataObject newAuthzMetadataObject) {
+    List<Map<String, String>> mappingOldAndNewMetadata = new ArrayList<>();
+    if (newAuthzMetadataObject.type().equals(SCHEMA)) {
+      // Rename the SCHEMA, Need to rename these the relevant policies, 
`{schema}`, `{schema}.*`,
+      // * `{schema}.*.*`
+      mappingOldAndNewMetadata =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(TABLE)) {
+      // Rename the TABLE, Need to rename these the relevant policies, 
`{schema}.*`, `{schema}.*.*`
+      mappingOldAndNewMetadata =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(COLUMN)) {
+      // Rename the COLUMN, Only need to rename `{schema}.*.*`
+      mappingOldAndNewMetadata =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(2), 
newAuthzMetadataObject.names().get(2)));
+    } else if (newAuthzMetadataObject.type().equals(PATH)) {
+      // do nothing when fileset is renamed
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported metadata object type: " + authzMetadataObject.type());
+    }
+
+    List<String> oldMetadataNames = new ArrayList<>();
+    List<String> newMetadataNames = new ArrayList<>();
+    for (int index = 0; index < mappingOldAndNewMetadata.size(); index++) {
+      
oldMetadataNames.add(mappingOldAndNewMetadata.get(index).keySet().stream().findFirst().get());
+      
newMetadataNames.add(mappingOldAndNewMetadata.get(index).values().stream().findFirst().get());
+
+      AuthorizationMetadataObject.Type type;
+      if (index == 0) {
+        type = RangerHadoopSQLMetadataObject.Type.SCHEMA;
+      } else if (index == 1) {
+        type = RangerHadoopSQLMetadataObject.Type.TABLE;
+      } else {
+        type = RangerHadoopSQLMetadataObject.Type.COLUMN;
+      }
+      AuthorizationMetadataObject oldHadoopSQLMetadataObject =
+          new RangerHadoopSQLMetadataObject(
+              AuthorizationMetadataObject.getParentFullName(oldMetadataNames),
+              AuthorizationMetadataObject.getLastName(oldMetadataNames),
+              type);
+      AuthorizationMetadataObject newHadoopSQLMetadataObject =
+          new RangerHadoopSQLMetadataObject(
+              AuthorizationMetadataObject.getParentFullName(newMetadataNames),
+              AuthorizationMetadataObject.getLastName(newMetadataNames),
+              type);
+      updatePolicyByMetadataObject(
+          type.metadataObjectType(), oldHadoopSQLMetadataObject, 
newHadoopSQLMetadataObject);
+    }
+  }
+
+  @Override
+  protected void updatePolicyByMetadataObject(
+      MetadataObject.Type operationType,
+      AuthorizationMetadataObject oldAuthzMetaObject,
+      AuthorizationMetadataObject newAuthzMetaObject) {
+    List<RangerPolicy> oldPolicies = wildcardSearchPolies(oldAuthzMetaObject);
+    List<RangerPolicy> existNewPolicies = 
wildcardSearchPolies(newAuthzMetaObject);
+    if (oldPolicies.isEmpty()) {
+      LOG.warn("Cannot find the Ranger policy for the metadata object({})!", 
oldAuthzMetaObject);
+      return;
+    }
+    if (!existNewPolicies.isEmpty()) {
+      LOG.warn("The Ranger policy for the metadata object({}) already 
exists!", newAuthzMetaObject);
+    }
+    Map<MetadataObject.Type, Integer> operationTypeIndex =
+        ImmutableMap.of(
+            MetadataObject.Type.SCHEMA, 0,
+            MetadataObject.Type.TABLE, 1,
+            MetadataObject.Type.COLUMN, 2);
+    oldPolicies.stream()
+        .forEach(
+            policy -> {
+              try {
+                String policyName = policy.getName();
+                int index = operationTypeIndex.get(operationType);
+
+                // Update the policy name is following Gravitino's spec
+                if (policy
+                    .getName()
+                    .equals(
+                        
AuthorizationSecurableObject.DOT_JOINER.join(oldAuthzMetaObject.names()))) {
+                  List<String> policyNames =
+                      Lists.newArrayList(
+                          
AuthorizationSecurableObject.DOT_SPLITTER.splitToList(policyName));
+                  Preconditions.checkArgument(
+                      policyNames.size() >= oldAuthzMetaObject.names().size(),
+                      String.format("The policy name(%s) is invalid!", 
policyName));
+                  if 
(policyNames.get(index).equals(RangerHelper.RESOURCE_ALL)) {
+                    // Doesn't need to rename the policy `*`
+                    return;
+                  }
+                  policyNames.set(index, 
newAuthzMetaObject.names().get(index));
+                  
policy.setName(AuthorizationSecurableObject.DOT_JOINER.join(policyNames));
+                }
+                // Update the policy resource name to new name
+                policy
+                    .getResources()
+                    .put(
+                        policyResourceDefinesRule().get(index),
+                        new RangerPolicy.RangerPolicyResource(
+                            newAuthzMetaObject.names().get(index)));
+
+                boolean alreadyExist =
+                    existNewPolicies.stream()
+                        .anyMatch(
+                            existNewPolicy ->
+                                
existNewPolicy.getName().equals(policy.getName())
+                                    || 
existNewPolicy.getResources().equals(policy.getResources()));
+                if (alreadyExist) {
+                  LOG.warn(
+                      "The Ranger policy for the metadata object({}) already 
exists!",
+                      newAuthzMetaObject);
+                  return;
+                }
+
+                // Update the policy
+                rangerClient.updatePolicy(policy.getId(), policy);
+              } catch (RangerServiceException e) {
+                LOG.error("Failed to rename the policy {}!", policy);
+                throw new RuntimeException(e);
+              }
+            });
+  }
+
+  /**
+   * If remove the SCHEMA, need to remove these the relevant policies, 
`{schema}`, `{schema}.*`,
+   * `{schema}.*.*` <br>
+   * If remove the TABLE, need to remove these the relevant policies, 
`{schema}.*`, `{schema}.*.*`
+   * <br>
+   * If remove the COLUMN, Only need to remove `{schema}.*.*` <br>
+   */
+  @Override
+  protected void doRemoveMetadataObject(AuthorizationMetadataObject 
authzMetadataObject) {
+    AuthorizationMetadataObject.Type type = authzMetadataObject.type();
+    if (type.equals(SCHEMA)) {
+      doRemoveSchemaMetadataObject(authzMetadataObject);
+    } else if (type.equals(TABLE)) {
+      doRemoveTableMetadataObject(authzMetadataObject);
+    } else if (type.equals(COLUMN)) {
+      removePolicyByMetadataObject(authzMetadataObject);
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported metadata object type: " + authzMetadataObject.type());
+    }
+  }
+
+  /**
+   * Remove the SCHEMA, Need to remove these the relevant policies, 
`{schema}`, `{schema}.*`,
+   * `{schema}.*.*` permissions.
+   */
+  private void doRemoveSchemaMetadataObject(AuthorizationMetadataObject 
authMetadataObject) {
+    Preconditions.checkArgument(
+        authMetadataObject.type() == SCHEMA, "The metadata object type must be 
SCHEMA");
+    Preconditions.checkArgument(
+        authMetadataObject.names().size() == 1, "The metadata object names 
must be 1");
+    if (RangerHelper.RESOURCE_ALL.equals(authMetadataObject.name())) {
+      // Delete metalake or catalog policies in this Ranger service
+      try {
+        List<RangerPolicy> policies = 
rangerClient.getPoliciesInService(rangerServiceName);
+        policies.stream()
+            .filter(RangerHelper::hasGravitinoManagedPolicyItem)
+            .forEach(rangerHelper::removeAllGravitinoManagedPolicyItem);
+      } catch (RangerServiceException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      List<List<String>> loop =
+          ImmutableList.of(
+              ImmutableList.of(authMetadataObject.name())
+              /** SCHEMA permission */
+              ,
+              ImmutableList.of(authMetadataObject.name(), 
RangerHelper.RESOURCE_ALL)
+              /** TABLE permission */
+              ,
+              ImmutableList.of(
+                  authMetadataObject.name(), RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL)
+              /** COLUMN permission */
+              );
+
+      for (int index = 0; index < loop.size(); index++) {

Review Comment:
   OK, I have refactored this code.



##########
authorizations/authorization-ranger/src/main/java/org/apache/gravitino/authorization/ranger/RangerAuthorizationHadoopSQLPlugin.java:
##########
@@ -76,6 +85,328 @@ public Map<Privilege.Name, Set<AuthorizationPrivilege>> 
privilegesMappingRule()
         ImmutableSet.of(RangerHadoopSQLPrivilege.READ, 
RangerHadoopSQLPrivilege.SELECT));
   }
 
+  /**
+   * Find the single Ranger policy that exactly matches the given metadata object.
+   *
+   * <p>The candidates are fetched with {@code wildcardSearchPolies} and then narrowed down to the
+   * policies whose resources match the names of the metadata object exactly.
+   *
+   * @param authzMetadataObject The ranger securable object to find the managed policy.
+   * @return The managed policy for the metadata object, or {@code null} if no policy matches.
+   * @throws AuthorizationPluginException If more than one policy matches the metadata object.
+   */
+  public RangerPolicy findManagedPolicy(AuthorizationMetadataObject 
authzMetadataObject)
+      throws AuthorizationPluginException {
+    List<RangerPolicy> policies = wildcardSearchPolies(authzMetadataObject);
+    if (!policies.isEmpty()) {
+      /**
+       * Ranger does not support precise search: it returns every policy that meets the
+       * wildcard(*,?) conditions. If you search with the `db.table` condition, Ranger will match
+       * `db1.table1`, `db1.table2`, `db*.table*`, so we need to precisely filter the search
+       * results manually. A policy is kept only if each of its resources is one of the expected
+       * resource defines and holds exactly one value equal to the expected name.
+       */
+      List<String> nsMetadataObj = authzMetadataObject.names();
+      Map<String, String> preciseFilters = new HashMap<>();
+      for (int i = 0; i < nsMetadataObj.size() && i < 
policyResourceDefinesRule().size(); i++) {
+        preciseFilters.put(policyResourceDefinesRule().get(i), 
nsMetadataObj.get(i));
+      }
+      policies =
+          policies.stream()
+              .filter(
+                  policy ->
+                      policy.getResources().entrySet().stream()
+                          .allMatch( // every resource entry of the policy must pass all checks
+                              entry ->
+                                  preciseFilters.containsKey(entry.getKey())
+                                      && entry.getValue().getValues().size() 
== 1
+                                      && entry
+                                          .getValue()
+                                          .getValues()
+                                          
.contains(preciseFilters.get(entry.getKey()))))
+              .collect(Collectors.toList());
+    }
+    // After the precise filtering, a metadata object must map to at most one policy.
+    if (policies.size() > 1) {
+      throw new AuthorizationPluginException("Each metadata object can have at 
most one policy.");
+    }
+
+    if (policies.isEmpty()) {
+      return null; // nothing matched exactly
+    }
+
+    RangerPolicy policy = policies.get(0);
+    // Delegating Gravitino management policies cannot contain duplicate 
privilege
+    policy.getPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    policy.getDenyPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    
policy.getRowFilterPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+    
policy.getDataMaskPolicyItems().forEach(RangerHelper::checkPolicyItemAccess);
+
+    return policy;
+  }
+
+  /**
+   * Wildcard search the Ranger policies of the metadata object in the Ranger service.
+   *
+   * <p>The search filter contains the Ranger service name plus one resource filter per name of
+   * the metadata object, keyed by the resource define at the same position. Names beyond the
+   * number of resource defines are ignored.
+   *
+   * @param authzMetadataObject The metadata object whose names are used as search conditions.
+   * @return The policies returned by the Ranger client for the search filter.
+   * @throws AuthorizationPluginException If the Ranger client fails to search the policies.
+   */
+  @Override
+  protected List<RangerPolicy> wildcardSearchPolies(
+      AuthorizationMetadataObject authzMetadataObject) {
+    List<String> resourceDefines = policyResourceDefinesRule();
+    Map<String, String> searchFilters = new HashMap<>();
+    searchFilters.put(SearchFilter.SERVICE_NAME, rangerServiceName);
+    for (int i = 0; i < authzMetadataObject.names().size() && i < 
resourceDefines.size(); i++) {
+      searchFilters.put(
+          SearchFilter.RESOURCE_PREFIX + resourceDefines.get(i),
+          authzMetadataObject.names().get(i));
+    }
+
+    try {
+      List<RangerPolicy> policies = rangerClient.findPolicies(searchFilters);
+      return policies;
+    } catch (RangerServiceException e) {
+      throw new AuthorizationPluginException(e, "Failed to find the policies 
in the Ranger");
+    }
+  }
+
+  /**
+   * Rename the Ranger policies that belong to a renamed metadata object.
+   *
+   * <p>One {old name, new name} pair is built per name level (schema, table, column); the levels
+   * below the renamed object use the wildcard for both names. The policies are then updated
+   * level by level. <br>
+   * If the SCHEMA is renamed, the relevant policies `{schema}`, `{schema}.*`, `{schema}.*.*` need
+   * to be renamed. <br>
+   * If the TABLE is renamed, the relevant policies `{schema}.*`, `{schema}.*.*` need to be
+   * renamed. <br>
+   * If the COLUMN is renamed, only `{schema}.*.*` needs to be renamed. <br>
+   * If a PATH object is renamed, nothing is done.
+   *
+   * @param authzMetadataObject The metadata object before the rename.
+   * @param newAuthzMetadataObject The metadata object after the rename.
+   * @throws IllegalArgumentException If the type of the new metadata object is not SCHEMA, TABLE,
+   *     COLUMN or PATH.
+   */
+  @Override
+  protected void doRenameMetadataObject(
+      AuthorizationMetadataObject authzMetadataObject,
+      AuthorizationMetadataObject newAuthzMetadataObject) {
+    List<Map<String, String>> loop = new ArrayList<>(); // one {old name -> new name} map per level
+    if (newAuthzMetadataObject.type().equals(SCHEMA)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(TABLE)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(RangerHelper.RESOURCE_ALL, 
RangerHelper.RESOURCE_ALL));
+    } else if (newAuthzMetadataObject.type().equals(COLUMN)) {
+      loop =
+          ImmutableList.of(
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(0), 
newAuthzMetadataObject.names().get(0)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(1), 
newAuthzMetadataObject.names().get(1)),
+              ImmutableMap.of(
+                  authzMetadataObject.names().get(2), 
newAuthzMetadataObject.names().get(2)));
+    } else if (newAuthzMetadataObject.type().equals(PATH)) { // do nothing 
when fileset is renamed
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported metadata object type: " + authzMetadataObject.type()); // NOTE(review): reports the old object's type, while the branches above test the new object's type
+    }
+
+    List<String> oldMetadataNames = new ArrayList<>();
+    List<String> newMetadataNames = new ArrayList<>();
+    for (int index = 0; index < loop.size(); index++) { // each pass appends one more name level, then updates the policies of that level
+      
oldMetadataNames.add(loop.get(index).keySet().stream().findFirst().get());
+      
newMetadataNames.add(loop.get(index).values().stream().findFirst().get());
+
+      AuthorizationMetadataObject.Type type = // index 0 -> SCHEMA, 1 -> TABLE, otherwise COLUMN
+          (index == 0
+              ? RangerHadoopSQLMetadataObject.Type.SCHEMA
+              : (index == 1
+                  ? RangerHadoopSQLMetadataObject.Type.TABLE
+                  : RangerHadoopSQLMetadataObject.Type.COLUMN));
+      AuthorizationMetadataObject authzMetadataObject1 =
+          new RangerHadoopSQLMetadataObject(
+              AuthorizationMetadataObject.getParentFullName(oldMetadataNames),
+              AuthorizationMetadataObject.getLastName(oldMetadataNames),
+              type);
+      AuthorizationMetadataObject newAuthzMetadataObject1 =
+          new RangerHadoopSQLMetadataObject(
+              AuthorizationMetadataObject.getParentFullName(newMetadataNames),
+              AuthorizationMetadataObject.getLastName(newMetadataNames),
+              type);
+      updatePolicyByMetadataObject(
+          type.metadataObjectType(), authzMetadataObject1, 
newAuthzMetadataObject1);
+    }
+  }
+
+  @Override // Updates the name and resource of each Ranger policy found for the old object to the new object's name.
+  protected void updatePolicyByMetadataObject(
+      MetadataObject.Type operationType,
+      AuthorizationMetadataObject oldAuthzMetaobject,
+      AuthorizationMetadataObject newAuthzMetaobject) {
+    List<RangerPolicy> oldPolicies = wildcardSearchPolies(oldAuthzMetaobject); // policies to update
+    List<RangerPolicy> existNewPolicies = 
wildcardSearchPolies(newAuthzMetaobject);
+    if (oldPolicies.isEmpty()) { // only logs; the loop below then has nothing to do
+      LOG.warn("Cannot find the Ranger policy for the metadata object({})!", 
oldAuthzMetaobject);
+    }
+    if (!existNewPolicies.isEmpty()) { // only logs; conflicts are checked per policy below
+      LOG.warn("The Ranger policy for the metadata object({}) already 
exists!", newAuthzMetaobject);
+    }
+    Map<MetadataObject.Type, Integer> operationTypeIndex = // position of each type in the name list
+        ImmutableMap.of(
+            MetadataObject.Type.SCHEMA, 0,
+            MetadataObject.Type.TABLE, 1,
+            MetadataObject.Type.COLUMN, 2);
+    oldPolicies.stream()
+        .forEach(
+            policy -> {
+              try {
+                String policyName = policy.getName();
+                int index = operationTypeIndex.get(operationType); // NOTE(review): unboxing throws NullPointerException if operationType is not SCHEMA, TABLE or COLUMN
+
+                // Rename the policy only if its name equals the DOT_JOINER-joined names of the old object
+                if (policy
+                    .getName()
+                    .equals(
+                        
AuthorizationSecurableObject.DOT_JOINER.join(oldAuthzMetaobject.names()))) {
+                  List<String> policyNames =
+                      Lists.newArrayList(
+                          
AuthorizationSecurableObject.DOT_SPLITTER.splitToList(policyName));
+                  Preconditions.checkArgument(
+                      policyNames.size() >= oldAuthzMetaobject.names().size(),
+                      String.format("The policy name(%s) is invalid!", 
policyName));
+                  if 
(policyNames.get(index).equals(RangerHelper.RESOURCE_ALL)) {
+                    // Doesn't need to rename the policy `*`
+                    return; // leaves the lambda: this policy is skipped entirely
+                  }
+                  policyNames.set(index, 
newAuthzMetaobject.names().get(index));
+                  
policy.setName(AuthorizationSecurableObject.DOT_JOINER.join(policyNames));
+                }
+                // Update the policy resource name to new name
+                policy
+                    .getResources()
+                    .put(
+                        policyResourceDefinesRule().get(index),
+                        new RangerPolicy.RangerPolicyResource(
+                            newAuthzMetaobject.names().get(index)));
+
+                boolean alreadyExist = // compared against the policies found for the new object
+                    existNewPolicies.stream()
+                        .anyMatch(
+                            existNewPolicy ->
+                                
existNewPolicy.getName().equals(policy.getName())
+                                    || 
existNewPolicy.getResources().equals(policy.getResources()));
+                if (alreadyExist) { // a policy with the same name or resources exists: skip the update
+                  LOG.warn(
+                      "The Ranger policy for the metadata object({}) already 
exists!",
+                      newAuthzMetaobject);
+                  return;
+                }
+
+                // Update the policy
+                rangerClient.updatePolicy(policy.getId(), policy);
+              } catch (RangerServiceException e) {
+                LOG.error("Failed to rename the policy {}!", policy); // NOTE(review): the exception is not passed to the logger, only rethrown below
+                throw new RuntimeException(e);
+              }
+            });
+  }
+
+  /**
+   * If a SCHEMA is removed, the relevant policies `{schema}`, `{schema}.*` and `{schema}.*.*`
+   * need to be removed. <br>
+   * If a TABLE is removed, the relevant policies `{schema}.*` and `{schema}.*.*` need to be
+   * removed. <br>
+   * If a COLUMN is removed, only `{schema}.*.*` needs to be removed. <br>
+   */
+  @Override
+  protected void doRemoveMetadataObject(AuthorizationMetadataObject 
authzMetadataObject) {
+    AuthorizationMetadataObject.Type type = authzMetadataObject.type();
+    if (type.equals(SCHEMA)) {
+      doRemoveSchemaMetadataObject(authzMetadataObject);
+    } else if (type.equals(TABLE)) {
+      doRemoveTableMetadataObject(authzMetadataObject);

Review Comment:
   I will refactor this code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@gravitino.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to