This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new cca6782d44 HDDS-9388. OM Ratis Write: Move ACL check and Bucket resolution to preExecute (#5694)
cca6782d44 is described below

commit cca6782d44a1417ac27897890eaac5cd097481de
Author: Duong Nguyen <[email protected]>
AuthorDate: Wed Jan 3 22:51:02 2024 -0800

    HDDS-9388. OM Ratis Write: Move ACL check and Bucket resolution to preExecute (#5694)
---
 .../hadoop/ozone/om/helpers/OmMultipartInfo.java   |  3 ++
 .../om/helpers/OmMultipartUploadCompleteInfo.java  |  3 ++
 .../client/rpc/TestOzoneAtRestEncryption.java      |  5 ---
 .../rpc/TestOzoneClientMultipartUploadWithFSO.java | 14 ------
 .../client/rpc/TestOzoneRpcClientAbstract.java     | 22 ----------
 .../client/rpc/TestOzoneRpcClientWithRatis.java    |  3 --
 .../om/request/file/OMDirectoryCreateRequest.java  | 17 +++-----
 .../file/OMDirectoryCreateRequestWithFSO.java      | 10 -----
 .../ozone/om/request/file/OMFileCreateRequest.java | 17 +++-----
 .../request/file/OMFileCreateRequestWithFSO.java   | 10 -----
 .../om/request/file/OMRecoverLeaseRequest.java     | 11 +++--
 .../om/request/key/OMAllocateBlockRequest.java     | 19 +++-----
 .../request/key/OMAllocateBlockRequestWithFSO.java |  9 ----
 .../ozone/om/request/key/OMKeyCommitRequest.java   | 15 +++----
 .../om/request/key/OMKeyCommitRequestWithFSO.java  | 11 -----
 .../ozone/om/request/key/OMKeyCreateRequest.java   | 16 +++----
 .../om/request/key/OMKeyCreateRequestWithFSO.java  | 10 -----
 .../ozone/om/request/key/OMKeyDeleteRequest.java   | 23 +++++-----
 .../om/request/key/OMKeyDeleteRequestWithFSO.java  | 16 ++++---
 .../ozone/om/request/key/OMKeyRenameRequest.java   | 36 +++++++++-------
 .../om/request/key/OMKeyRenameRequestWithFSO.java  | 50 +++++++++++++---------
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  | 40 +++++++++++++++++
 .../S3InitiateMultipartUploadRequest.java          | 17 +++-----
 .../S3InitiateMultipartUploadRequestWithFSO.java   |  4 --
 .../multipart/S3MultipartUploadAbortRequest.java   | 24 +++++------
 .../S3MultipartUploadCommitPartRequest.java        | 22 +++++-----
 .../S3MultipartUploadCompleteRequest.java          | 21 ++++-----
 .../request/file/TestOMDirectoryCreateRequest.java |  6 +++
 .../file/TestOMDirectoryCreateRequestWithFSO.java  |  6 +++
 .../om/request/key/TestOMKeyDeleteRequest.java     | 39 +++++++++--------
 .../request/key/TestOMKeyDeleteRequestWithFSO.java | 15 +++++++
 .../om/request/key/TestOMKeyRenameRequest.java     | 21 ++++-----
 .../request/key/TestOMKeyRenameRequestWithFSO.java |  9 ++--
 .../ozone/om/response/TestCleanupTableInfo.java    | 12 ------
 34 files changed, 247 insertions(+), 309 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartInfo.java
index 98913d3ff7..c7ca169b82 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartInfo.java
@@ -47,6 +47,7 @@ public class OmMultipartInfo {
    * Return volume name.
    * @return volumeName
    */
+  @Deprecated
   public String getVolumeName() {
     return volumeName;
   }
@@ -55,6 +56,7 @@ public class OmMultipartInfo {
    * Return bucket name.
    * @return bucketName
    */
+  @Deprecated
   public String getBucketName() {
     return bucketName;
   }
@@ -63,6 +65,7 @@ public class OmMultipartInfo {
    * Return key name.
    * @return keyName
    */
+  @Deprecated
   public String getKeyName() {
     return keyName;
   }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadCompleteInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadCompleteInfo.java
index 71ce882c6f..dc1f27a6c7 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadCompleteInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadCompleteInfo.java
@@ -36,6 +36,7 @@ public class OmMultipartUploadCompleteInfo {
     this.hash = md5;
   }
 
+  @Deprecated
   public String getVolume() {
     return volume;
   }
@@ -44,6 +45,7 @@ public class OmMultipartUploadCompleteInfo {
     this.volume = volume;
   }
 
+  @Deprecated
   public String getBucket() {
     return bucket;
   }
@@ -52,6 +54,7 @@ public class OmMultipartUploadCompleteInfo {
     this.bucket = bucket;
   }
 
+  @Deprecated
   public String getKey() {
     return key;
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
index c40b46a79d..a0fd275719 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
@@ -660,11 +660,6 @@ class TestOzoneAtRestEncryption {
         .completeMultipartUpload(keyName, uploadID, partsMap);
 
     assertNotNull(omMultipartUploadCompleteInfo);
-    assertEquals(omMultipartUploadCompleteInfo.getBucket(), bucket
-        .getName());
-    assertEquals(omMultipartUploadCompleteInfo.getVolume(), bucket
-        .getVolumeName());
-    assertEquals(omMultipartUploadCompleteInfo.getKey(), keyName);
     assertNotNull(omMultipartUploadCompleteInfo.getHash());
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadWithFSO.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadWithFSO.java
index c7a5d5e376..5ad49a955c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadWithFSO.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientMultipartUploadWithFSO.java
@@ -188,9 +188,6 @@ public class TestOzoneClientMultipartUploadWithFSO {
 
     Assertions.assertNotNull(multipartInfo);
     String uploadID = multipartInfo.getUploadID();
-    Assertions.assertEquals(volumeName, multipartInfo.getVolumeName());
-    Assertions.assertEquals(bucketName, multipartInfo.getBucketName());
-    Assertions.assertEquals(keyName, multipartInfo.getKeyName());
     Assertions.assertNotNull(multipartInfo.getUploadID());
 
     // Call initiate multipart upload for the same key again, this should
@@ -198,9 +195,6 @@ public class TestOzoneClientMultipartUploadWithFSO {
     multipartInfo = bucket.initiateMultipartUpload(keyName);
 
     Assertions.assertNotNull(multipartInfo);
-    Assertions.assertEquals(volumeName, multipartInfo.getVolumeName());
-    Assertions.assertEquals(bucketName, multipartInfo.getBucketName());
-    Assertions.assertEquals(keyName, multipartInfo.getKeyName());
     Assertions.assertNotEquals(multipartInfo.getUploadID(), uploadID);
     Assertions.assertNotNull(multipartInfo.getUploadID());
   }
@@ -920,9 +914,6 @@ public class TestOzoneClientMultipartUploadWithFSO {
 
     Assertions.assertNotNull(multipartInfo);
     String uploadID = multipartInfo.getUploadID();
-    Assertions.assertEquals(volumeName, multipartInfo.getVolumeName());
-    Assertions.assertEquals(bucketName, multipartInfo.getBucketName());
-    Assertions.assertEquals(kName, multipartInfo.getKeyName());
     Assertions.assertNotNull(multipartInfo.getUploadID());
 
     return uploadID;
@@ -952,11 +943,6 @@ public class TestOzoneClientMultipartUploadWithFSO {
         .completeMultipartUpload(kName, uploadID, partsMap);
 
     Assertions.assertNotNull(omMultipartUploadCompleteInfo);
-    Assertions.assertEquals(omMultipartUploadCompleteInfo.getBucket(), oBucket
-        .getName());
-    Assertions.assertEquals(omMultipartUploadCompleteInfo.getVolume(), oBucket
-        .getVolumeName());
-    Assertions.assertEquals(omMultipartUploadCompleteInfo.getKey(), kName);
     Assertions.assertNotNull(omMultipartUploadCompleteInfo.getHash());
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 8f1315c27c..9d622d7333 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -1491,9 +1491,6 @@ public abstract class TestOzoneRpcClientAbstract {
 
     assertNotNull(multipartInfo);
     String uploadID = multipartInfo.getUploadID();
-    assertEquals(volumeName, multipartInfo.getVolumeName());
-    assertEquals(bucketName, multipartInfo.getBucketName());
-    assertEquals(keyName, multipartInfo.getKeyName());
     assertNotNull(multipartInfo.getUploadID());
 
     OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
@@ -2599,9 +2596,6 @@ public abstract class TestOzoneRpcClientAbstract {
         replicationConfig);
 
     assertNotNull(multipartInfo);
-    assertEquals(volumeName, multipartInfo.getVolumeName());
-    assertEquals(bucketName, multipartInfo.getBucketName());
-    assertEquals(keyName, multipartInfo.getKeyName());
     assertNotEquals(multipartInfo.getUploadID(), uploadID);
     assertNotNull(multipartInfo.getUploadID());
   }
@@ -2622,9 +2616,6 @@ public abstract class TestOzoneRpcClientAbstract {
 
     assertNotNull(multipartInfo);
     String uploadID = multipartInfo.getUploadID();
-    assertEquals(volumeName, multipartInfo.getVolumeName());
-    assertEquals(bucketName, multipartInfo.getBucketName());
-    assertEquals(keyName, multipartInfo.getKeyName());
     assertNotNull(multipartInfo.getUploadID());
 
     // Call initiate multipart upload for the same key again, this should
@@ -2632,9 +2623,6 @@ public abstract class TestOzoneRpcClientAbstract {
     multipartInfo = bucket.initiateMultipartUpload(keyName);
 
     assertNotNull(multipartInfo);
-    assertEquals(volumeName, multipartInfo.getVolumeName());
-    assertEquals(bucketName, multipartInfo.getBucketName());
-    assertEquals(keyName, multipartInfo.getKeyName());
     assertNotEquals(multipartInfo.getUploadID(), uploadID);
     assertNotNull(multipartInfo.getUploadID());
   }
@@ -2657,9 +2645,6 @@ public abstract class TestOzoneRpcClientAbstract {
 
     assertNotNull(multipartInfo);
     String uploadID = multipartInfo.getUploadID();
-    assertEquals(volumeName, multipartInfo.getVolumeName());
-    assertEquals(bucketName, multipartInfo.getBucketName());
-    assertEquals(keyName, multipartInfo.getKeyName());
     assertNotNull(multipartInfo.getUploadID());
 
     OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
@@ -2694,9 +2679,6 @@ public abstract class TestOzoneRpcClientAbstract {
 
     assertNotNull(multipartInfo);
     String uploadID = multipartInfo.getUploadID();
-    assertEquals(volumeName, multipartInfo.getVolumeName());
-    assertEquals(bucketName, multipartInfo.getBucketName());
-    assertEquals(keyName, multipartInfo.getKeyName());
     assertNotNull(multipartInfo.getUploadID());
 
     OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
@@ -3796,10 +3778,6 @@ public abstract class TestOzoneRpcClientAbstract {
         .completeMultipartUpload(keyName, uploadID, partsMap);
 
     assertNotNull(omMultipartUploadCompleteInfo);
-    assertEquals(omMultipartUploadCompleteInfo.getBucket(), bucket
-        .getName());
-    assertEquals(omMultipartUploadCompleteInfo.getVolume(), bucket
-        .getVolumeName());
     assertEquals(omMultipartUploadCompleteInfo.getKey(), keyName);
     assertNotNull(omMultipartUploadCompleteInfo.getHash());
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index 19e111d65d..3030e6e3f2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -204,9 +204,6 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
 
     assertNotNull(multipartInfo);
     String uploadID = multipartInfo.getUploadID();
-    assertEquals(volumeName, multipartInfo.getVolumeName());
-    assertEquals(bucketName, multipartInfo.getBucketName());
-    assertEquals(keyName, multipartInfo.getKeyName());
     assertNotNull(multipartInfo.getUploadID());
 
     OzoneDataStreamOutput ozoneStreamOutput = bucket.createMultipartStreamKey(
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
index 4554092a61..1c636aab96 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
@@ -45,8 +45,7 @@ import org.apache.hadoop.ozone.om.request.validation.ValidationCondition;
 import org.apache.hadoop.ozone.om.request.validation.ValidationContext;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -115,7 +114,7 @@ public class OMDirectoryCreateRequest extends OMKeyRequest {
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
     CreateDirectoryRequest createDirectoryRequest =
-        getOmRequest().getCreateDirectoryRequest();
+        super.preExecute(ozoneManager).getCreateDirectoryRequest();
     Preconditions.checkNotNull(createDirectoryRequest);
 
     OmUtils.verifyKeyNameWithSnapshotReservedWord(
@@ -124,8 +123,10 @@ public class OMDirectoryCreateRequest extends OMKeyRequest {
     KeyArgs.Builder newKeyArgs = createDirectoryRequest.getKeyArgs()
         .toBuilder().setModificationTime(Time.now());
 
+    KeyArgs resolvedKeyArgs = resolveBucketAndCheckKeyAcls(newKeyArgs.build(),
+        ozoneManager, ACLType.CREATE);
     CreateDirectoryRequest.Builder newCreateDirectoryRequest =
-        createDirectoryRequest.toBuilder().setKeyArgs(newKeyArgs);
+        createDirectoryRequest.toBuilder().setKeyArgs(resolvedKeyArgs);
 
     return getOmRequest().toBuilder().setCreateDirectoryRequest(
         newCreateDirectoryRequest).setUserInfo(getUserInfo()).build();
@@ -163,14 +164,6 @@ public class OMDirectoryCreateRequest extends OMKeyRequest {
     int numMissingParents = 0;
 
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
-      // check Acl
-      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
-
       // Check if this is the root of the filesystem.
       if (keyName.length() == 0) {
         throw new OMException("Directory create failed. Cannot create " +
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
index e62666e27b..53fda0f8f0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
@@ -44,8 +44,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
         .OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
         .Status;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,14 +108,6 @@ public class OMDirectoryCreateRequestWithFSO extends OMDirectoryCreateRequest {
     List<OmDirectoryInfo> missingParentInfos;
 
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
-      // check Acl
-      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
-
       // Check if this is the root of the filesystem.
       if (keyName.length() == 0) {
         throw new OMException("Directory create failed. Cannot create " +
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
index 32dbe986b7..d4bc91dbfd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateF
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdds.utils.UniqueId;
 
@@ -89,7 +88,8 @@ public class OMFileCreateRequest extends OMKeyRequest {
 
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    CreateFileRequest createFileRequest = getOmRequest().getCreateFileRequest();
+    CreateFileRequest createFileRequest = super.preExecute(ozoneManager)
+        .getCreateFileRequest();
     Preconditions.checkNotNull(createFileRequest);
 
     KeyArgs keyArgs = createFileRequest.getKeyArgs();
@@ -154,9 +154,12 @@ public class OMFileCreateRequest extends OMKeyRequest {
         .map(info -> info.getProtobuf(getOmRequest().getVersion()))
         .collect(Collectors.toList()));
 
+    KeyArgs resolvedArgs = resolveBucketAndCheckKeyAcls(newKeyArgs.build(),
+        ozoneManager, IAccessAuthorizer.ACLType.CREATE);
+
     generateRequiredEncryptionInfo(keyArgs, newKeyArgs, ozoneManager);
     CreateFileRequest.Builder newCreateFileRequest =
-        createFileRequest.toBuilder().setKeyArgs(newKeyArgs)
+        createFileRequest.toBuilder().setKeyArgs(resolvedArgs)
             .setClientID(UniqueId.next());
 
     return getOmRequest().toBuilder()
@@ -207,14 +210,6 @@ public class OMFileCreateRequest extends OMKeyRequest {
     Exception exception = null;
     Result result = null;
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
-      // check Acl
-      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
-
       // acquire lock
       mergeOmLockDetails(omMetadataManager.getLock()
           .acquireWriteLock(BUCKET_LOCK, volumeName, bucketName));
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
index 474433a2f0..393be170a5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
@@ -40,8 +40,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,20 +108,12 @@ public class OMFileCreateRequestWithFSO extends OMFileCreateRequest {
     Exception exception = null;
     Result result = null;
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
       if (keyName.length() == 0) {
         // Check if this is the root of the filesystem.
         throw new OMException("Can not write to directory: " + keyName,
                 OMException.ResultCodes.NOT_A_FILE);
       }
 
-      // check Acl
-      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
-
       // acquire lock
       mergeOmLockDetails(omMetadataManager.getLock()
           .acquireWriteLock(BUCKET_LOCK, volumeName, bucketName));
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
index 3d23ee0832..510ea54f72 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
@@ -96,6 +96,13 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
         validateAndNormalizeKey(ozoneManager.getEnableFileSystemPaths(),
             keyPath, getBucketLayout());
 
+    // check ACL
+    checkKeyAcls(ozoneManager,
+        recoverLeaseRequest.getVolumeName(),
+        recoverLeaseRequest.getBucketName(),
+        recoverLeaseRequest.getKeyName(),
+        IAccessAuthorizer.ACLType.WRITE, OzoneObj.ResourceType.KEY);
+
     return request.toBuilder()
         .setRecoverLeaseRequest(
             recoverLeaseRequest.toBuilder()
@@ -125,10 +132,6 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
 
     boolean acquiredLock = false;
     try {
-      // check ACL
-      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.WRITE, OzoneObj.ResourceType.KEY);
-
       // acquire lock
       mergeOmLockDetails(
           omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
index fc9b7da93b..ac4d9ab624 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
@@ -37,8 +37,8 @@ import org.apache.hadoop.ozone.om.request.validation.ValidationCondition;
 import org.apache.hadoop.ozone.om.request.validation.ValidationContext;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInfo;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,9 +82,8 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
 
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-
     AllocateBlockRequest allocateBlockRequest =
-        getOmRequest().getAllocateBlockRequest();
+        super.preExecute(ozoneManager).getAllocateBlockRequest();
 
     Preconditions.checkNotNull(allocateBlockRequest);
 
@@ -126,10 +125,14 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
     KeyArgs.Builder newKeyArgs =
         keyArgs.toBuilder().setModificationTime(Time.now()).setKeyName(keyPath);
 
+    KeyArgs resolvedKeyArgs =
+        resolveBucketAndCheckOpenKeyAcls(newKeyArgs.build(), ozoneManager,
+            ACLType.WRITE, allocateBlockRequest.getClientID());
+
     AllocateBlockRequest.Builder newAllocatedBlockRequest =
         AllocateBlockRequest.newBuilder()
             .setClientID(allocateBlockRequest.getClientID())
-            .setKeyArgs(newKeyArgs);
+            .setKeyArgs(resolvedKeyArgs);
 
     if (allocateBlockRequest.hasExcludeList()) {
       newAllocatedBlockRequest.setExcludeList(
@@ -185,14 +188,6 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
     boolean acquiredLock = false;
 
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
-      // check Acl
-      checkKeyAclsInOpenKeyTable(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.WRITE, allocateBlockRequest.getClientID());
-
       validateBucketAndVolume(omMetadataManager, volumeName,
           bucketName);
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
index 3f3b386d1a..ea634a9684 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Allocat
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,14 +110,6 @@ public class OMAllocateBlockRequestWithFSO extends OMAllocateBlockRequest {
     boolean acquiredLock = false;
 
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
-      // check Acl
-      checkKeyAclsInOpenKeyTable(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.WRITE, allocateBlockRequest.getClientID());
-
       validateBucketAndVolume(omMetadataManager, volumeName,
           bucketName);
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 863bc9af38..488e8baca0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -120,9 +120,13 @@ public class OMKeyCommitRequest extends OMKeyRequest {
         keyArgs.toBuilder().setModificationTime(Time.now())
             .setKeyName(keyPath);
 
+    KeyArgs resolvedKeyArgs =
+        resolveBucketAndCheckOpenKeyAcls(newKeyArgs.build(), ozoneManager,
+            IAccessAuthorizer.ACLType.WRITE, commitKeyRequest.getClientID());
+
     return request.toBuilder()
         .setCommitKeyRequest(commitKeyRequest.toBuilder()
-            .setKeyArgs(newKeyArgs)).build();
+            .setKeyArgs(resolvedKeyArgs)).build();
   }
 
   @Override
@@ -169,15 +173,6 @@ public class OMKeyCommitRequest extends OMKeyRequest {
         isHSync, volumeName, bucketName, keyName);
 
     try {
-      commitKeyArgs = resolveBucketLink(ozoneManager, commitKeyArgs, auditMap);
-      volumeName = commitKeyArgs.getVolumeName();
-      bucketName = commitKeyArgs.getBucketName();
-
-      // check Acl
-      checkKeyAclsInOpenKeyTable(ozoneManager, volumeName, bucketName,
-          keyName, IAccessAuthorizer.ACLType.WRITE,
-          commitKeyRequest.getClientID());
-
       String dbOzoneKey =
           omMetadataManager.getOzoneKey(volumeName, bucketName,
               keyName);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index cf86e1e8a4..2d290e8007 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitK
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,16 +113,6 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
 
     try {
-      commitKeyArgs = resolveBucketLink(ozoneManager, commitKeyArgs, auditMap);
-      volumeName = commitKeyArgs.getVolumeName();
-      bucketName = commitKeyArgs.getBucketName();
-
-      // check Acl
-      checkKeyAclsInOpenKeyTable(ozoneManager, volumeName, bucketName,
-              keyName, IAccessAuthorizer.ACLType.WRITE,
-              commitKeyRequest.getClientID());
-
-
       String dbOpenFileKey = null;
 
       List<OmKeyLocationInfo>
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
index 4b5f5a8330..48805d6e4e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRespo
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdds.utils.UniqueId;
 
@@ -88,7 +87,8 @@ public class OMKeyCreateRequest extends OMKeyRequest {
 
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    CreateKeyRequest createKeyRequest = getOmRequest().getCreateKeyRequest();
+    CreateKeyRequest createKeyRequest = super.preExecute(ozoneManager)
+        .getCreateKeyRequest();
     Preconditions.checkNotNull(createKeyRequest);
 
     KeyArgs keyArgs = createKeyRequest.getKeyArgs();
@@ -174,8 +174,11 @@ public class OMKeyCreateRequest extends OMKeyRequest {
       generateRequiredEncryptionInfo(keyArgs, newKeyArgs, ozoneManager);
     }
 
+    KeyArgs resolvedKeyArgs =
+        resolveBucketAndCheckKeyAcls(newKeyArgs.build(), ozoneManager,
+            IAccessAuthorizer.ACLType.CREATE);
     newCreateKeyRequest =
-        createKeyRequest.toBuilder().setKeyArgs(newKeyArgs)
+        createKeyRequest.toBuilder().setKeyArgs(resolvedKeyArgs)
             .setClientID(UniqueId.next());
 
     return getOmRequest().toBuilder()
@@ -213,13 +216,6 @@ public class OMKeyCreateRequest extends OMKeyRequest {
     List<OmKeyInfo> missingParentInfos = null;
     int numMissingParents = 0;
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
-      // check Acl
-      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
 
       mergeOmLockDetails(
           ozoneLockStrategy.acquireWriteLock(omMetadataManager, volumeName,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
index 2a609af446..65a485305d 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
@@ -41,8 +41,6 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
 
 import java.io.IOException;
 import java.nio.file.InvalidPathException;
@@ -99,14 +97,6 @@ public class OMKeyCreateRequestWithFSO extends 
OMKeyCreateRequest {
     List<OmDirectoryInfo> missingParentInfos;
     int numKeysCreated = 0;
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
-      // check Acl
-      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-              IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
-
       mergeOmLockDetails(omMetadataManager.getLock()
           .acquireWriteLock(BUCKET_LOCK, volumeName, bucketName));
       acquireLock = getOmLockDetails().isLockAcquired();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
index fa51a125a5..82d3bdc9e8 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
@@ -31,8 +31,7 @@ import 
org.apache.hadoop.ozone.om.request.validation.RequestFeatureValidator;
 import org.apache.hadoop.ozone.om.request.validation.RequestProcessingPhase;
 import org.apache.hadoop.ozone.om.request.validation.ValidationCondition;
 import org.apache.hadoop.ozone.om.request.validation.ValidationContext;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +73,8 @@ public class OMKeyDeleteRequest extends OMKeyRequest {
 
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    DeleteKeyRequest deleteKeyRequest = getOmRequest().getDeleteKeyRequest();
+    DeleteKeyRequest deleteKeyRequest = super.preExecute(ozoneManager)
+        .getDeleteKeyRequest();
     Preconditions.checkNotNull(deleteKeyRequest);
 
     OzoneManagerProtocolProtos.KeyArgs keyArgs = deleteKeyRequest.getKeyArgs();
@@ -87,12 +87,19 @@ public class OMKeyDeleteRequest extends OMKeyRequest {
     OzoneManagerProtocolProtos.KeyArgs.Builder newKeyArgs =
         
keyArgs.toBuilder().setModificationTime(Time.now()).setKeyName(keyPath);
 
+    KeyArgs resolvedArgs = resolveBucketAndCheckAcls(ozoneManager, newKeyArgs);
     return getOmRequest().toBuilder()
         .setDeleteKeyRequest(deleteKeyRequest.toBuilder()
-            .setKeyArgs(newKeyArgs))
+            .setKeyArgs(resolvedArgs))
         .setUserInfo(getUserIfNotExists(ozoneManager)).build();
   }
 
+  protected KeyArgs resolveBucketAndCheckAcls(OzoneManager ozoneManager,
+      KeyArgs.Builder newKeyArgs) throws IOException {
+    return resolveBucketAndCheckKeyAcls(newKeyArgs.build(), ozoneManager,
+        ACLType.DELETE); // overridden in OMKeyDeleteRequestWithFSO
+  }
+
   @Override
   @SuppressWarnings("methodlength")
   public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, 
TermIndex termIndex) {
@@ -121,14 +128,6 @@ public class OMKeyDeleteRequest extends OMKeyRequest {
     Result result = null;
 
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
-      // check Acl
-      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
-
       String objectKey =
           omMetadataManager.getOzoneKey(volumeName, bucketName, keyName);
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestWithFSO.java
index 28568d96f6..a817c71165 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequestWithFSO.java
@@ -96,13 +96,6 @@ public class OMKeyDeleteRequestWithFSO extends 
OMKeyDeleteRequest {
     Result result = null;
     OmBucketInfo omBucketInfo = null;
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
-      checkACLsWithFSO(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.DELETE);
-
       mergeOmLockDetails(omMetadataManager.getLock()
           .acquireWriteLock(BUCKET_LOCK, volumeName, bucketName));
       acquiredLock = getOmLockDetails().isLockAcquired();
@@ -210,4 +203,13 @@ public class OMKeyDeleteRequestWithFSO extends 
OMKeyDeleteRequest {
 
     return omClientResponse;
   }
+
+  @Override
+  protected OzoneManagerProtocolProtos.KeyArgs resolveBucketAndCheckAcls(
+      OzoneManager ozoneManager,
+      OzoneManagerProtocolProtos.KeyArgs.Builder newKeyArgs)
+      throws IOException { // FSO variant: DELETE check via checkACLsWithFSO
+    return resolveBucketAndCheckKeyAclsWithFSO(newKeyArgs.build(),
+        ozoneManager, IAccessAuthorizer.ACLType.DELETE);
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
index 2d21354350..db08951831 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
@@ -79,8 +79,8 @@ public class OMKeyRenameRequest extends OMKeyRequest {
 
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-
-    RenameKeyRequest renameKeyRequest = getOmRequest().getRenameKeyRequest();
+    RenameKeyRequest renameKeyRequest = super.preExecute(ozoneManager)
+        .getRenameKeyRequest();
     Preconditions.checkNotNull(renameKeyRequest);
 
     // Verify key name
@@ -100,13 +100,31 @@ public class OMKeyRenameRequest extends OMKeyRequest {
     KeyArgs.Builder newKeyArgs = renameKeyArgs.toBuilder()
         .setModificationTime(Time.now()).setKeyName(srcKey);
 
+    KeyArgs resolvedArgs = resolveBucketAndCheckAcls(newKeyArgs.build(),
+        ozoneManager, srcKey, dstKey);
+
     return getOmRequest().toBuilder()
         .setRenameKeyRequest(renameKeyRequest.toBuilder().setToKeyName(dstKey)
-            .setKeyArgs(newKeyArgs))
+            .setKeyArgs(resolvedArgs))
         .setUserInfo(getUserIfNotExists(ozoneManager)).build();
 
   }
 
+  protected KeyArgs resolveBucketAndCheckAcls(KeyArgs keyArgs,
+      OzoneManager ozoneManager, String fromKeyName, String toKeyName)
+      throws IOException {
+    KeyArgs resolvedArgs = resolveBucketLink(ozoneManager, keyArgs);
+    // check Acls: DELETE on the source key, CREATE on the destination key
+    String volumeName = resolvedArgs.getVolumeName();
+    String bucketName = resolvedArgs.getBucketName();
+
+    checkKeyAcls(ozoneManager, volumeName, bucketName, fromKeyName,
+        IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+    checkKeyAcls(ozoneManager, volumeName, bucketName, toKeyName,
+        IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
+    return resolvedArgs;
+  }
+
 
   @Override
   @SuppressWarnings("methodlength")
@@ -143,18 +161,6 @@ public class OMKeyRenameRequest extends OMKeyRequest {
         throw new OMException("Key name is empty",
             OMException.ResultCodes.INVALID_KEY_NAME);
       }
-
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
-      // check Acls to see if user has access to perform delete operation on
-      // old key and create operation on new key
-      checkKeyAcls(ozoneManager, volumeName, bucketName, fromKeyName,
-          IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
-      checkKeyAcls(ozoneManager, volumeName, bucketName, toKeyName,
-          IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
-
       mergeOmLockDetails(omMetadataManager.getLock()
           .acquireWriteLock(BUCKET_LOCK, volumeName, bucketName));
       acquiredLock = getOmLockDetails().isLockAcquired();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java
index ea85459071..3d29f6576b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java
@@ -108,27 +108,6 @@ public class OMKeyRenameRequestWithFSO extends 
OMKeyRenameRequest {
                 OMException.ResultCodes.INVALID_KEY_NAME);
       }
 
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
-      // check Acls to see if user has access to perform delete operation on
-      // old key and create operation on new key
-
-      // check Acl fromKeyName
-      checkACLsWithFSO(ozoneManager, volumeName, bucketName, fromKeyName,
-          IAccessAuthorizer.ACLType.DELETE);
-
-      // check Acl toKeyName
-      if (toKeyName.isEmpty()) {
-        // if the toKeyName is empty we are checking the ACLs of the bucket
-        checkBucketAcls(ozoneManager, volumeName, bucketName, toKeyName,
-            IAccessAuthorizer.ACLType.CREATE);
-      } else {
-        checkKeyAcls(ozoneManager, volumeName, bucketName, toKeyName,
-            IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
-      }
-
       mergeOmLockDetails(omMetadataManager.getLock()
           .acquireWriteLock(BUCKET_LOCK, volumeName, bucketName));
       acquiredLock = getOmLockDetails().isLockAcquired();
@@ -261,6 +240,34 @@ public class OMKeyRenameRequestWithFSO extends 
OMKeyRenameRequest {
     return omClientResponse;
   }
 
+  @Override
+  protected KeyArgs resolveBucketAndCheckAcls(KeyArgs keyArgs,
+      OzoneManager ozoneManager, String fromKeyName, String toKeyName)
+      throws IOException {
+    KeyArgs resolvedArgs = resolveBucketLink(ozoneManager, keyArgs);
+    // check Acls against the volume/bucket returned by resolveBucketLink
+    String volumeName = resolvedArgs.getVolumeName();
+    String bucketName = resolvedArgs.getBucketName();
+    // check Acls to see if user has access to perform delete operation on
+    // old key and create operation on new key
+
+    // source key: DELETE, evaluated by checkACLsWithFSO
+    checkACLsWithFSO(ozoneManager, volumeName, bucketName, fromKeyName,
+        IAccessAuthorizer.ACLType.DELETE);
+
+    // destination key: CREATE, on the bucket itself when the name is empty
+    if (toKeyName.isEmpty()) {
+      // if the toKeyName is empty we are checking the ACLs of the bucket
+      checkBucketAcls(ozoneManager, volumeName, bucketName, toKeyName,
+          IAccessAuthorizer.ACLType.CREATE);
+    } else {
+      checkKeyAcls(ozoneManager, volumeName, bucketName, toKeyName,
+          IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
+    }
+
+    return resolvedArgs;
+  }
+
   @SuppressWarnings("parameternumber")
   private OMClientResponse renameKey(OmKeyInfo toKeyParent, String toKeyName,
       OmKeyInfo fromKeyValue, String fromKeyName, boolean isRenameDirectory,
@@ -341,6 +348,7 @@ public class OMKeyRenameRequestWithFSO extends 
OMKeyRenameRequest {
         omBucketInfo, isRenameDirectory, getBucketLayout());
     return omClientResponse;
   }
+
   @SuppressWarnings("checkstyle:ParameterNumber")
   private void setModificationTime(OMMetadataManager omMetadataManager,
       OmBucketInfo bucketInfo, OmKeyInfo keyParent,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 800f982034..3e87984ac0 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -133,6 +133,46 @@ public abstract class OMKeyRequest extends OMClientRequest 
{
     return keyArgs;
   }
 
+  protected KeyArgs resolveBucketLink(
+      OzoneManager ozoneManager, KeyArgs keyArgs) throws IOException {
+    ResolvedBucket bucket = ozoneManager.resolveBucketLink(keyArgs, this);
+    keyArgs = bucket.update(keyArgs); // presumably applies the resolved names
+    return keyArgs;
+  }
+
+  protected KeyArgs resolveBucketAndCheckKeyAcls(KeyArgs keyArgs,
+      OzoneManager ozoneManager, IAccessAuthorizer.ACLType aclType)
+      throws IOException {
+    KeyArgs resolvedArgs = resolveBucketLink(ozoneManager, keyArgs);
+    // check Acl on the resolved volume/bucket with the caller's key name
+    checkKeyAcls(ozoneManager, resolvedArgs.getVolumeName(),
+        resolvedArgs.getBucketName(), keyArgs.getKeyName(),
+        aclType, OzoneObj.ResourceType.KEY);
+    return resolvedArgs;
+  }
+
+  protected KeyArgs resolveBucketAndCheckKeyAclsWithFSO(KeyArgs keyArgs,
+      OzoneManager ozoneManager, IAccessAuthorizer.ACLType aclType)
+      throws IOException {
+    KeyArgs resolvedArgs = resolveBucketLink(ozoneManager, keyArgs);
+    // check Acl via checkACLsWithFSO on the resolved volume/bucket
+    checkACLsWithFSO(ozoneManager, resolvedArgs.getVolumeName(),
+        resolvedArgs.getBucketName(), keyArgs.getKeyName(), aclType);
+    return resolvedArgs;
+  }
+
+  protected KeyArgs resolveBucketAndCheckOpenKeyAcls(KeyArgs keyArgs,
+      OzoneManager ozoneManager, IAccessAuthorizer.ACLType aclType,
+      long clientId)
+      throws IOException {
+    KeyArgs resolvedArgs = resolveBucketLink(ozoneManager, keyArgs);
+    // check Acl via checkKeyAclsInOpenKeyTable, forwarding clientId
+    checkKeyAclsInOpenKeyTable(ozoneManager, resolvedArgs.getVolumeName(),
+        resolvedArgs.getBucketName(), keyArgs.getKeyName(),
+        aclType, clientId);
+    return resolvedArgs;
+  }
+
   /**
    * This methods avoids multiple rpc calls to SCM by allocating multiple 
blocks
    * in one rpc call.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
index ef721cb530..e1772d4009 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
@@ -49,8 +49,7 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
@@ -82,7 +81,7 @@ public class S3InitiateMultipartUploadRequest extends 
OMKeyRequest {
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
     MultipartInfoInitiateRequest multipartInfoInitiateRequest =
-        getOmRequest().getInitiateMultiPartUploadRequest();
+        super.preExecute(ozoneManager).getInitiateMultiPartUploadRequest();
     Preconditions.checkNotNull(multipartInfoInitiateRequest);
 
     KeyArgs keyArgs = multipartInfoInitiateRequest.getKeyArgs();
@@ -99,10 +98,12 @@ public class S3InitiateMultipartUploadRequest extends 
OMKeyRequest {
 
     generateRequiredEncryptionInfo(keyArgs, newKeyArgs, ozoneManager);
 
+    KeyArgs resolvedArgs = resolveBucketAndCheckKeyAcls(newKeyArgs.build(),
+        ozoneManager, ACLType.CREATE);
     return getOmRequest().toBuilder()
         .setUserInfo(getUserInfo())
         .setInitiateMultiPartUploadRequest(
-            multipartInfoInitiateRequest.toBuilder().setKeyArgs(newKeyArgs))
+            multipartInfoInitiateRequest.toBuilder().setKeyArgs(resolvedArgs))
         .build();
   }
 
@@ -140,14 +141,6 @@ public class S3InitiateMultipartUploadRequest extends 
OMKeyRequest {
         getOmRequest());
     OMClientResponse omClientResponse = null;
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
-      // check Acl
-      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.CREATE, OzoneObj.ResourceType.KEY);
-
       mergeOmLockDetails(
           omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
               bucketName));
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
index d555cf62c9..d1c865fbc7 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
@@ -99,11 +99,7 @@ public class S3InitiateMultipartUploadRequestWithFSO
         getOmRequest());
     OMClientResponse omClientResponse = null;
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
 
-      // TODO to support S3 ACL later.
       mergeOmLockDetails(
           omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
               bucketName));
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
index c36e7d3111..c7a7245533 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
@@ -34,8 +34,7 @@ import 
org.apache.hadoop.ozone.om.request.validation.ValidationCondition;
 import org.apache.hadoop.ozone.om.request.validation.ValidationContext;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,16 +80,21 @@ public class S3MultipartUploadAbortRequest extends 
OMKeyRequest {
 
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
-    KeyArgs keyArgs =
-        getOmRequest().getAbortMultiPartUploadRequest().getKeyArgs();
+    KeyArgs keyArgs = super.preExecute(ozoneManager)
+        .getAbortMultiPartUploadRequest().getKeyArgs();
     String keyPath = keyArgs.getKeyName();
     keyPath = validateAndNormalizeKey(ozoneManager.getEnableFileSystemPaths(),
         keyPath, getBucketLayout());
 
+    KeyArgs newKeyArgs =
+        keyArgs.toBuilder().setModificationTime(Time.now())
+            .setKeyName(keyPath).build();
+
+    KeyArgs resolvedArgs = resolveBucketAndCheckKeyAcls(newKeyArgs,
+        ozoneManager, ACLType.WRITE);
     return getOmRequest().toBuilder().setAbortMultiPartUploadRequest(
         getOmRequest().getAbortMultiPartUploadRequest().toBuilder().setKeyArgs(
-            keyArgs.toBuilder().setModificationTime(Time.now())
-                .setKeyName(keyPath))).setUserInfo(getUserInfo()).build();
+            resolvedArgs)).setUserInfo(getUserInfo()).build();
 
   }
 
@@ -122,14 +126,6 @@ public class S3MultipartUploadAbortRequest extends 
OMKeyRequest {
     Result result = null;
     OmBucketInfo omBucketInfo = null;
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
-      // check acl
-      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.WRITE, OzoneObj.ResourceType.KEY);
-
       mergeOmLockDetails(
           omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
               bucketName));
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
index bdd5ef1e90..f461bbd171 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
@@ -53,7 +53,7 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
@@ -84,17 +84,23 @@ public class S3MultipartUploadCommitPartRequest extends 
OMKeyRequest {
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
     MultipartCommitUploadPartRequest multipartCommitUploadPartRequest =
-        getOmRequest().getCommitMultiPartUploadRequest();
+        super.preExecute(ozoneManager).getCommitMultiPartUploadRequest();
 
     KeyArgs keyArgs = multipartCommitUploadPartRequest.getKeyArgs();
     String keyPath = keyArgs.getKeyName();
     keyPath = validateAndNormalizeKey(ozoneManager.getEnableFileSystemPaths(),
         keyPath, getBucketLayout());
 
+    KeyArgs newKeyArgs =
+        keyArgs.toBuilder().setModificationTime(Time.now())
+            .setKeyName(keyPath).build();
+
+    KeyArgs resolvedArgs = resolveBucketAndCheckOpenKeyAcls(newKeyArgs,
+        ozoneManager, ACLType.WRITE,
+        multipartCommitUploadPartRequest.getClientID());
     return getOmRequest().toBuilder().setCommitMultiPartUploadRequest(
         multipartCommitUploadPartRequest.toBuilder().setKeyArgs(
-            keyArgs.toBuilder().setModificationTime(Time.now())
-                .setKeyName(keyPath))).setUserInfo(getUserInfo()).build();
+            resolvedArgs)).setUserInfo(getUserInfo()).build();
   }
 
   @Override
@@ -130,16 +136,8 @@ public class S3MultipartUploadCommitPartRequest extends 
OMKeyRequest {
     OmBucketInfo omBucketInfo = null;
     OmBucketInfo copyBucketInfo = null;
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
       long clientID = multipartCommitUploadPartRequest.getClientID();
 
-      // check acl
-      checkKeyAclsInOpenKeyTable(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.WRITE, clientID);
-
       mergeOmLockDetails(omMetadataManager.getLock()
           .acquireWriteLock(BUCKET_LOCK, volumeName, bucketName));
       acquiredLock = getOmLockDetails().isLockAcquired();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
index 9be3260093..1b52318e4d 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
@@ -62,8 +62,7 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,17 +88,21 @@ public class S3MultipartUploadCompleteRequest extends 
OMKeyRequest {
   @Override
   public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
     MultipartUploadCompleteRequest multipartUploadCompleteRequest =
-        getOmRequest().getCompleteMultiPartUploadRequest();
+        super.preExecute(ozoneManager).getCompleteMultiPartUploadRequest();
 
     KeyArgs keyArgs = multipartUploadCompleteRequest.getKeyArgs();
     String keyPath = keyArgs.getKeyName();
     keyPath = validateAndNormalizeKey(ozoneManager.getEnableFileSystemPaths(),
         keyPath, getBucketLayout());
 
+    KeyArgs newKeyArgs = keyArgs.toBuilder().setModificationTime(Time.now())
+            .setKeyName(keyPath).build();
+    KeyArgs resolvedArgs = resolveBucketAndCheckKeyAcls(newKeyArgs,
+        ozoneManager, ACLType.WRITE);
+
     return getOmRequest().toBuilder().setCompleteMultiPartUploadRequest(
         multipartUploadCompleteRequest.toBuilder().setKeyArgs(
-            keyArgs.toBuilder().setModificationTime(Time.now())
-                .setKeyName(keyPath))).setUserInfo(getUserInfo()).build();
+            resolvedArgs)).setUserInfo(getUserInfo()).build();
   }
 
   @Override
@@ -134,17 +137,9 @@ public class S3MultipartUploadCompleteRequest extends 
OMKeyRequest {
     Exception exception = null;
     Result result = null;
     try {
-      keyArgs = resolveBucketLink(ozoneManager, keyArgs, auditMap);
-      volumeName = keyArgs.getVolumeName();
-      bucketName = keyArgs.getBucketName();
-
       multipartKey = omMetadataManager.getMultipartKey(volumeName,
           bucketName, keyName, uploadID);
 
-      // check Acl
-      checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-          IAccessAuthorizer.ACLType.WRITE, OzoneObj.ResourceType.KEY);
-
       mergeOmLockDetails(omMetadataManager.getLock()
           .acquireWriteLock(BUCKET_LOCK, volumeName, bucketName));
       acquiredLock = getOmLockDetails().isLockAcquired();
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequest.java
index 402e2451c3..17c9f6bfe5 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequest.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -67,6 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.framework;
 import static org.mockito.Mockito.mock;
@@ -104,6 +106,10 @@ public class TestOMDirectoryCreateRequest {
         .thenReturn(new ResolvedBucket("", "",
             "", "", "",
             BucketLayout.DEFAULT));
+    OMLayoutVersionManager lvm = mock(OMLayoutVersionManager.class);
+    when(lvm.getMetadataLayoutVersion()).thenReturn(0);
+    when(lvm.isAllowed(anyString())).thenReturn(true);
+    when(ozoneManager.getVersionManager()).thenReturn(lvm);
   }
 
   @AfterEach
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestWithFSO.java
index a914ae4e16..82cb3c0d9b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMDirectoryCreateRequestWithFSO.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
@@ -66,6 +67,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.framework;
 import static org.mockito.Mockito.mock;
@@ -104,6 +106,10 @@ public class TestOMDirectoryCreateRequestWithFSO {
             .thenReturn(new ResolvedBucket("", "",
                     "", "", "",
                     BucketLayout.DEFAULT));
+    OMLayoutVersionManager lvm = mock(OMLayoutVersionManager.class);
+    when(lvm.getMetadataLayoutVersion()).thenReturn(0);
+    when(lvm.isAllowed(anyString())).thenReturn(true);
+    when(ozoneManager.getVersionManager()).thenReturn(lvm);
   }
 
   @AfterEach
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java
index 50c944a80a..cb585caefd 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequest.java
@@ -28,6 +28,7 @@ import java.util.UUID;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -48,6 +49,11 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
   @ParameterizedTest
   @ValueSource(strings = {"keyName", "a/b/keyName", "a/.snapshot/keyName", "a.snapshot/b/keyName"})
   public void testPreExecute(String testKeyName) throws Exception {
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, omMetadataManager, getBucketLayout());
+    String ozoneKey = addKeyToTable(testKeyName);
+    OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable(getBucketLayout()).get(ozoneKey);
+    Assertions.assertNotNull(omKeyInfo);
+
     doPreExecute(createDeleteKeyRequest(testKeyName));
   }
 
@@ -99,18 +105,15 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
 
   @Test
   public void testValidateAndUpdateCacheWithKeyNotFound() throws Exception {
-    OMRequest modifiedOmRequest =
-        doPreExecute(createDeleteKeyRequest());
-
-    OMKeyDeleteRequest omKeyDeleteRequest =
-            getOmKeyDeleteRequest(modifiedOmRequest);
-
     // Add only volume and bucket entry to DB.
     // In actual implementation we don't check for bucket/volume exists
     // during delete key.
     OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
         omMetadataManager, getBucketLayout());
 
+    OMKeyDeleteRequest omKeyDeleteRequest =
+        getOmKeyDeleteRequest(createDeleteKeyRequest());
+
     OMClientResponse omClientResponse =
         omKeyDeleteRequest.validateAndUpdateCache(ozoneManager, 100L);
 
@@ -120,11 +123,9 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
 
   @Test
   public void testValidateAndUpdateCacheWithVolumeNotFound() throws Exception {
-    OMRequest modifiedOmRequest =
-        doPreExecute(createDeleteKeyRequest());
 
-    OMKeyDeleteRequest omKeyDeleteRequest =
-            getOmKeyDeleteRequest(modifiedOmRequest);
+    OMKeyDeleteRequest omKeyDeleteRequest = getOmKeyDeleteRequest(
+        createDeleteKeyRequest());
 
     OMClientResponse omClientResponse = omKeyDeleteRequest
         .validateAndUpdateCache(ozoneManager, 100L);
@@ -135,11 +136,8 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
 
   @Test
   public void testValidateAndUpdateCacheWithBucketNotFound() throws Exception {
-    OMRequest modifiedOmRequest =
-        doPreExecute(createDeleteKeyRequest());
-
     OMKeyDeleteRequest omKeyDeleteRequest =
-            getOmKeyDeleteRequest(modifiedOmRequest);
+            getOmKeyDeleteRequest(createDeleteKeyRequest());
 
     OMRequestTestUtils.addVolumeToDB(volumeName, omMetadataManager);
 
@@ -156,7 +154,7 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
    * @return OMRequest - modified request returned from preExecute.
    * @throws Exception
    */
-  private OMRequest doPreExecute(OMRequest originalOmRequest) throws Exception {
+  protected OMRequest doPreExecute(OMRequest originalOmRequest) throws Exception {
 
     OMKeyDeleteRequest omKeyDeleteRequest =
             getOmKeyDeleteRequest(originalOmRequest);
@@ -173,7 +171,7 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
    * Create OMRequest which encapsulates DeleteKeyRequest.
    * @return OMRequest
    */
-  private OMRequest createDeleteKeyRequest() {
+  protected OMRequest createDeleteKeyRequest() {
     return createDeleteKeyRequest(keyName);
   }
 
@@ -190,12 +188,15 @@ public class TestOMKeyDeleteRequest extends TestOMKeyRequest {
   }
 
   protected String addKeyToTable() throws Exception {
+    return addKeyToTable(keyName);
+  }
+
+  protected String addKeyToTable(String key) throws Exception {
     OMRequestTestUtils.addKeyToTable(false, volumeName,
-            bucketName, keyName, clientID, replicationType, replicationFactor,
+            bucketName, key, clientID, replicationType, replicationFactor,
             omMetadataManager);
 
-    return omMetadataManager.getOzoneKey(volumeName, bucketName,
-            keyName);
+    return omMetadataManager.getOzoneKey(volumeName, bucketName, key);
   }
 
   protected OMKeyDeleteRequest getOmKeyDeleteRequest(
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequestWithFSO.java
index e9a735dd89..96483fb587 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyDeleteRequestWithFSO.java
@@ -33,7 +33,10 @@ import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.security.acl.OzonePrefixPath;
 import org.apache.hadoop.util.Time;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -102,6 +105,18 @@ public class TestOMKeyDeleteRequestWithFSO extends TestOMKeyDeleteRequest {
     return omKeyInfo.getPath();
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"keyName"})
+  @Override
+  public void testPreExecute(String testKeyName) throws Exception {
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucketName, omMetadataManager, getBucketLayout());
+    String ozoneKey = addKeyToTable();
+    OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable(getBucketLayout()).get(ozoneKey);
+    Assertions.assertNotNull(omKeyInfo);
+
+    doPreExecute(createDeleteKeyRequest());
+  }
+
   @Test
   public void testOzonePrefixPathViewer() throws Exception {
     // Add volume, bucket and key entries to OM DB.
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRenameRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRenameRequest.java
index 2056913bff..a6015870d0 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRenameRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRenameRequest.java
@@ -63,15 +63,16 @@ public class TestOMKeyRenameRequest extends TestOMKeyRequest {
 
   @Test
   public void testPreExecute() throws Exception {
+    addKeyToTable(fromKeyInfo);
     doPreExecute(createRenameKeyRequest(
         volumeName, bucketName, fromKeyName, toKeyName));
   }
 
   @Test
   public void testValidateAndUpdateCache() throws Exception {
+    String dbFromKey = addKeyToTable(fromKeyInfo);
     OMRequest modifiedOmRequest = doPreExecute(createRenameKeyRequest(
             volumeName, bucketName, fromKeyName, toKeyName));
-    String dbFromKey = addKeyToTable(fromKeyInfo);
 
     OMKeyRenameRequest omKeyRenameRequest =
             getOMKeyRenameRequest(modifiedOmRequest);
@@ -97,8 +98,8 @@ public class TestOMKeyRenameRequest extends TestOMKeyRequest {
 
   @Test
   public void testValidateAndUpdateCacheWithKeyNotFound() throws Exception {
-    OMRequest modifiedOmRequest = doPreExecute(createRenameKeyRequest(
-        volumeName, bucketName, fromKeyName, toKeyName));
+    OMRequest omRequest = createRenameKeyRequest(
+        volumeName, bucketName, fromKeyName, toKeyName);
 
     // Add only volume and bucket entry to DB.
 
@@ -108,7 +109,7 @@ public class TestOMKeyRenameRequest extends TestOMKeyRequest {
         omMetadataManager);
 
     OMKeyRenameRequest omKeyRenameRequest =
-        new OMKeyRenameRequest(modifiedOmRequest, getBucketLayout());
+        new OMKeyRenameRequest(omRequest, getBucketLayout());
 
     OMClientResponse omKeyRenameResponse =
         omKeyRenameRequest.validateAndUpdateCache(ozoneManager, 100L);
@@ -119,11 +120,11 @@ public class TestOMKeyRenameRequest extends TestOMKeyRequest {
 
   @Test
   public void testValidateAndUpdateCacheWithVolumeNotFound() throws Exception {
-    OMRequest modifiedOmRequest = doPreExecute(createRenameKeyRequest(
-        "not_exist_volume", "not_exist_bucket", fromKeyName, toKeyName));
+    OMRequest omRequest = createRenameKeyRequest(
+        "not_exist_volume", "not_exist_bucket", fromKeyName, toKeyName);
 
     OMKeyRenameRequest omKeyRenameRequest =
-        new OMKeyRenameRequest(modifiedOmRequest, getBucketLayout());
+        new OMKeyRenameRequest(omRequest, getBucketLayout());
 
     OMClientResponse omKeyRenameResponse =
         omKeyRenameRequest.validateAndUpdateCache(ozoneManager, 100L);
@@ -134,14 +135,14 @@ public class TestOMKeyRenameRequest extends TestOMKeyRequest {
 
   @Test
   public void testValidateAndUpdateCacheWithBucketNotFound() throws Exception {
-    OMRequest modifiedOmRequest = doPreExecute(createRenameKeyRequest(
-        volumeName, "not_exist_bucket", fromKeyName, toKeyName));
+    OMRequest omRequest = createRenameKeyRequest(
+        volumeName, "not_exist_bucket", fromKeyName, toKeyName);
 
     // Add only volume entry to DB.
     OMRequestTestUtils.addVolumeToDB(volumeName, omMetadataManager);
 
     OMKeyRenameRequest omKeyRenameRequest =
-        new OMKeyRenameRequest(modifiedOmRequest, getBucketLayout());
+        new OMKeyRenameRequest(omRequest, getBucketLayout());
 
     OMClientResponse omKeyRenameResponse =
         omKeyRenameRequest.validateAndUpdateCache(ozoneManager, 100L);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRenameRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRenameRequestWithFSO.java
index a1d7c83efd..c91b8e1582 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRenameRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRenameRequestWithFSO.java
@@ -105,8 +105,8 @@ public class TestOMKeyRenameRequestWithFSO extends TestOMKeyRenameRequest {
   @Test
   public void testValidateAndUpdateCacheWithEmptyToKey() throws Exception {
     String emptyToKeyName = "";
-    OMRequest omRequest = doPreExecute(createRenameKeyRequest(volumeName,
-        bucketName, fromKeyName, emptyToKeyName));
+    OMRequest omRequest = createRenameKeyRequest(volumeName,
+        bucketName, fromKeyName, emptyToKeyName);
     assertEquals(omRequest.getRenameKeyRequest().getToKeyName(), "");
   }
 
@@ -121,14 +121,13 @@ public class TestOMKeyRenameRequestWithFSO extends TestOMKeyRenameRequest {
 
   @Test
   public void testPreExecuteWithUnNormalizedPath() throws Exception {
+    addKeyToTable(fromKeyInfo);
     String toKeyName =
         "///root" + OzoneConsts.OZONE_URI_DELIMITER +
             OzoneConsts.OZONE_URI_DELIMITER +
             UUID.randomUUID();
     String fromKeyName =
-        "///root/sub-dir" + OzoneConsts.OZONE_URI_DELIMITER +
-            OzoneConsts.OZONE_URI_DELIMITER +
-            UUID.randomUUID();
+        "///" + fromKeyInfo.getKeyName();
     OMRequest modifiedOmRequest =
         doPreExecute(createRenameKeyRequest(toKeyName, fromKeyName));
     String normalizedSrcName =
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
index b949882abb..1e0c421333 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
@@ -33,12 +33,9 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.ResolvedBucket;
-import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.lock.OzoneLockProvider;
-import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.file.OMFileCreateRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
 import org.apache.hadoop.ozone.om.response.file.OMFileCreateResponse;
@@ -72,7 +69,6 @@ import java.util.UUID;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.times;
@@ -122,14 +118,6 @@ public class TestCleanupTableInfo {
     OMMetadataManager metaMgr = createOMMetadataManagerSpy();
     when(om.getMetrics()).thenReturn(omMetrics);
     when(om.getMetadataManager()).thenReturn(metaMgr);
-    when(om.resolveBucketLink(any(KeyArgs.class), any(OMClientRequest.class)))
-        .thenAnswer(
-            invocationOnMock -> new ResolvedBucket(
-                TEST_VOLUME_NAME, TEST_BUCKET_NAME,
-                TEST_VOLUME_NAME, TEST_BUCKET_NAME,
-                "owner", BucketLayout.DEFAULT)
-        );
-    when(om.getAclsEnabled()).thenReturn(false);
     when(om.getAuditLogger()).thenReturn(mock(AuditLogger.class));
     when(om.getDefaultReplicationConfig()).thenReturn(ReplicationConfig
         .getDefault(new OzoneConfiguration()));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to