This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch HDDS-3698-upgrade
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 6fa5fa1cd09abdba88c996e4adadcc35bca966e4
Author: avijayanhwx <14299376+avijayan...@users.noreply.github.com>
AuthorDate: Tue Sep 15 10:40:24 2020 -0700

    HDDS-4143. Implement a factory for OM Requests that returns an instance 
based on layout version. (#1405)
---
 .../upgrade/LayoutVersionInstanceFactory.java      | 247 +++++++++++++++++++++
 .../hadoop/ozone/upgrade/VersionFactoryKey.java    |  70 ++++++
 .../upgrade/TestLayoutVersionInstanceFactory.java  | 191 ++++++++++++++++
 .../src/main/proto/OmClientProtocol.proto          |   5 +
 hadoop-ozone/ozone-manager/pom.xml                 |   1 -
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  11 +-
 .../om/ratis/utils/OzoneManagerRatisUtils.java     | 186 ++++++----------
 .../hadoop/ozone/om/request/OMClientRequest.java   |   8 +-
 .../om/request/bucket/OMBucketCreateRequest.java   |   4 +
 .../om/request/bucket/OMBucketDeleteRequest.java   |   5 +
 .../request/bucket/OMBucketSetPropertyRequest.java |   5 +
 .../request/bucket/acl/OMBucketAddAclRequest.java  |   6 +
 .../bucket/acl/OMBucketRemoveAclRequest.java       |   7 +
 .../request/bucket/acl/OMBucketSetAclRequest.java  |   7 +
 .../om/request/file/OMDirectoryCreateRequest.java  |   5 +
 .../ozone/om/request/file/OMFileCreateRequest.java |  10 +-
 .../om/request/key/OMAllocateBlockRequest.java     |   5 +
 .../ozone/om/request/key/OMECKeyCreateRequest.java |  54 +++++
 .../ozone/om/request/key/OMKeyCommitRequest.java   |   5 +
 .../ozone/om/request/key/OMKeyCreateRequest.java   |   4 +
 .../ozone/om/request/key/OMKeyDeleteRequest.java   |   5 +
 .../ozone/om/request/key/OMKeyPurgeRequest.java    |   6 +
 .../ozone/om/request/key/OMKeyRenameRequest.java   |   5 +
 .../ozone/om/request/key/OMKeysDeleteRequest.java  |   5 +
 .../ozone/om/request/key/OMKeysRenameRequest.java  |   5 +
 .../om/request/key/OMTrashRecoverRequest.java      |   4 +
 .../om/request/key/acl/OMKeyAddAclRequest.java     |   7 +
 .../om/request/key/acl/OMKeyRemoveAclRequest.java  |   7 +
 .../om/request/key/acl/OMKeySetAclRequest.java     |   7 +
 .../key/acl/prefix/OMPrefixAddAclRequest.java      |   7 +
 .../key/acl/prefix/OMPrefixRemoveAclRequest.java   |   7 +
 .../key/acl/prefix/OMPrefixSetAclRequest.java      |   7 +
 .../S3InitiateMultipartUploadRequest.java          |   5 +
 .../multipart/S3MultipartUploadAbortRequest.java   |   5 +
 .../S3MultipartUploadCommitPartRequest.java        |   4 +
 .../S3MultipartUploadCompleteRequest.java          |   6 +
 .../om/request/s3/security/S3GetSecretRequest.java |   5 +
 .../security/OMCancelDelegationTokenRequest.java   |   7 +-
 .../security/OMGetDelegationTokenRequest.java      |   7 +-
 .../security/OMRenewDelegationTokenRequest.java    |   6 +
 .../upgrade/OMFinalizeUpgradeProgressRequest.java  |   6 +
 .../request/upgrade/OMFinalizeUpgradeRequest.java  |   9 +-
 .../om/request/volume/OMVolumeCreateRequest.java   |   5 +
 .../om/request/volume/OMVolumeDeleteRequest.java   |   6 +
 .../om/request/volume/OMVolumeSetOwnerRequest.java |   5 +
 .../om/request/volume/OMVolumeSetQuotaRequest.java |   5 +
 .../request/volume/acl/OMVolumeAddAclRequest.java  |   7 +
 .../volume/acl/OMVolumeRemoveAclRequest.java       |   7 +
 .../request/volume/acl/OMVolumeSetAclRequest.java  |   7 +
 ...FeatureAPI.java => BelongsToLayoutVersion.java} |  14 +-
 ...eAPI.java => DisallowedUntilLayoutVersion.java} |   8 +-
 .../hadoop/ozone/om/upgrade/OMLayoutFeature.java   |  63 ++++++
 .../ozone/om/upgrade/OMLayoutFeatureAspect.java    |  20 +-
 .../ozone/om/upgrade/OMLayoutFeatureCatalog.java   |  93 --------
 ...outFeatureAPI.java => OMLayoutFeatureUtil.java} |  36 ++-
 .../ozone/om/upgrade/OMLayoutVersionManager.java   |  92 --------
 .../om/upgrade/OMLayoutVersionManagerImpl.java     | 177 +++++++++++++++
 ...FeatureAPI.java => OmLayoutVersionManager.java} |  16 +-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |   9 +-
 .../protocolPB/OzoneManagerRequestHandler.java     |  12 +-
 .../ozone/om/request/bucket/TestBucketRequest.java |   5 +
 .../ozone/om/request/key/TestOMKeyRequest.java     |   5 +
 .../om/request/volume/TestOMVolumeRequest.java     |   9 +-
 .../om/upgrade/TestOMLayoutFeatureAspect.java      |   4 +-
 .../ozone/om/upgrade/TestOMVersionManager.java     |  49 +++-
 .../TestOmVersionManagerRequestFactory.java        | 122 ++++++++++
 hadoop-ozone/s3gateway/pom.xml                     |   5 +
 67 files changed, 1378 insertions(+), 371 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutVersionInstanceFactory.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutVersionInstanceFactory.java
new file mode 100644
index 0000000..96463e0
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutVersionInstanceFactory.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.upgrade;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.stream.Collectors.toList;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Generic factory which stores different instances of Type 'T' sharded by
+ * a key & version. A single key can be associated with different versions
+ * of 'T'.
+ *
+ * Why does this class exist?
+ * A typical use case during upgrade is to have multiple versions of a class
+ * / method / object and chose them based  on current layout
+ * version at runtime. Before finalizing, an older version is typically
+ * needed, and after finalize, a newer version is needed. This class serves
+ * this purpose in a generic way.
+ *
+ * For example, we can create a Factory to create multiple versions of
+ * OMRequests sharded by Request Type & Layout Version Supported.
+ */
+public class LayoutVersionInstanceFactory<T> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LayoutVersionInstanceFactory.class);
+
+  /**
+   * The factory will maintain ALL instances > MLV and 1 instance <= MLV in a
+   * priority queue (ordered by version). By doing that it guarantees O(1)
+   * lookup at all times, since we always would lookup the first element (top
+   * of the PQ).
+   * Multiple entries will be there ONLY during pre-finalized state.
+   * On finalization, we will be removing the entry one by one until we reach
+   * a single entry. On a regular component instance (finalized), there will
+   * be a single request version associated with a request always.
+   */
+  private final Map<String, PriorityQueue<VersionedInstance<T>>> instances =
+      new HashMap<>();
+
+  /**
+   * Register an instance with a given factory key (key + version).
+   * For safety reasons we dont allow (1) re-registering, (2) registering an
+   * instance with version > SLV.
+   *
+   * @param lvm LayoutVersionManager
+   * @param key VersionFactoryKey key to associate with instance.
+   * @param instance instance to register.
+   */
+  public boolean register(LayoutVersionManager lvm, VersionFactoryKey key,
+                       T instance) {
+    // If version is not passed in, go defensive and set the highest possible
+    // version (SLV).
+    int version = key.getVersion() == null ?
+        lvm.getSoftwareLayoutVersion() : key.getVersion();
+
+    checkArgument(lvm.getSoftwareLayoutVersion() >= key.getVersion(),
+        String.format("Cannot register key %s since the version is greater " +
+                "than the Software layout version %d",
+        key, lvm.getSoftwareLayoutVersion()));
+
+    // If we reach here, we know that the passed in version belongs to
+    // [0, SLV].
+    String primaryKey = key.getKey();
+    instances.computeIfAbsent(primaryKey, s ->
+        new PriorityQueue<>(Comparator.comparingInt(o -> o.version)));
+
+    PriorityQueue<VersionedInstance<T>> versionedInstances =
+        instances.get(primaryKey);
+    Optional<VersionedInstance<T>> existingInstance =
+        versionedInstances.parallelStream()
+        .filter(v -> v.version == key.getVersion()).findAny();
+
+    if (existingInstance.isPresent()) {
+      throw new IllegalArgumentException(String.format("Cannot register key " +
+          "%s since there is an existing entry already.", key));
+    }
+
+    if (!versionedInstances.isEmpty() && isValid(lvm, version)) {
+      VersionedInstance<T> currentPeek = versionedInstances.peek();
+      if (currentPeek.version < version) {
+        // Current peek < passed in version (and <= MLV). Hence, we can
+        // remove it, since the passed in a better candidate.
+        versionedInstances.poll();
+        // Add the passed in instance.
+        versionedInstances.offer(new VersionedInstance<>(version, instance));
+        return true;
+      } else if (currentPeek.version > lvm.getMetadataLayoutVersion()) {
+        // Current peak is > MLV, hence we don't need to remove that. Just
+        // add passed in instance.
+        versionedInstances.offer(new VersionedInstance<>(version, instance));
+        return true;
+      } else {
+        // Current peek <= MLV and > passed in version, and hence a better
+        // canidate. Retaining the peek, and ignoring the passed in instance.
+        return false;
+      }
+    } else {
+      // Passed in instance version > MLV (or the first version to be
+      // registered), hence can be registered.
+      versionedInstances.offer(new VersionedInstance<>(version, instance));
+      return true;
+    }
+  }
+
+  private boolean isValid(LayoutVersionManager lvm, int version) {
+    return version <= lvm.getMetadataLayoutVersion();
+  }
+
+  /**
+   * From the list of versioned instances for a given "key", this
+   * returns the "floor" value corresponding to the given version.
+   * For example, if we have key = "CreateKey",  entry -> [(1, CreateKeyV1),
+   * (3, CreateKeyV2), and if the passed in key = CreateKey & version = 2, we
+   * return CreateKeyV1.
+   * Since this is a priority queue based implementation, we use a O(1) peek()
+   * lookup to get the current valid version.
+   * @param lvm LayoutVersionManager
+   * @param key Key and Version.
+   * @return instance.
+   */
+  public T get(LayoutVersionManager lvm, VersionFactoryKey key) {
+    Integer version = key.getVersion();
+    // If version is not passed in, go defensive and set the highest allowed
+    // version (MLV).
+    if (version == null) {
+      version = lvm.getMetadataLayoutVersion();
+    }
+
+    checkArgument(lvm.getMetadataLayoutVersion() >= version,
+        String.format("Cannot get key %s since the version is greater " +
+                "than the Metadata layout version %d",
+            key, lvm.getMetadataLayoutVersion()));
+
+    String primaryKey = key.getKey();
+    PriorityQueue<VersionedInstance<T>> versionedInstances =
+        instances.get(primaryKey);
+    if (versionedInstances == null || versionedInstances.isEmpty()) {
+      throw new IllegalArgumentException(
+          "No suitable instance found for request : " + key);
+    }
+
+    VersionedInstance<T> value = versionedInstances.peek();
+    if (value == null || value.version > version) {
+      throw new IllegalArgumentException(
+          "No suitable instance found for request : " + key);
+    } else {
+      return value.instance;
+    }
+  }
+
+  /**
+   * To be called on finalization when there is an MLV update.
+   * @param lvm LayoutVersionManager instance.
+   */
+  public void onFinalize(LayoutVersionManager lvm) {
+    Iterator<Map.Entry<String, PriorityQueue<VersionedInstance<T>>>> iterator =
+        instances.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<String, PriorityQueue<VersionedInstance<T>>> next =
+          iterator.next();
+      PriorityQueue<VersionedInstance<T>> vInstances = next.getValue();
+      VersionedInstance<T> prevInstance = null;
+      while (!vInstances.isEmpty() &&
+          vInstances.peek().version < lvm.getMetadataLayoutVersion()) {
+        prevInstance = vInstances.poll();
+        LOG.info("Unregistering {} from factory. ", prevInstance.instance);
+      }
+
+      if ((vInstances.isEmpty() ||
+          vInstances.peek().version > lvm.getMetadataLayoutVersion())
+          && prevInstance != null) {
+        vInstances.offer(prevInstance);
+      }
+
+      if (vInstances.isEmpty()) {
+        LOG.info("Unregistering '{}' from factory since it has no entries.",
+            next.getKey());
+        iterator.remove();
+      }
+    }
+  }
+
+  @VisibleForTesting
+  protected Map<String, List<T>> getInstances()  {
+    Map<String, List<T>> instancesCopy = new HashMap<>();
+    instances.forEach((key, value) -> {
+      List<T> collect =
+          value.stream().map(v -> v.instance).collect(toList());
+      instancesCopy.put(key, collect);
+    });
+    return Collections.unmodifiableMap(instancesCopy);
+  }
+
+  /**
+   * Class to encapsulate a instance with version. Not meant to be exposed
+   * outside this class.
+   * @param <T> instance
+   */
+  static class VersionedInstance<T> {
+    private int version;
+    private T instance;
+
+    VersionedInstance(int version, T instance) {
+      this.version = version;
+      this.instance = instance;
+    }
+
+    public long getVersion() {
+      return version;
+    }
+
+    public T getInstance() {
+      return instance;
+    }
+  }
+}
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/VersionFactoryKey.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/VersionFactoryKey.java
new file mode 100644
index 0000000..bda45f5
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/VersionFactoryKey.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.upgrade;
+
+/**
+ * "Key" element to the Version specific instance factory. Currently it has 2
+ * dimensions -> a 'key' string and a version. This is to support a factory
+ * which returns an instance for a given "key" and "version".
+ */
+public class VersionFactoryKey {
+  private String key;
+  private Integer version;
+
+  public VersionFactoryKey(String key, Integer version) {
+    this.key = key;
+    this.version = version;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public Integer getVersion() {
+    return version;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + " : [" + key + ", "
+        + version  + "]";
+  }
+
+  /**
+   * Builder for above key.
+   */
+  public static class Builder {
+    private String key;
+    private Integer version;
+
+    public Builder key(String k) {
+      this.key = k;
+      return this;
+    }
+
+    public Builder version(Integer v) {
+      this.version = v;
+      return this;
+    }
+
+    public VersionFactoryKey build() {
+      return new VersionFactoryKey(key, version);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestLayoutVersionInstanceFactory.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestLayoutVersionInstanceFactory.java
new file mode 100644
index 0000000..e0bb185
--- /dev/null
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestLayoutVersionInstanceFactory.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.upgrade;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.function.Supplier;
+
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.Test;
+
+/**
+ * Test out APIs of VersionSpecificInstanceFactory.
+ */
+public class TestLayoutVersionInstanceFactory {
+
+  private MockInterface m1 = new MockClassV1();
+  private MockInterface m2 = new MockClassV2();
+
+
+  @Test
+  public void testRegister() throws Exception {
+    LayoutVersionManager lvm = getMockLvm(1, 2);
+    LayoutVersionInstanceFactory<MockInterface> factory =
+        new LayoutVersionInstanceFactory<>();
+
+    assertTrue(factory.register(lvm, getKey("key", 0), m1));
+    assertTrue(factory.register(lvm, getKey("key", 1), m1));
+    assertTrue(factory.register(lvm, getKey("key", 2), m2));
+
+    assertEquals(1, factory.getInstances().size());
+    assertEquals(2, factory.getInstances().get("key").size());
+
+    // Should fail on re-registration.
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "existing entry already",
+        () -> factory.register(lvm, getKey("key", 1), new MockClassV1()));
+    assertEquals(1, factory.getInstances().size());
+
+    // Verify SLV check.
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "version is greater",
+        () -> factory.register(lvm, getKey("key2", 4), new MockClassV2()));
+
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    LayoutVersionManager lvm = getMockLvm(2, 3);
+    LayoutVersionInstanceFactory<MockInterface> factory =
+        new LayoutVersionInstanceFactory<>();
+    assertTrue(factory.register(lvm, getKey("key", 0), null));
+    assertTrue(factory.register(lvm, getKey("key", 1), m1));
+    assertTrue(factory.register(lvm, getKey("key", 3), m2));
+
+    MockInterface val = factory.get(lvm, getKey("key", 2));
+    assertTrue(val instanceof MockClassV1);
+
+    // Not passing in version --> Use MLV.
+    val = factory.get(lvm, getKey("key", null));
+    assertTrue(val instanceof MockClassV1);
+
+    // MLV check.
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "version is greater",
+        () -> factory.get(lvm, getKey("key", 3)));
+
+    // Verify failure on Unknown request.
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "No suitable instance found",
+        () -> factory.get(lvm, getKey("key1", 1)));
+  }
+
+  @Test
+  public void testMethodBasedVersionFactory() {
+    LayoutVersionManager lvm = getMockLvm(1, 2);
+    LayoutVersionInstanceFactory<Supplier<String>> factory =
+        new LayoutVersionInstanceFactory<>();
+
+    MockClassWithVersionedAPIs m = new MockClassWithVersionedAPIs();
+    factory.register(lvm, getKey("method", 1), m::mockMethodV1);
+    factory.register(lvm, getKey("method", 2), m::mockMethodV2);
+
+    Supplier<String> method = factory.get(lvm, getKey("method", 1));
+    assertEquals("v1", method.get());
+  }
+
+
+  private VersionFactoryKey getKey(String key, Integer version) {
+    VersionFactoryKey.Builder vfKey = new VersionFactoryKey.Builder().key(key);
+    if (version != null) {
+      vfKey.version(version);
+    }
+    return vfKey.build();
+  }
+
+
+
+  @Test
+  public void testOnFinalize() {
+    LayoutVersionManager lvm = getMockLvm(1, 3);
+    LayoutVersionInstanceFactory<MockInterface> factory =
+        new LayoutVersionInstanceFactory<>();
+    assertTrue(factory.register(lvm, getKey("key", 1), m1));
+    assertTrue(factory.register(lvm, getKey("key", 3), m2));
+    assertTrue(factory.register(lvm, getKey("key2", 1), m1));
+    assertTrue(factory.register(lvm, getKey("key2", 2), m2));
+
+    MockInterface val = factory.get(lvm, getKey("key", null));
+    assertTrue(val instanceof MockClassV1);
+    assertEquals(2, factory.getInstances().size());
+    assertEquals(2, factory.getInstances().get("key").size());
+
+    val = factory.get(lvm, getKey("key2", null));
+    assertTrue(val instanceof MockClassV1);
+
+    // Finalize the layout version.
+    lvm = getMockLvm(3, 3);
+    factory.onFinalize(lvm);
+
+    val = factory.get(lvm, getKey("key", null));
+    assertTrue(val instanceof MockClassV2);
+    assertEquals(2, factory.getInstances().size());
+    assertEquals(1, factory.getInstances().get("key").size());
+
+    val = factory.get(lvm, getKey("key2", null));
+    assertTrue(val instanceof MockClassV2);
+  }
+
+  private LayoutVersionManager getMockLvm(int mlv, int slv) {
+    LayoutVersionManager lvm = mock(LayoutVersionManager.class);
+    when(lvm.getMetadataLayoutVersion()).thenReturn(mlv);
+    when(lvm.getSoftwareLayoutVersion()).thenReturn(slv);
+    return lvm;
+  }
+
+  /**
+   * Mock Interface.
+   */
+  interface MockInterface {
+    String mockMethod();
+  }
+
+  /**
+   * Mock Impl v1.
+   */
+  static class MockClassV1 implements MockInterface {
+    @Override
+    public String mockMethod() {
+      return getClass().getSimpleName();
+    }
+  }
+
+  /**
+   * Mock Impl v2.
+   */
+  static class MockClassV2 extends MockClassV1 {
+  }
+
+  /**
+   * Mock class with a v1 and v2 method.
+   */
+  static class MockClassWithVersionedAPIs {
+    public String mockMethodV1() {
+      return "v1";
+    }
+
+    public String mockMethodV2() {
+      return "v2";
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index d1e2971..a2a322f 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -107,6 +107,7 @@ message OMRequest {
 
   optional UserInfo userInfo = 4;
 
+  optional LayoutVersion layoutVersion = 5;
 
   optional CreateVolumeRequest              createVolumeRequest            = 
11;
   optional SetVolumePropertyRequest         setVolumePropertyRequest       = 
12;
@@ -1210,6 +1211,10 @@ message GetS3SecretResponse {
     required S3Secret s3Secret = 2;
 }
 
+message LayoutVersion {
+    required uint64 version = 1;
+}
+
 /**
   This will be used internally by OM to replicate S3 Secret across quorum of
   OM's.
diff --git a/hadoop-ozone/ozone-manager/pom.xml 
b/hadoop-ozone/ozone-manager/pom.xml
index 4c9a901..895afcb 100644
--- a/hadoop-ozone/ozone-manager/pom.xml
+++ b/hadoop-ozone/ozone-manager/pom.xml
@@ -131,7 +131,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd";>
       <groupId>org.reflections</groupId>
       <artifactId>reflections</artifactId>
       <version>0.9.12</version>
-      <scope>test</scope>
     </dependency>
 
   </dependencies>
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 1809827..b7a6b4f 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -142,7 +142,8 @@ import 
org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.snapshot.OzoneManagerSnapshotProvider;
-import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
+import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManagerImpl;
+import org.apache.hadoop.ozone.om.upgrade.OmLayoutVersionManager;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
@@ -312,6 +313,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   private KeyProviderCryptoExtension kmsProvider = null;
   private static String keyProviderUriKeyName =
       CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
+  private final OMLayoutVersionManagerImpl versionManager;
 
   private boolean allowListAllVolumes;
   // Adding parameters needed for VolumeRequests here, so that during request
@@ -352,6 +354,8 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     omStorage = new OMStorage(conf);
     omId = omStorage.getOmId();
 
+    versionManager = OMLayoutVersionManagerImpl.initialize(omStorage);
+
     // In case of single OM Node Service there will be no OM Node ID
     // specified, set it to value from om storage
     if (this.omNodeDetails.getOMNodeId() == null) {
@@ -1144,8 +1148,6 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     metadataManager.start(configuration);
     startSecretManagerIfNecessary();
 
-    OMLayoutVersionManager omVersionManager =
-        OMLayoutVersionManager.initialize(omStorage);
 
     if (certClient != null) {
       caCertPem = CertificateCodec.getPEMEncodedString(
@@ -3732,4 +3734,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
 
   }
 
+  public OmLayoutVersionManager getVersionManager() {
+    return versionManager;
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index f43dfba..f0117b5 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.om.ratis.utils;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -25,42 +26,16 @@ import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo;
-import org.apache.hadoop.ozone.om.request.bucket.OMBucketCreateRequest;
-import org.apache.hadoop.ozone.om.request.bucket.OMBucketDeleteRequest;
-import org.apache.hadoop.ozone.om.request.bucket.OMBucketSetPropertyRequest;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketAddAclRequest;
 import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketRemoveAclRequest;
 import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketSetAclRequest;
-import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequest;
-import org.apache.hadoop.ozone.om.request.file.OMFileCreateRequest;
-import org.apache.hadoop.ozone.om.request.key.OMKeysDeleteRequest;
-import org.apache.hadoop.ozone.om.request.key.OMAllocateBlockRequest;
-import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
-import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
-import org.apache.hadoop.ozone.om.request.key.OMKeyDeleteRequest;
-import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
-import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequest;
-import org.apache.hadoop.ozone.om.request.key.OMKeysRenameRequest;
-import org.apache.hadoop.ozone.om.request.key.OMTrashRecoverRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeyRemoveAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeySetAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixAddAclRequest;
 import 
org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixRemoveAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixSetAclRequest;
-import 
org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequest;
-import 
org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadAbortRequest;
-import 
org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest;
-import 
org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequest;
-import org.apache.hadoop.ozone.om.request.s3.security.S3GetSecretRequest;
-import 
org.apache.hadoop.ozone.om.request.security.OMCancelDelegationTokenRequest;
-import org.apache.hadoop.ozone.om.request.security.OMGetDelegationTokenRequest;
-import 
org.apache.hadoop.ozone.om.request.security.OMRenewDelegationTokenRequest;
-import 
org.apache.hadoop.ozone.om.request.upgrade.OMFinalizeUpgradeProgressRequest;
-import org.apache.hadoop.ozone.om.request.upgrade.OMFinalizeUpgradeRequest;
-import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest;
-import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetOwnerRequest;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetQuotaRequest;
 import org.apache.hadoop.ozone.om.request.volume.acl.OMVolumeAddAclRequest;
@@ -73,6 +48,8 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.ratis.util.FileUtils;
 import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.file.Path;
@@ -85,131 +62,110 @@ import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.TRANSACTION_INFO_
  */
 public final class OzoneManagerRatisUtils {
 
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OzoneManagerRatisUtils.class);
+
   private OzoneManagerRatisUtils() {
   }
-  /**
-   * Create OMClientRequest which encapsulates the OMRequest.
-   * @param omRequest
-   * @return OMClientRequest
-   * @throws IOException
-   */
-  public static OMClientRequest createClientRequest(OMRequest omRequest) {
+
+  public static OMClientRequest getRequest(OzoneManager om,
+                                           OMRequest omRequest) {
     Type cmdType = omRequest.getCmdType();
     switch (cmdType) {
-    case CreateVolume:
-      return new OMVolumeCreateRequest(omRequest);
-    case SetVolumeProperty:
-      boolean hasQuota = omRequest.getSetVolumePropertyRequest()
-          .hasQuotaInBytes();
-      boolean hasOwner = 
omRequest.getSetVolumePropertyRequest().hasOwnerName();
-      Preconditions.checkState(hasOwner || hasQuota, "Either Quota or owner " +
-          "should be set in the SetVolumeProperty request");
-      Preconditions.checkState(!(hasOwner && hasQuota), "Either Quota or " +
-          "owner should be set in the SetVolumeProperty request. Should not " +
-          "set both");
-      if (hasQuota) {
-        return new OMVolumeSetQuotaRequest(omRequest);
-      } else {
-        return new OMVolumeSetOwnerRequest(omRequest);
-      }
-    case DeleteVolume:
-      return new OMVolumeDeleteRequest(omRequest);
-    case CreateBucket:
-      return new OMBucketCreateRequest(omRequest);
-    case DeleteBucket:
-      return new OMBucketDeleteRequest(omRequest);
-    case SetBucketProperty:
-      return new OMBucketSetPropertyRequest(omRequest);
-    case AllocateBlock:
-      return new OMAllocateBlockRequest(omRequest);
-    case CreateKey:
-      return new OMKeyCreateRequest(omRequest);
-    case CommitKey:
-      return new OMKeyCommitRequest(omRequest);
-    case DeleteKey:
-      return new OMKeyDeleteRequest(omRequest);
-    case DeleteKeys:
-      return new OMKeysDeleteRequest(omRequest);
-    case RenameKey:
-      return new OMKeyRenameRequest(omRequest);
-    case RenameKeys:
-      return new OMKeysRenameRequest(omRequest);
-    case CreateDirectory:
-      return new OMDirectoryCreateRequest(omRequest);
-    case CreateFile:
-      return new OMFileCreateRequest(omRequest);
-    case PurgeKeys:
-      return new OMKeyPurgeRequest(omRequest);
-    case InitiateMultiPartUpload:
-      return new S3InitiateMultipartUploadRequest(omRequest);
-    case CommitMultiPartUpload:
-      return new S3MultipartUploadCommitPartRequest(omRequest);
-    case AbortMultiPartUpload:
-      return new S3MultipartUploadAbortRequest(omRequest);
-    case CompleteMultiPartUpload:
-      return new S3MultipartUploadCompleteRequest(omRequest);
     case AddAcl:
     case RemoveAcl:
     case SetAcl:
-      return getOMAclRequest(omRequest);
-    case GetDelegationToken:
-      return new OMGetDelegationTokenRequest(omRequest);
-    case CancelDelegationToken:
-      return new OMCancelDelegationTokenRequest(omRequest);
-    case RenewDelegationToken:
-      return new OMRenewDelegationTokenRequest(omRequest);
-    case GetS3Secret:
-      return new S3GetSecretRequest(omRequest);
-    case RecoverTrash:
-      return new OMTrashRecoverRequest(omRequest);
-    case FinalizeUpgrade:
-      return new OMFinalizeUpgradeRequest(omRequest);
-    case FinalizeUpgradeProgress:
-      return new OMFinalizeUpgradeProgressRequest(omRequest);
+      return getOMAclRequest(om, omRequest);
+    case SetVolumeProperty:
+      return getVolumeSetPropertyRequest(om, omRequest);
     default:
-      throw new IllegalStateException("Unrecognized write command " +
-          "type request" + cmdType);
+      Class<? extends OMClientRequest> requestClass =
+          om.getVersionManager()
+              .getRequestHandler(omRequest.getCmdType().name());
+      return getClientRequest(requestClass, omRequest);
+    }
+  }
+
+  private static OMClientRequest getClientRequest(Class<?
+      extends OMClientRequest> requestClass, OMRequest omRequest) {
+    try {
+      return requestClass.getDeclaredConstructor(OMRequest.class)
+          .newInstance(omRequest);
+    } catch (Exception ex) {
+      LOG.error("Unable to get request handler for '{}', current layout " +
+              "version = {}, request factory returned '{}'",
+          omRequest.getCmdType(),
+          omRequest.getLayoutVersion().getVersion(),
+          requestClass.getSimpleName());
     }
+    throw new IllegalStateException("Unrecognized write command " +
+        "type request : " + omRequest.getCmdType());
   }
 
-  private static OMClientRequest getOMAclRequest(OMRequest omRequest) {
+  public static OMClientRequest getOMAclRequest(OzoneManager om,
+                                                OMRequest omRequest) {
     Type cmdType = omRequest.getCmdType();
+    String requestType = null;
     if (Type.AddAcl == cmdType) {
       ObjectType type = omRequest.getAddAclRequest().getObj().getResType();
       if (ObjectType.VOLUME == type) {
-        return new OMVolumeAddAclRequest(omRequest);
+        requestType = OMVolumeAddAclRequest.getRequestType();
       } else if (ObjectType.BUCKET == type) {
-        return new OMBucketAddAclRequest(omRequest);
+        requestType = OMBucketAddAclRequest.getRequestType();
       } else if (ObjectType.KEY == type) {
-        return new OMKeyAddAclRequest(omRequest);
+        requestType = OMKeyAddAclRequest.getRequestType();
       } else {
-        return new OMPrefixAddAclRequest(omRequest);
+        requestType = OMPrefixAddAclRequest.getRequestType();
       }
     } else if (Type.RemoveAcl == cmdType) {
       ObjectType type = omRequest.getRemoveAclRequest().getObj().getResType();
       if (ObjectType.VOLUME == type) {
-        return new OMVolumeRemoveAclRequest(omRequest);
+        requestType = OMVolumeRemoveAclRequest.getRequestType();
       } else if (ObjectType.BUCKET == type) {
-        return new OMBucketRemoveAclRequest(omRequest);
+        requestType = OMBucketRemoveAclRequest.getRequestType();
       } else if (ObjectType.KEY == type) {
-        return new OMKeyRemoveAclRequest(omRequest);
+        requestType = OMKeyRemoveAclRequest.getRequestType();
       } else {
-        return new OMPrefixRemoveAclRequest(omRequest);
+        requestType = OMPrefixRemoveAclRequest.getRequestType();
       }
     } else {
       ObjectType type = omRequest.getSetAclRequest().getObj().getResType();
       if (ObjectType.VOLUME == type) {
-        return new OMVolumeSetAclRequest(omRequest);
+        requestType = OMVolumeSetAclRequest.getRequestType();
       } else if (ObjectType.BUCKET == type) {
-        return new OMBucketSetAclRequest(omRequest);
+        requestType = OMBucketSetAclRequest.getRequestType();
       } else if (ObjectType.KEY == type) {
-        return new OMKeySetAclRequest(omRequest);
+        requestType = OMKeySetAclRequest.getRequestType();
       } else {
-        return new OMPrefixSetAclRequest(omRequest);
+        requestType = OMPrefixSetAclRequest.getRequestType();
       }
     }
+    Class<? extends OMClientRequest> requestClass =
+        om.getVersionManager().getRequestHandler(requestType);
+    return getClientRequest(requestClass, omRequest);
   }
 
+  public static OMClientRequest getVolumeSetPropertyRequest(
+      OzoneManager om, OMRequest omRequest) {
+    boolean hasQuota = omRequest.getSetVolumePropertyRequest()
+        .hasQuotaInBytes();
+    boolean hasOwner = omRequest.getSetVolumePropertyRequest().hasOwnerName();
+    Preconditions.checkState(hasOwner || hasQuota,
+        "Either Quota or owner " +
+            "should be set in the SetVolumeProperty request");
+    Preconditions.checkState(!(hasOwner && hasQuota),
+        "Either Quota or " +
+            "owner should be set in the SetVolumeProperty request. Should not "
+            + "set both");
+
+    String requestType = hasQuota ? OMVolumeSetQuotaRequest.getRequestType() :
+        OMVolumeSetOwnerRequest.getRequestType();
+    Class<? extends OMClientRequest> requestClass =
+        om.getVersionManager().getRequestHandler(requestType);
+    return getClientRequest(requestClass, omRequest);
+  }
+
+
   /**
    * Convert exception result to {@link OzoneManagerProtocolProtos.Status}.
    * @param exception
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index 728a624..1f98d83 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@ -34,6 +34,7 @@ import 
org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LayoutVersion;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
@@ -92,7 +93,12 @@ public abstract class OMClientRequest implements 
RequestAuditor {
    */
   public OMRequest preExecute(OzoneManager ozoneManager)
       throws IOException {
-    omRequest = getOmRequest().toBuilder().setUserInfo(getUserInfo()).build();
+    LayoutVersion layoutVersion = LayoutVersion.newBuilder()
+        
.setVersion(ozoneManager.getVersionManager().getMetadataLayoutVersion())
+        .build();
+    omRequest =
+        getOmRequest().toBuilder()
+            
.setUserInfo(getUserInfo()).setLayoutVersion(layoutVersion).build();
     return omRequest;
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
index fd303e7..33faa0a 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java
@@ -76,6 +76,7 @@ import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_L
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .CryptoProtocolVersionProto.ENCRYPTION_ZONES;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.CreateBucket;
 
 /**
  * Handles CreateBucket Request.
@@ -335,4 +336,7 @@ public class OMBucketCreateRequest extends OMClientRequest {
 
   }
 
+  public static String getRequestType() {
+    return CreateBucket.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java
index 91aef6a..33ea990 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.DeleteBucket;
 
 /**
  * Handles DeleteBucket Request.
@@ -169,4 +170,8 @@ public class OMBucketDeleteRequest extends OMClientRequest {
       return omClientResponse;
     }
   }
+
+  public static String getRequestType() {
+    return DeleteBucket.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetPropertyRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetPropertyRequest.java
index 583facb..0275d7f 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetPropertyRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetPropertyRequest.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.SetBucketProperty;
 
 /**
  * Handle SetBucketProperty Request.
@@ -270,4 +271,8 @@ public class OMBucketSetPropertyRequest extends 
OMClientRequest {
     }
     return true;
   }
+
+  public static String getRequestType() {
+    return SetBucketProperty.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAddAclRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAddAclRequest.java
index 78afeff..45cfdcf 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAddAclRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAddAclRequest.java
@@ -18,12 +18,15 @@
 
 package org.apache.hadoop.ozone.om.request.bucket.acl;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.AddAcl;
+
 import java.io.IOException;
 import java.util.List;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
 import org.apache.hadoop.ozone.util.BooleanBiFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,5 +118,8 @@ public class OMBucketAddAclRequest extends 
OMBucketAclRequest {
     }
   }
 
+  public static String getRequestType() {
+    return AddAcl.name() + "-" + ObjectType.BUCKET.name();
+  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketRemoveAclRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketRemoveAclRequest.java
index 8b6fdba..1c8af01 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketRemoveAclRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketRemoveAclRequest.java
@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.ozone.om.request.bucket.acl;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RemoveAcl;
+
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,5 +115,9 @@ public class OMBucketRemoveAclRequest extends 
OMBucketAclRequest {
       }
     }
   }
+
+  public static String getRequestType() {
+    return RemoveAcl.name() + "-" + ObjectType.BUCKET.name();
+  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketSetAclRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketSetAclRequest.java
index cfc4eb4..915342e 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketSetAclRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketSetAclRequest.java
@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.ozone.om.request.bucket.acl;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.SetAcl;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,5 +116,9 @@ public class OMBucketSetAclRequest extends 
OMBucketAclRequest {
       }
     }
   }
+
+  public static String getRequestType() {
+    return SetAcl.name() + "-" + ObjectType.BUCKET.name();
+  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
index 7b2ab51..3ed7793 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
@@ -75,6 +75,8 @@ import static 
org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryR
 import static 
org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.FILE_EXISTS_IN_GIVENPATH;
 import static 
org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.NONE;
 import static 
org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.FILE_EXISTS;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.CreateDirectory;
+
 /**
  * Handle create directory request.
  */
@@ -353,4 +355,7 @@ public class OMDirectoryCreateRequest extends OMKeyRequest {
         .setUpdateID(objectId);
   }
 
+  public static String getRequestType() {
+    return CreateDirectory.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
index 367e4ba..a2828af 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
@@ -56,7 +56,6 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateF
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.util.Time;
@@ -68,6 +67,7 @@ import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_L
 import static 
org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.DIRECTORY_EXISTS;
 import static 
org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.FILE_EXISTS_IN_GIVENPATH;
 import static 
org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.FILE_EXISTS;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.CreateFile;
 
 /**
  * Handles create file request.
@@ -307,7 +307,7 @@ public class OMFileCreateRequest extends OMKeyRequest {
           .setKeyInfo(omKeyInfo.getProtobuf())
           .setID(clientID)
           .setOpenVersion(openVersion).build())
-          .setCmdType(Type.CreateFile);
+          .setCmdType(CreateFile);
       omClientResponse = new OMFileCreateResponse(omResponse.build(),
           omKeyInfo, missingParentInfos, clientID, omVolumeArgs, omBucketInfo);
 
@@ -316,7 +316,7 @@ public class OMFileCreateRequest extends OMKeyRequest {
       result = Result.FAILURE;
       exception = ex;
       omMetrics.incNumCreateFileFails();
-      omResponse.setCmdType(Type.CreateFile);
+      omResponse.setCmdType(CreateFile);
       omClientResponse = new OMFileCreateResponse(createErrorOMResponse(
             omResponse, exception));
     } finally {
@@ -362,4 +362,8 @@ public class OMFileCreateRequest extends OMKeyRequest {
           OMException.ResultCodes.DIRECTORY_NOT_FOUND);
     }
   }
+
+  public static String getRequestType() {
+    return CreateFile.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
index afd6162..59911df 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.AllocateBlock;
 
 /**
  * Handles allocate block request.
@@ -243,4 +244,8 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
 
     return omClientResponse;
   }
+
+  public static String getRequestType() {
+    return AllocateBlock.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMECKeyCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMECKeyCreateRequest.java
new file mode 100644
index 0000000..56e7e47
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMECKeyCreateRequest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.key;
+
+import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.CREATE_EC;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.upgrade.BelongsToLayoutVersion;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+
+/**
+ * Handles Create EC Key  request. (To be removed later)
+ */
+@BelongsToLayoutVersion(CREATE_EC)
+public class OMECKeyCreateRequest extends OMKeyCreateRequest {
+
+  public OMECKeyCreateRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    // V2 impl here.
+    return null;
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long trxnLogIndex, OzoneManagerDoubleBufferHelper omDoubleBufferHelper) {
+    // V2 impl here.
+    return null;
+  }
+
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 29d0243..f246b1c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.CommitKey;
 
 /**
  * Handles CommitKey request.
@@ -253,4 +254,8 @@ public class OMKeyCommitRequest extends OMKeyRequest {
 
     return omClientResponse;
   }
+
+  public static String getRequestType() {
+    return CommitKey.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
index f16153d..9e550f5 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
@@ -67,6 +67,7 @@ import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 import static 
org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.DIRECTORY_EXISTS;
 import static 
org.apache.hadoop.ozone.om.request.file.OMFileRequest.OMDirectoryResult.FILE_EXISTS_IN_GIVENPATH;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.CreateKey;
 
 /**
  * Handles CreateKey request.
@@ -359,4 +360,7 @@ public class OMKeyCreateRequest extends OMKeyRequest {
     return omClientResponse;
   }
 
+  public static String getRequestType() {
+    return CreateKey.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
index e27b7e1..7d536c1 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.DeleteKey;
 
 /**
  * Handles DeleteKey request.
@@ -205,4 +206,8 @@ public class OMKeyDeleteRequest extends OMKeyRequest {
 
     return omClientResponse;
   }
+
+  public static String getRequestType() {
+    return DeleteKey.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
index ce7f1e9..51f27fe 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.om.request.key;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.PurgeKeys;
+
 import java.util.ArrayList;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
@@ -71,4 +73,8 @@ public class OMKeyPurgeRequest extends OMKeyRequest {
 
     return omClientResponse;
   }
+
+  public static String getRequestType() {
+    return PurgeKeys.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
index 4e7c05c..8243859 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RenameKey;
 
 /**
  * Handles rename key request.
@@ -236,4 +237,8 @@ public class OMKeyRenameRequest extends OMKeyRequest {
     auditMap.put(OzoneConsts.DST_KEY, renameKeyRequest.getToKeyName());
     return auditMap;
   }
+
+  public static String getRequestType() {
+    return RenameKey.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
index 907b501..18b5f84 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
@@ -60,6 +60,7 @@ import static 
org.apache.hadoop.ozone.audit.OMAction.DELETE_KEYS;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.PARTIAL_DELETE;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.DeleteKeys;
 
 /**
  * Handles DeleteKey request.
@@ -254,4 +255,8 @@ public class OMKeysDeleteRequest extends OMKeyRequest {
     auditMap.put(UNDELETED_KEYS_LIST, String.join(",", unDeletedKeys));
   }
 
+  public static String getRequestType() {
+    return DeleteKeys.name();
+  }
+
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java
index abaa4ae..a17455f 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java
@@ -59,6 +59,7 @@ import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
 import static org.apache.hadoop.ozone.OzoneConsts.RENAMED_KEYS_MAP;
 import static org.apache.hadoop.ozone.OzoneConsts.UNRENAMED_KEYS_MAP;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RenameKeys;
 
 /**
  * Handles rename keys request.
@@ -248,6 +249,10 @@ public class OMKeysRenameRequest extends OMKeyRequest {
     return omClientResponse;
   }
 
+  public static String getRequestType() {
+    return RenameKeys.name();
+  }
+
   /**
    * Build audit map for RenameKeys request.
    *
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMTrashRecoverRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMTrashRecoverRequest.java
index eca5294..2ecc4e8 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMTrashRecoverRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMTrashRecoverRequest.java
@@ -42,6 +42,7 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RecoverTrash;
 
 /**
  * Handles RecoverTrash request.
@@ -140,4 +141,7 @@ public class OMTrashRecoverRequest extends OMKeyRequest {
     return omClientResponse;
   }
 
+  public static String getRequestType() {
+    return RecoverTrash.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java
index 3697cb8..5527742 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAddAclRequest.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.om.request.key.acl;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.AddAcl;
+
 import java.io.IOException;
 import java.util.List;
 
@@ -27,6 +29,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.key.acl.OMKeyAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,5 +104,9 @@ public class OMKeyAddAclRequest extends OMKeyAclRequest {
     // No need to check not null here, this will be never called with null.
     return omKeyInfo.addAcl(ozoneAcls.get(0));
   }
+
+  public static String getRequestType() {
+    return AddAcl.name() + "-" + ObjectType.KEY;
+  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java
index f0d13be..cb3d16a7 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyRemoveAclRequest.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.om.request.key.acl;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RemoveAcl;
+
 import java.io.IOException;
 import java.util.List;
 
@@ -27,6 +29,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.key.acl.OMKeyAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,5 +105,9 @@ public class OMKeyRemoveAclRequest extends OMKeyAclRequest {
     // No need to check not null here, this will be never called with null.
     return omKeyInfo.removeAcl(ozoneAcls.get(0));
   }
+
+  public static String getRequestType() {
+    return RemoveAcl.name() + "-" + ObjectType.KEY;
+  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java
index 6d904e6..b76862f 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeySetAclRequest.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.om.request.key.acl;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.SetAcl;
+
 import java.io.IOException;
 import java.util.List;
 
@@ -28,6 +30,7 @@ import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.key.acl.OMKeyAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,5 +101,9 @@ public class OMKeySetAclRequest extends OMKeyAclRequest {
     // No need to check not null here, this will be never called with null.
     return omKeyInfo.setAcls(ozoneAcls);
   }
+
+  public static String getRequestType() {
+    return SetAcl.name() + "-" + ObjectType.KEY;
+  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAddAclRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAddAclRequest.java
index 7160042..e4dcea6 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAddAclRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixAddAclRequest.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.om.request.key.acl.prefix;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.AddAcl;
+
 import java.io.IOException;
 import java.util.List;
 
@@ -28,6 +30,7 @@ import 
org.apache.hadoop.ozone.om.PrefixManagerImpl.OMPrefixAclOpResult;
 import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.key.acl.prefix.OMPrefixAclResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.slf4j.Logger;
@@ -123,5 +126,9 @@ public class OMPrefixAddAclRequest extends 
OMPrefixAclRequest {
     return prefixManager.addAcl(ozoneObj, ozoneAcls.get(0), omPrefixInfo,
         trxnLogIndex);
   }
+
+  public static String getRequestType() {
+    return AddAcl.name() + "-" + ObjectType.PREFIX;
+  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixRemoveAclRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixRemoveAclRequest.java
index 482250d..7af93ae 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixRemoveAclRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixRemoveAclRequest.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.om.request.key.acl.prefix;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RemoveAcl;
+
 import java.io.IOException;
 import java.util.List;
 
@@ -28,6 +30,7 @@ import 
org.apache.hadoop.ozone.om.PrefixManagerImpl.OMPrefixAclOpResult;
 import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.key.acl.prefix.OMPrefixAclResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.slf4j.Logger;
@@ -119,5 +122,9 @@ public class OMPrefixRemoveAclRequest extends 
OMPrefixAclRequest {
       OmPrefixInfo omPrefixInfo, long trxnLogIndex) throws IOException {
     return prefixManager.removeAcl(ozoneObj, ozoneAcls.get(0), omPrefixInfo);
   }
+
+  public static String getRequestType() {
+    return RemoveAcl.name() + "-" + ObjectType.PREFIX;
+  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixSetAclRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixSetAclRequest.java
index 144e90b..a0afece 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixSetAclRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/prefix/OMPrefixSetAclRequest.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.om.request.key.acl.prefix;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.SetAcl;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -28,6 +30,7 @@ import 
org.apache.hadoop.ozone.om.PrefixManagerImpl.OMPrefixAclOpResult;
 import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.key.acl.prefix.OMPrefixAclResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.slf4j.Logger;
@@ -116,5 +119,9 @@ public class OMPrefixSetAclRequest extends 
OMPrefixAclRequest {
     return prefixManager.setAcl(ozoneObj, ozoneAcls, omPrefixInfo,
         trxnLogIndex);
   }
+
+  public static String getRequestType() {
+    return SetAcl.name() + "-" + ObjectType.PREFIX;
+  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
index 08063b6..7c5e0e3 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
@@ -54,6 +54,7 @@ import java.util.UUID;
 
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.InitiateMultiPartUpload;
 
 /**
  * Handles initiate multipart upload request.
@@ -250,4 +251,8 @@ public class S3InitiateMultipartUploadRequest extends 
OMKeyRequest {
 
     return omClientResponse;
   }
+
+  public static String getRequestType() {
+    return InitiateMultiPartUpload.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
index 8b53e70..a0263ee 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.AbortMultiPartUpload;
 
 /**
  * Handles Abort of multipart upload request.
@@ -211,4 +212,8 @@ public class S3MultipartUploadAbortRequest extends 
OMKeyRequest {
 
     return omClientResponse;
   }
+
+  public static String getRequestType() {
+    return AbortMultiPartUpload.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
index f471de4..31b245b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java
@@ -58,6 +58,7 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.CommitMultiPartUpload;
 
 /**
  * Handle Multipart upload commit upload part file.
@@ -279,5 +280,8 @@ public class S3MultipartUploadCommitPartRequest extends 
OMKeyRequest {
     return omClientResponse;
   }
 
+  public static String getRequestType() {
+    return CommitMultiPartUpload.name();
+  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
index dff022b..fd4af37 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java
@@ -57,6 +57,8 @@ import org.apache.commons.codec.digest.DigestUtils;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.CompleteMultiPartUpload;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -377,5 +379,9 @@ public class S3MultipartUploadCompleteRequest extends 
OMKeyRequest {
         new CacheKey<>(multipartKey),
         new CacheValue<>(Optional.absent(), transactionLogIndex));
   }
+
+  public static String getRequestType() {
+    return CompleteMultiPartUpload.name();
+  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3GetSecretRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3GetSecretRequest.java
index b240373..691b278 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3GetSecretRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3GetSecretRequest.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.S3_SECRET_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.GetS3Secret;
 
 /**
  * Handles GetS3Secret request.
@@ -186,4 +187,8 @@ public class S3GetSecretRequest extends OMClientRequest {
     }
     return omClientResponse;
   }
+
+  public static String getRequestType() {
+    return GetS3Secret.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/security/OMCancelDelegationTokenRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/security/OMCancelDelegationTokenRequest.java
index e931735..0b159bb 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/security/OMCancelDelegationTokenRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/security/OMCancelDelegationTokenRequest.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.om.request.security;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.CancelDelegationToken;
+
 import com.google.common.base.Optional;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -108,7 +110,6 @@ public class OMCancelDelegationTokenRequest extends 
OMClientRequest {
     return omClientResponse;
   }
 
-
   public Token<OzoneTokenIdentifier> getToken() {
     CancelDelegationTokenRequestProto cancelDelegationTokenRequest =
         getOmRequest().getCancelDelegationTokenRequest();
@@ -116,4 +117,8 @@ public class OMCancelDelegationTokenRequest extends 
OMClientRequest {
     return OMPBHelper.convertToDelegationToken(
         cancelDelegationTokenRequest.getToken());
   }
+
+  public static String getRequestType() {
+    return CancelDelegationToken.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/security/OMGetDelegationTokenRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/security/OMGetDelegationTokenRequest.java
index 4d2a6b4..81eb3d2 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/security/OMGetDelegationTokenRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/security/OMGetDelegationTokenRequest.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.om.request.security;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.GetDelegationToken;
+
 import com.google.common.base.Optional;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -64,7 +66,6 @@ public class OMGetDelegationTokenRequest extends 
OMClientRequest {
     Token<OzoneTokenIdentifier> token = ozoneManager
         .getDelegationToken(new Text(getDelegationTokenRequest.getRenewer()));
 
-
     // Client issues GetDelegationToken request, when received by OM leader
     // it will generate a token. Original GetDelegationToken request is
     // converted to UpdateGetDelegationToken request with the generated token
@@ -181,4 +182,8 @@ public class OMGetDelegationTokenRequest extends 
OMClientRequest {
 
     return omClientResponse;
   }
+
+  public static String getRequestType() {
+    return GetDelegationToken.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/security/OMRenewDelegationTokenRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/security/OMRenewDelegationTokenRequest.java
index 360ca4f..a683216 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/security/OMRenewDelegationTokenRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/security/OMRenewDelegationTokenRequest.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.om.request.security;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RenewDelegationToken;
+
 import java.io.IOException;
 
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
@@ -156,4 +158,8 @@ public class OMRenewDelegationTokenRequest extends 
OMClientRequest {
 
     return omClientResponse;
   }
+
+  public static String getRequestType() {
+    return RenewDelegationToken.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeProgressRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeProgressRequest.java
index 3cb9210..9a8d56e 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeProgressRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeProgressRequest.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.ozone.om.request.upgrade;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.FinalizeUpgradeProgress;
+
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
@@ -84,4 +86,8 @@ public class OMFinalizeUpgradeProgressRequest extends 
OMClientRequest {
 
     return response;
   }
+
+  public static String getRequestType() {
+    return FinalizeUpgradeProgress.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeRequest.java
index 772eae7..1b1897a 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeRequest.java
@@ -17,13 +17,14 @@
 
 package org.apache.hadoop.ozone.om.request.upgrade;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.FinalizeUpgrade;
+
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.upgrade.OMFinalizeUpgradeResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.FinalizeUpgradeRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.FinalizeUpgradeResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -53,7 +54,7 @@ public class OMFinalizeUpgradeRequest extends OMClientRequest 
{
     LOG.trace("Request: {}", getOmRequest());
     OMResponse.Builder responseBuilder =
         OmResponseUtil.getOMResponseBuilder(getOmRequest());
-    
responseBuilder.setCmdType(OzoneManagerProtocolProtos.Type.FinalizeUpgrade);
+    responseBuilder.setCmdType(FinalizeUpgrade);
     OMClientResponse response = null;
 
     try {
@@ -77,4 +78,8 @@ public class OMFinalizeUpgradeRequest extends OMClientRequest 
{
 
     return response;
   }
+
+  public static String getRequestType() {
+    return FinalizeUpgrade.name();
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java
index 7e2ccd9..ce1b2cb 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.util.Time;
 
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.CreateVolume;
 
 /**
  * Handles volume create request.
@@ -201,6 +202,10 @@ public class OMVolumeCreateRequest extends OMVolumeRequest 
{
     }
     return omClientResponse;
   }
+
+  public static String getRequestType() {
+    return CreateVolume.name();
+  }
 }
 
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
index ce93e26..05480cc 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java
@@ -51,6 +51,8 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.DeleteVolume;
+
 /**
  * Handles volume delete request.
  */
@@ -162,5 +164,9 @@ public class OMVolumeDeleteRequest extends OMVolumeRequest {
     }
     return omClientResponse;
   }
+
+  public static String getRequestType() {
+    return DeleteVolume.name();
+  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java
index 6873086..d95d271 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java
@@ -48,6 +48,7 @@ import java.io.IOException;
 import java.util.Map;
 
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.SetVolumeProperty;
 
 /**
  * Handle set owner request for volume.
@@ -208,5 +209,9 @@ public class OMVolumeSetOwnerRequest extends 
OMVolumeRequest {
     }
     return omClientResponse;
   }
+
+  public static String getRequestType() {
+    return SetVolumeProperty.name() + "-Owner";
+  }
 }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetQuotaRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetQuotaRequest.java
index fc54c88..3b1b634 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetQuotaRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetQuotaRequest.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.SetVolumeProperty;
 
 /**
  * Handles set Quota request for volume.
@@ -217,6 +218,10 @@ public class OMVolumeSetQuotaRequest extends 
OMVolumeRequest {
     }
     return true;
   }
+
+  public static String getRequestType() {
+    return SetVolumeProperty.name() + "-Quota";
+  }
 }
 
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAddAclRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAddAclRequest.java
index 12008e2..d65e0df 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAddAclRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAddAclRequest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.ozone.om.request.volume.acl;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.AddAcl;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
@@ -28,6 +30,7 @@ import 
org.apache.hadoop.ozone.om.response.volume.OMVolumeAclOpResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,4 +116,8 @@ public class OMVolumeAddAclRequest extends 
OMVolumeAclRequest {
           getOmRequest());
     }
   }
+
+  public static String getRequestType() {
+    return AddAcl.name() + "-" + ObjectType.VOLUME;
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeRemoveAclRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeRemoveAclRequest.java
index 461ad48..e386e20 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeRemoveAclRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeRemoveAclRequest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.ozone.om.request.volume.acl;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RemoveAcl;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
@@ -28,6 +30,7 @@ import 
org.apache.hadoop.ozone.om.response.volume.OMVolumeAclOpResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,4 +115,8 @@ public class OMVolumeRemoveAclRequest extends 
OMVolumeAclRequest {
           getOmRequest());
     }
   }
+
+  public static String getRequestType() {
+    return RemoveAcl.name() + "-" + ObjectType.VOLUME;
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeSetAclRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeSetAclRequest.java
index c73e19e..6a0d0f1 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeSetAclRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeSetAclRequest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.ozone.om.request.volume.acl;
 
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.SetAcl;
+
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
 import org.apache.hadoop.ozone.OzoneAcl;
@@ -27,6 +29,7 @@ import 
org.apache.hadoop.ozone.om.response.volume.OMVolumeAclOpResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,4 +112,8 @@ public class OMVolumeSetAclRequest extends 
OMVolumeAclRequest {
           getOmRequest());
     }
   }
+
+  public static String getRequestType() {
+    return SetAcl.name() + "-" + ObjectType.VOLUME;
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAPI.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/BelongsToLayoutVersion.java
similarity index 78%
copy from 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAPI.java
copy to 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/BelongsToLayoutVersion.java
index 2da8b38..cfee3e6 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAPI.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/BelongsToLayoutVersion.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,13 +23,11 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-import 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeatureCatalog.OMLayoutFeature;
-
 /**
- * Annotation to specify if an API is backed up by a Layout Feature.
+ * Annotation to mark a class that belongs to a specific Layout Version.
  */
-@Target(ElementType.METHOD)
+@Target(ElementType.TYPE)
 @Retention(RetentionPolicy.RUNTIME)
-public @interface OMLayoutFeatureAPI {
+public @interface BelongsToLayoutVersion {
   OMLayoutFeature value();
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAPI.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/DisallowedUntilLayoutVersion.java
similarity index 81%
copy from 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAPI.java
copy to 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/DisallowedUntilLayoutVersion.java
index 2da8b38..c437c1d 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAPI.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/DisallowedUntilLayoutVersion.java
@@ -23,13 +23,13 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-import 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeatureCatalog.OMLayoutFeature;
-
 /**
- * Annotation to specify if an API is backed up by a Layout Feature.
+ * Annotation used to "disallow" an API if current layout version does
+ * not include the associated layout feature. Helps to keep the method logic
+ * and upgrade related cross cutting concern separate.
  */
 @Target(ElementType.METHOD)
 @Retention(RetentionPolicy.RUNTIME)
-public @interface OMLayoutFeatureAPI {
+public @interface DisallowedUntilLayoutVersion {
   OMLayoutFeature value();
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java
new file mode 100644
index 0000000..0d22b01
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.upgrade;
+
+import java.util.Optional;
+
+import org.apache.hadoop.ozone.upgrade.LayoutFeature;
+
+/**
+ * List of OM Layout features / versions.
+ */
+public enum OMLayoutFeature implements LayoutFeature {
+  INITIAL_VERSION(0, "Initial Layout Version"),
+  CREATE_EC(1, ""),
+  NEW_FEATURE(2, "new feature", new NewOmFeatureUpgradeAction());
+
+  private int layoutVersion;
+  private String description;
+  private Optional<OmUpgradeAction> omUpgradeAction = Optional.empty();
+
+  OMLayoutFeature(final int layoutVersion, String description) {
+    this.layoutVersion = layoutVersion;
+    this.description = description;
+  }
+
+  OMLayoutFeature(final int layoutVersion, String description,
+                  OmUpgradeAction upgradeAction) {
+    this.layoutVersion = layoutVersion;
+    this.description = description;
+    omUpgradeAction = Optional.of(upgradeAction);
+  }
+
+  @Override
+  public int layoutVersion() {
+    return layoutVersion;
+  }
+
+  @Override
+  public String description() {
+    return description;
+  }
+
+  @Override
+  public Optional<OmUpgradeAction> onFinalizeAction() {
+    return omUpgradeAction;
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAspect.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAspect.java
index a92e3b4..dbc0259 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAspect.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAspect.java
@@ -28,6 +28,7 @@ import org.aspectj.lang.annotation.Aspect;
 import org.aspectj.lang.annotation.Before;
 import org.aspectj.lang.reflect.MethodSignature;
 
+
 /**
  * 'Aspect' for OM Layout Feature API. All methods annotated with the
  * specific annotation will have pre-processing done here to check layout
@@ -36,19 +37,22 @@ import org.aspectj.lang.reflect.MethodSignature;
 @Aspect
 public class OMLayoutFeatureAspect {
 
-  @Before("@annotation(OMLayoutFeatureAPI) && execution(* *(..))")
+  @Before("@annotation(DisallowedUntilLayoutVersion) && execution(* *(..))")
   public void checkLayoutFeature(JoinPoint joinPoint) throws Throwable {
     String featureName = ((MethodSignature) joinPoint.getSignature())
-        .getMethod().getAnnotation(OMLayoutFeatureAPI.class).value().name();
-    LayoutVersionManager lvm = OMLayoutVersionManager.getInstance();
+        .getMethod().getAnnotation(DisallowedUntilLayoutVersion.class)
+        .value().name();
+    LayoutVersionManager lvm = OMLayoutVersionManagerImpl.getInstance();
     if (!lvm.isAllowed(featureName)) {
       LayoutFeature layoutFeature = lvm.getFeature(featureName);
       throw new OMException(String.format("Operation %s cannot be invoked " +
-          "before finalization. Current layout version = %d, feature's layout" 
+
-              " version = %d",
-          featureName,
-          lvm.getMetadataLayoutVersion(),
-          layoutFeature.layoutVersion()), NOT_SUPPORTED_OPERATION);
+              "before finalization. It belongs to the layout feature %s, " +
+              "whose layout version is %d. Current Layout version is %d",
+          joinPoint.getSignature().toShortString(),
+          layoutFeature.name(),
+          layoutFeature.layoutVersion(),
+          lvm.getMetadataLayoutVersion()),
+          NOT_SUPPORTED_OPERATION);
     }
   }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureCatalog.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureCatalog.java
deleted file mode 100644
index c5ed27d..0000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureCatalog.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.om.upgrade;
-
-import java.util.Optional;
-
-import org.apache.hadoop.ozone.upgrade.LayoutFeature;
-
-/**
- * Catalog of Ozone Manager features.
- */
-public class OMLayoutFeatureCatalog {
-
-  /**
-   * List of OM Features.
-   */
-  public enum OMLayoutFeature implements LayoutFeature {
-    INITIAL_VERSION(0, "Initial Layout Version"),
-    CREATE_EC(1, ""),
-    NEW_FEATURE(2, "new feature", new NewOmFeatureUpgradeAction());
-
-
-    private int layoutVersion;
-    private String description;
-    private Optional<OmUpgradeAction> omUpgradeAction = Optional.empty();
-
-    OMLayoutFeature(final int layoutVersion, String description) {
-      this.layoutVersion = layoutVersion;
-      this.description = description;
-    }
-
-    OMLayoutFeature(final int layoutVersion, String description,
-                    OmUpgradeAction upgradeAction) {
-      this.layoutVersion = layoutVersion;
-      this.description = description;
-      omUpgradeAction = Optional.of(upgradeAction);
-    }
-
-    @Override
-    public int layoutVersion() {
-      return layoutVersion;
-    }
-
-    @Override
-    public String description() {
-      return description;
-    }
-
-    @Override
-    public Optional<OmUpgradeAction> onFinalizeAction() {
-      return omUpgradeAction;
-    }
-  }
-
-  /**
-   * This is an example of an "API" that uses a new Layout feature (EC) that is
-   * not yet supported by the current layout version. The following can be
-   * "guarded" by just adding the following annotation, thereby keeping the
-   * method logic and upgrade logic separate.
-   */
-  @OMLayoutFeatureAPI(OMLayoutFeature.CREATE_EC)
-  public String ecMethod() {
-    // Blah Blah EC Blah....
-    return "ec";
-  }
-
-  /**
-   * This is an example of an "API" that uses a Layout feature (EC) that is
-   * supported by the current layout version.
-   */
-  @OMLayoutFeatureAPI(OMLayoutFeature.INITIAL_VERSION)
-  public String basicMethod() {
-    // Blah Blah Basic Blah....
-    return "basic";
-  }
-}
-
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAPI.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureUtil.java
similarity index 53%
copy from 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAPI.java
copy to 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureUtil.java
index 2da8b38..c237638 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAPI.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureUtil.java
@@ -18,18 +18,32 @@
 
 package org.apache.hadoop.ozone.om.upgrade;
 
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-import 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeatureCatalog.OMLayoutFeature;
+import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.CREATE_EC;
 
 /**
- * Annotation to specify if an API is backed up by a Layout Feature.
+ * Test util class. To be removed.
  */
-@Target(ElementType.METHOD)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface OMLayoutFeatureAPI {
-  OMLayoutFeature value();
+public class OMLayoutFeatureUtil {
+
+  /**
+   * This is an example of an "API" that uses a new Layout feature (EC) that is
+   * not yet supported by the current layout version. The following can be
+   * "disallowed" by just adding the following annotation, thereby keeping the
+   * method logic and upgrade logic separate.
+   */
+  @DisallowedUntilLayoutVersion(CREATE_EC)
+  public String ecMethod() {
+    // Blah Blah EC Blah....
+    return "ec";
+  }
+
+  /**
+   * This is an example of an "API" that is
+   * supported by the current layout version.
+   */
+  public String basicMethod() {
+    // Blah Blah Basic Blah....
+    return "basic";
+  }
 }
+
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutVersionManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutVersionManager.java
deleted file mode 100644
index 2f959a9..0000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutVersionManager.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.om.upgrade;
-
-import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION;
-
-import org.apache.hadoop.ozone.common.Storage;
-import org.apache.hadoop.ozone.om.OMStorage;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
-import 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeatureCatalog.OMLayoutFeature;
-import org.apache.hadoop.ozone.upgrade.AbstractLayoutVersionManager;
-import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Class to manage layout versions and features for Ozone Manager.
- */
-public final class OMLayoutVersionManager extends AbstractLayoutVersionManager 
{
-
-  private static OMLayoutVersionManager omVersionManager;
-
-  private OMLayoutVersionManager() {
-  }
-
-  /**
-   * Read only instance to OM Version Manager.
-   * @return version manager instance.
-   */
-  public static synchronized LayoutVersionManager getInstance() {
-    if (omVersionManager == null) {
-      throw new RuntimeException("OM Layout Version Manager not yet " +
-          "initialized.");
-    }
-    return omVersionManager;
-  }
-
-
-  /**
-   * Initialize OM version manager from storage.
-   * @return version manager instance.
-   */
-  public static synchronized OMLayoutVersionManager initialize(
-      OMStorage omStorage)
-      throws OMException {
-    if (omVersionManager == null) {
-      omVersionManager = new OMLayoutVersionManager();
-      omVersionManager.init(omStorage);
-    }
-    return omVersionManager;
-  }
-
-  /**
-   * Initialize the OM Layout Features and current Layout Version.
-   * @param storage to read the current layout version.
-   * @throws OMException on error.
-   */
-  private void init(Storage storage) throws OMException {
-    init(storage.getLayoutVersion(), OMLayoutFeature.values());
-    if (metadataLayoutVersion > softwareLayoutVersion) {
-      throw new OMException(
-          String.format("Cannot initialize VersionManager. Metadata " +
-                  "layout version (%d) > software layout version (%d)",
-              metadataLayoutVersion, softwareLayoutVersion),
-          NOT_SUPPORTED_OPERATION);
-    }
-  }
-
-  @VisibleForTesting
-  protected synchronized static void resetLayoutVersionManager() {
-    if (omVersionManager != null) {
-      omVersionManager.reset();
-      omVersionManager = null;
-    }
-  }
-}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutVersionManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutVersionManagerImpl.java
new file mode 100644
index 0000000..70a8d6b
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutVersionManagerImpl.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.upgrade;
+
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION;
+import static 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.INITIAL_VERSION;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Set;
+
+import org.apache.hadoop.ozone.common.Storage;
+import org.apache.hadoop.ozone.om.OMStorage;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.upgrade.AbstractLayoutVersionManager;
+import org.apache.hadoop.ozone.upgrade.LayoutVersionInstanceFactory;
+import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
+import org.apache.hadoop.ozone.upgrade.VersionFactoryKey;
+import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Class to manage layout versions and features for Ozone Manager.
+ */
+public final class OMLayoutVersionManagerImpl
+    extends AbstractLayoutVersionManager implements OmLayoutVersionManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMLayoutVersionManagerImpl.class);
+
+  private static OMLayoutVersionManagerImpl omVersionManager;
+  private LayoutVersionInstanceFactory<Class<? extends OMClientRequest>>
+      requestFactory;
+
+  private OMLayoutVersionManagerImpl() {
+    requestFactory = new LayoutVersionInstanceFactory<>();
+  }
+
+  /**
+   * Read only instance to OM Version Manager.
+   * @return version manager instance.
+   */
+  public static synchronized LayoutVersionManager getInstance() {
+    if (omVersionManager == null) {
+      throw new RuntimeException("OM Layout Version Manager not yet " +
+          "initialized.");
+    }
+    return omVersionManager;
+  }
+
+
+  /**
+   * Initialize OM version manager from storage.
+   * @return version manager instance.
+   */
+  public static synchronized OMLayoutVersionManagerImpl initialize(
+      OMStorage omStorage)
+      throws OMException {
+    if (omVersionManager == null) {
+      omVersionManager = new OMLayoutVersionManagerImpl();
+      omVersionManager.init(omStorage);
+    }
+    return omVersionManager;
+  }
+
+  /**
+   * Initialize the OM Layout Features and current Layout Version.
+   * @param storage to read the current layout version.
+   * @throws OMException on error.
+   */
+  private void init(Storage storage) throws OMException {
+    init(storage.getLayoutVersion(), OMLayoutFeature.values());
+
+    if (metadataLayoutVersion > softwareLayoutVersion) {
+      throw new OMException(
+          String.format("Cannot initialize VersionManager. Metadata " +
+                  "layout version (%d) > software layout version (%d)",
+              metadataLayoutVersion, softwareLayoutVersion),
+          NOT_SUPPORTED_OPERATION);
+    }
+    registerOzoneManagerRequests();
+  }
+
+  public void doFinalize(OzoneManager om) {
+    super.doFinalize(om);
+    requestFactory.onFinalize(this);
+  }
+
+  @VisibleForTesting
+  protected synchronized static void resetLayoutVersionManager() {
+    if (omVersionManager != null) {
+      omVersionManager.reset();
+      omVersionManager = null;
+    }
+  }
+
+  public void reset() {
+    requestFactory = null;
+    super.reset();
+  }
+
+  private void registerOzoneManagerRequests() {
+    Reflections reflections = new Reflections(
+        "org.apache.hadoop.ozone.om.request");
+    Set<Class<? extends OMClientRequest>> subTypes =
+        reflections.getSubTypesOf(OMClientRequest.class);
+    try {
+      for (Class<? extends OMClientRequest> requestClass : subTypes) {
+        if (Modifier.isAbstract(requestClass.getModifiers())) {
+          continue;
+        }
+        try {
+          Method getRequestTypeMethod = requestClass.getMethod(
+              "getRequestType");
+          String type = (String) getRequestTypeMethod.invoke(null);
+          LOG.debug("Registering {} with OmVersionFactory.",
+              requestClass.getSimpleName());
+          BelongsToLayoutVersion annotation =
+              requestClass.getAnnotation(BelongsToLayoutVersion.class);
+          if (annotation == null) {
+            registerRequestType(type, INITIAL_VERSION.layoutVersion(),
+                requestClass);
+          } else {
+            registerRequestType(type, annotation.value().layoutVersion(),
+                requestClass);
+          }
+        } catch (NoSuchMethodException nsmEx) {
+          LOG.warn("Found a class {} with request type not defined. ",
+              requestClass.getSimpleName());
+        }
+      }
+    } catch (Exception ex) {
+      LOG.error("Exception registering OM client request.", ex);
+    }
+  }
+
+  private void registerRequestType(String type, int version,
+                                   Class<? extends OMClientRequest> reqClass) {
+    VersionFactoryKey key = new VersionFactoryKey.Builder()
+        .key(type).version(version).build();
+    requestFactory.register(this, key, reqClass);
+  }
+
+  /**
+   * Given a type and version, get the corresponding request class type.
+   * @param requestType type string
+   * @param version version
+   * @return class type.
+   */
+  @Override
+  public Class< ? extends OMClientRequest> getRequestHandler(String type) {
+    VersionFactoryKey versionFactoryKey = new VersionFactoryKey.Builder()
+        .key(type).build();
+    return requestFactory.get(this, versionFactoryKey);
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAPI.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmLayoutVersionManager.java
similarity index 66%
rename from 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAPI.java
rename to 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmLayoutVersionManager.java
index 2da8b38..51b5c6c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeatureAPI.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmLayoutVersionManager.java
@@ -18,18 +18,12 @@
 
 package org.apache.hadoop.ozone.om.upgrade;
 
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-import 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeatureCatalog.OMLayoutFeature;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
 
 /**
- * Annotation to specify if an API is backed up by a Layout Feature.
+ * Read only Interface for OM Layout Version Management.
  */
-@Target(ElementType.METHOD)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface OMLayoutFeatureAPI {
-  OMLayoutFeature value();
+public interface OmLayoutVersionManager extends LayoutVersionManager {
+  Class<? extends OMClientRequest> getRequestHandler(String requestType);
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 73277e0..0207b6b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -16,6 +16,8 @@
  */
 package org.apache.hadoop.ozone.protocolPB;
 
+import static 
org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils.getRequest;
+
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -98,7 +100,6 @@ public class OzoneManagerProtocolServerSideTranslatorPB 
implements
     this.omRatisServer = ratisServer;
     dispatcher = new OzoneProtocolMessageDispatcher<>("OzoneProtocol",
         metrics, LOG);
-
   }
 
   /**
@@ -124,8 +125,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB 
implements
       } else {
         if (omRatisServer.isLeader()) {
           try {
-            OMClientRequest omClientRequest =
-                OzoneManagerRatisUtils.createClientRequest(request);
+            OMClientRequest omClientRequest = getRequest(ozoneManager, 
request);
             request = omClientRequest.preExecute(ozoneManager);
           } catch (IOException ex) {
             // As some of the preExecute returns error. So handle here.
@@ -217,8 +217,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB 
implements
       if (OmUtils.isReadOnly(request)) {
         return handler.handleReadRequest(request);
       } else {
-        OMClientRequest omClientRequest =
-            OzoneManagerRatisUtils.createClientRequest(request);
+        OMClientRequest omClientRequest = getRequest(ozoneManager, request);
         request = omClientRequest.preExecute(ozoneManager);
         index = transactionIndex.incrementAndGet();
         omClientResponse = handler.handleWriteRequest(request, index);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 35ab275..dd9e704 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -78,6 +78,7 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 
 import com.google.common.collect.Lists;
+
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesResponse;
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetAclRequest;
@@ -91,6 +92,7 @@ import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadInfo;
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartInfo;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -222,7 +224,7 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
   public OMClientResponse handleWriteRequest(OMRequest omRequest,
       long transactionLogIndex) {
     OMClientRequest omClientRequest =
-        OzoneManagerRatisUtils.createClientRequest(omRequest);
+        OzoneManagerRatisUtils.getRequest(getOzoneManager(), omRequest);
     OMClientResponse omClientResponse =
         omClientRequest.validateAndUpdateCache(getOzoneManager(),
             transactionLogIndex, ozoneManagerDoubleBuffer::add);
@@ -290,6 +292,14 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
       throw new OMException("ClientId is null",
           OMException.ResultCodes.INVALID_REQUEST);
     }
+
+    // Layout version should have been set up the leader while serializing
+    // the request, and hence cannot be null. This version is used by each
+    // node to identify which request handler version to use.
+    if (omRequest.getLayoutVersion() == null) {
+      throw new OMException("LayoutVersion for request is null.",
+          OMException.ResultCodes.INTERNAL_ERROR);
+    }
   }
 
   private CheckVolumeAccessResponse checkVolumeAccess(
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/TestBucketRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/TestBucketRequest.java
index 7ae82f8..012f2c9 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/TestBucketRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/TestBucketRequest.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.ozone.om.request.bucket;
 
+import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManagerImpl;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -37,6 +38,7 @@ import 
org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
@@ -71,6 +73,9 @@ public class TestBucketRequest {
     when(ozoneManager.getMetrics()).thenReturn(omMetrics);
     when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
     when(ozoneManager.isRatisEnabled()).thenReturn(true);
+    OMLayoutVersionManagerImpl lvm = mock(OMLayoutVersionManagerImpl.class);
+    when(lvm.getMetadataLayoutVersion()).thenReturn(0);
+    when(ozoneManager.getVersionManager()).thenReturn(lvm);
     auditLogger = Mockito.mock(AuditLogger.class);
     when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
     Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 116ba5c..4c1425e 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.KeyManagerImpl;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManagerImpl;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import org.junit.After;
 import org.junit.Before;
@@ -58,6 +59,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
@@ -108,6 +110,9 @@ public class TestOMKeyRequest {
     when(ozoneManager.getMetrics()).thenReturn(omMetrics);
     when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
     when(ozoneManager.getConfiguration()).thenReturn(ozoneConfiguration);
+    OMLayoutVersionManagerImpl lvm = mock(OMLayoutVersionManagerImpl.class);
+    when(lvm.getMetadataLayoutVersion()).thenReturn(0);
+    when(ozoneManager.getVersionManager()).thenReturn(lvm);
     when(ozoneManager.isRatisEnabled()).thenReturn(true);
     auditLogger = Mockito.mock(AuditLogger.class);
     when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeRequest.java
index d0b2cf0..c2486db 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/volume/TestOMVolumeRequest.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManagerImpl;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .CreateVolumeRequest;
@@ -44,6 +45,7 @@ import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
@@ -67,7 +69,7 @@ public class TestOMVolumeRequest {
 
   @Before
   public void setup() throws Exception {
-    ozoneManager = Mockito.mock(OzoneManager.class);
+    ozoneManager = mock(OzoneManager.class);
     omMetrics = OMMetrics.create();
     OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
     ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
@@ -76,8 +78,11 @@ public class TestOMVolumeRequest {
     when(ozoneManager.getMetrics()).thenReturn(omMetrics);
     when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
     when(ozoneManager.getMaxUserVolumeCount()).thenReturn(10L);
+    OMLayoutVersionManagerImpl lvm = mock(OMLayoutVersionManagerImpl.class);
+    when(lvm.getMetadataLayoutVersion()).thenReturn(0);
+    when(ozoneManager.getVersionManager()).thenReturn(lvm);
     when(ozoneManager.isRatisEnabled()).thenReturn(true);
-    auditLogger = Mockito.mock(AuditLogger.class);
+    auditLogger = mock(AuditLogger.class);
     when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
     Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMLayoutFeatureAspect.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMLayoutFeatureAspect.java
index b68c7c2..d37d0e7 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMLayoutFeatureAspect.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMLayoutFeatureAspect.java
@@ -56,8 +56,8 @@ public class TestOMLayoutFeatureAspect {
    */
   @Test
   public void testCheckLayoutFeature() throws Exception {
-    OMLayoutVersionManager.initialize(new OMStorage(configuration));
-    OMLayoutFeatureCatalog testObj = new OMLayoutFeatureCatalog();
+    OMLayoutVersionManagerImpl.initialize(new OMStorage(configuration));
+    OMLayoutFeatureUtil testObj = new OMLayoutFeatureUtil();
     try {
       testObj.ecMethod();
       Assert.fail();
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
index cfcfe24..9116e59 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
@@ -18,8 +18,9 @@
 
 package org.apache.hadoop.ozone.om.upgrade;
 
-import static 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeatureCatalog.OMLayoutFeature.CREATE_EC;
-import static 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeatureCatalog.OMLayoutFeature.INITIAL_VERSION;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION;
+import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.CREATE_EC;
+import static 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.INITIAL_VERSION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -30,8 +31,9 @@ import java.io.IOException;
 
 import org.apache.hadoop.ozone.om.OMStorage;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import 
org.apache.hadoop.ozone.om.upgrade.OMLayoutFeatureCatalog.OMLayoutFeature;
-import org.apache.hadoop.ozone.upgrade.LayoutFeature;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -39,12 +41,17 @@ import org.junit.Test;
  */
 public class TestOMVersionManager {
 
+  @After
+  public void cleanup() {
+    OMLayoutVersionManagerImpl.resetLayoutVersionManager();
+  }
+
   @Test
   public void testOMLayoutVersionManager() throws IOException {
     OMStorage omStorage = mock(OMStorage.class);
     when(omStorage.getLayoutVersion()).thenReturn(0);
-    OMLayoutVersionManager omVersionManager =
-        OMLayoutVersionManager.initialize(omStorage);
+    OMLayoutVersionManagerImpl omVersionManager =
+        OMLayoutVersionManagerImpl.initialize(omStorage);
     assertTrue(omVersionManager.isAllowed(INITIAL_VERSION));
     assertFalse(omVersionManager.isAllowed(CREATE_EC));
     assertEquals(0, omVersionManager.getMetadataLayoutVersion());
@@ -55,10 +62,38 @@ public class TestOMVersionManager {
   }
 
   @Test
+  public void testOMLayoutVersionManagerInitError() {
+    OMStorage omStorage = mock(OMStorage.class);
+    when(omStorage.getLayoutVersion()).thenReturn(
+        OMLayoutFeature.values()[OMLayoutFeature.values().length - 1]
+            .layoutVersion() + 1);
+    try {
+      OMLayoutVersionManagerImpl.initialize(omStorage);
+      Assert.fail();
+    } catch (OMException ex) {
+      assertEquals(NOT_SUPPORTED_OPERATION, ex.getResult());
+    }
+  }
+
+  @Test
+  public void testOMLayoutVersionManagerReset() throws IOException {
+    OMStorage omStorage = mock(OMStorage.class);
+    when(omStorage.getLayoutVersion()).thenReturn(0);
+    OMLayoutVersionManagerImpl omVersionManager =
+        OMLayoutVersionManagerImpl.initialize(omStorage);
+    int numLayoutVersions = OMLayoutFeature.values().length;
+    assertEquals(
+        OMLayoutFeature.values()[numLayoutVersions - 1].layoutVersion(),
+        omVersionManager.getSoftwareLayoutVersion());
+    OMLayoutVersionManagerImpl.resetLayoutVersionManager();
+    assertEquals(0, omVersionManager.getSoftwareLayoutVersion());
+  }
+
+  @Test
   public void testOMLayoutFeatureCatalog() {
     OMLayoutFeature[] values = OMLayoutFeature.values();
     int currVersion = Integer.MIN_VALUE;
-    for (LayoutFeature lf : values) {
+    for (OMLayoutFeature lf : values) {
       assertTrue(currVersion <= lf.layoutVersion());
       currVersion = lf.layoutVersion();
     }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOmVersionManagerRequestFactory.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOmVersionManagerRequestFactory.java
new file mode 100644
index 0000000..dffddd1
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOmVersionManagerRequestFactory.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.upgrade;
+
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.CreateKey;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.ozone.om.OMStorage;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.key.OMECKeyCreateRequest;
+import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.reflections.Reflections;
+
+/**
+ * Test OmVersionFactory.
+ */
+public class TestOmVersionManagerRequestFactory {
+
+  private static OMLayoutVersionManagerImpl omVersionManager;
+
+  @BeforeClass
+  public static void setup() throws OMException {
+    OMStorage omStorage = mock(OMStorage.class);
+    when(omStorage.getLayoutVersion()).thenReturn(0);
+    omVersionManager = OMLayoutVersionManagerImpl.initialize(omStorage);
+  }
+
+  @Test
+  public void testKeyCreateRequest() throws Exception {
+
+    // Try getting v1 of 'CreateKey'.
+    Class<? extends OMClientRequest> requestType =
+        omVersionManager.getRequestHandler(CreateKey.name());
+    Assert.assertEquals(requestType, OMKeyCreateRequest.class);
+
+    // Finalize the version manager.
+    omVersionManager.doFinalize(mock(OzoneManager.class));
+
+    // Try getting 'CreateKey' again. Should return CreateECKey.
+    requestType = omVersionManager.getRequestHandler(CreateKey.name());
+    Assert.assertEquals(requestType, OMECKeyCreateRequest.class);
+  }
+
+  @Test
+  public void testAllOMRequestClassesHaveGetRequestTypeMethod()
+      throws Exception {
+    Reflections reflections = new Reflections(
+        "org.apache.hadoop.ozone.om.request");
+    Set<Class<? extends OMClientRequest>> subTypes =
+        reflections.getSubTypesOf(OMClientRequest.class);
+    List<Class<? extends OMClientRequest>> collect = subTypes.stream()
+            .filter(c -> !Modifier.isAbstract(c.getModifiers()))
+            .collect(Collectors.toList());
+
+    for (Class<? extends OMClientRequest> c : collect) {
+      Method getRequestTypeMethod = null;
+      try {
+        getRequestTypeMethod = c.getMethod("getRequestType");
+      } catch (NoSuchMethodException nsmEx) {
+        Assert.fail(String.format(
+            "%s does not have the 'getRequestType' method " +
+            "which should be defined or inherited for every OM request class.",
+            c));
+      }
+      String type = (String) getRequestTypeMethod.invoke(null);
+      Assert.assertNotNull(String.format("Cannot get handler for %s", type),
+          omVersionManager.getRequestHandler(type));
+    }
+  }
+
+  @Test
+  public void testOmClientRequestHasExpectedConstructor()
+      throws NoSuchMethodException {
+    Reflections reflections = new Reflections(
+        "org.apache.hadoop.ozone.om.request");
+    Set<Class<? extends OMClientRequest>> subTypes =
+        reflections.getSubTypesOf(OMClientRequest.class);
+
+    for (Class<? extends OMClientRequest> requestClass : subTypes) {
+      if (Modifier.isAbstract(requestClass.getModifiers())) {
+        continue;
+      }
+      Method getRequestTypeMethod = requestClass.getMethod(
+          "getRequestType");
+      Assert.assertNotNull(getRequestTypeMethod);
+
+      Constructor<? extends OMClientRequest> constructorWithOmRequestArg =
+          requestClass.getDeclaredConstructor(OMRequest.class);
+      Assert.assertNotNull(constructorWithOmRequestArg);
+    }
+  }
+}
diff --git a/hadoop-ozone/s3gateway/pom.xml b/hadoop-ozone/s3gateway/pom.xml
index 4a62fc7..588ecc8 100644
--- a/hadoop-ozone/s3gateway/pom.xml
+++ b/hadoop-ozone/s3gateway/pom.xml
@@ -37,6 +37,11 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>org.javassist</groupId>
+      <artifactId>javassist</artifactId>
+      <version>3.26.0-GA</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdds-test-utils</artifactId>
       <scope>test</scope>


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org

Reply via email to