This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch HDDS-3698-upgrade
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/HDDS-3698-upgrade by this push:
     new bfce8b9  HDDS-4172. Implement Finalize command in Ozone Manager 
server. (#1456)
bfce8b9 is described below

commit bfce8b93ca6d79d7997dc66ae1816dc4f1bba472
Author: Istvan Fajth <pi...@cloudera.com>
AuthorDate: Tue Oct 13 05:49:17 2020 +0200

    HDDS-4172. Implement Finalize command in Ozone Manager server. (#1456)
---
 .../hdds/upgrade/HDDSLayoutVersionManager.java     |   2 +-
 .../org/apache/hadoop/ozone/common/Storage.java    |  12 +
 .../apache/hadoop/ozone/common/StorageInfo.java    |  40 ++-
 .../upgrade/AbstractLayoutVersionManager.java      |  87 +++---
 .../apache/hadoop/ozone/upgrade/LayoutFeature.java |   2 +-
 .../upgrade/LayoutVersionInstanceFactory.java      |  15 +-
 .../hadoop/ozone/upgrade/LayoutVersionManager.java |   2 +
 .../hadoop/ozone/upgrade/UpgradeFinalizer.java     | 160 ++++++++++
 .../upgrade/TestAbstractLayoutVersionManager.java  | 145 +++++++--
 .../upgrade/TestLayoutVersionInstanceFactory.java  |   9 +-
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   2 +-
 .../hadoop/ozone/om/exceptions/OMException.java    |   6 +-
 .../ozone/om/protocol/OzoneManagerProtocol.java    |  10 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  26 +-
 .../src/main/proto/OmClientProtocol.proto          |   4 +
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  69 +----
 .../upgrade/OMFinalizeUpgradeProgressRequest.java  |  93 ------
 .../request/upgrade/OMFinalizeUpgradeRequest.java  |  15 +-
 .../upgrade/OMFinalizeUpgradeProgressResponse.java |  45 ---
 .../om/upgrade/OMLayoutVersionManagerImpl.java     |  27 +-
 .../ozone/om/upgrade/OMUpgradeFinalizer.java       | 328 +++++++++++++++++++++
 .../protocolPB/OzoneManagerRequestHandler.java     |  34 ++-
 .../ozone/om/upgrade/TestOMUpgradeFinalizer.java   | 289 ++++++++++++++++++
 .../ozone/om/upgrade/TestOMVersionManager.java     |   6 +-
 .../TestOmVersionManagerRequestFactory.java        |   6 +-
 .../ozone/admin/om/FinalizeUpgradeSubCommand.java  |  40 ++-
 26 files changed, 1149 insertions(+), 325 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutVersionManager.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutVersionManager.java
index 8c3ff3d..8b8c250 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutVersionManager.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/upgrade/HDDSLayoutVersionManager.java
@@ -34,7 +34,7 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @SuppressWarnings("FinalClass")
 public class HDDSLayoutVersionManager extends
-    AbstractLayoutVersionManager {
+    AbstractLayoutVersionManager<HDDSLayoutFeature> {
 
   private static HDDSLayoutVersionManager hddsLayoutVersionManager;
 
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java
index d5e1348..9029c63 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java
@@ -121,6 +121,18 @@ public abstract class Storage {
     return storageInfo.getLayoutVersion();
   }
 
+  public void setLayoutVersion(int version) {
+    storageInfo.setLayoutVersion(version);
+  }
+
+  public void setUpgradeToLayoutVersion(int version) {
+    storageInfo.setUpgradingToLayoutVersion(version);
+  }
+
+  public void unsetUpgradeToLayoutVersion() {
+    storageInfo.unsetUpgradingToLayoutVersion();
+  }
+
   /**
    * Retrieves the storageInfo instance to read/write the common
    * version file properties.
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java
index 55911fc..18871a5 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java
@@ -57,6 +57,11 @@ public class StorageInfo {
    */
   private static final String LAYOUT_VERSION = "layoutVersion";
 
+  private static final String UPGRADING_TO_LAYOUT_VERSION =
+      "upgradingToLayoutVersion";
+
+  private static final int INVALID_LAYOUT_VERSION = -1;
+
   /**
    * Constructs StorageInfo instance.
    * @param type
@@ -85,6 +90,7 @@ public class StorageInfo {
     verifyClusterId();
     verifyCreationTime();
     verifyLayoutVersion();
+    verifyUpgradingToLayoutVersion();
   }
 
   public NodeType getNodeType() {
@@ -111,6 +117,14 @@ public class StorageInfo {
     return 0;
   }
 
+  public int getUpgradingToLayoutVersion() {
+    String upgradingTo = properties.getProperty(UPGRADING_TO_LAYOUT_VERSION);
+    if (upgradingTo != null) {
+      return Integer.parseInt(upgradingTo);
+    }
+    return INVALID_LAYOUT_VERSION;
+  }
+
   private void verifyLayoutVersion() {
     String layout = getProperty(LAYOUT_VERSION);
     if (layout == null) {
@@ -131,6 +145,21 @@ public class StorageInfo {
     properties.setProperty(CLUSTER_ID, clusterId);
   }
 
+  public void setLayoutVersion(int version) {
+    properties.setProperty(LAYOUT_VERSION, Integer.toString(version));
+  }
+
+  public void setUpgradingToLayoutVersion(int layoutVersion) {
+    //TODO: do we need to check consecutiveness of versions here?
+    // if so, then change  setLayoutVersion to incLayoutVersion in APIs!
+    properties.setProperty(
+        UPGRADING_TO_LAYOUT_VERSION, Integer.toString(layoutVersion));
+  }
+
+  public void unsetUpgradingToLayoutVersion() {
+    properties.remove(UPGRADING_TO_LAYOUT_VERSION);
+  }
+
   private void verifyNodeType(NodeType type)
       throws InconsistentStorageStateException {
     NodeType nodeType = getNodeType();
@@ -150,6 +179,16 @@ public class StorageInfo {
     }
   }
 
+  private void verifyUpgradingToLayoutVersion()
+      throws InconsistentStorageStateException {
+    int upgradeMark = getUpgradingToLayoutVersion();
+    if (upgradeMark != INVALID_LAYOUT_VERSION) {
+      throw new InconsistentStorageStateException("Ozone Manager died during "
+          + "a LayoutFeature upgrade.");
+      //TODO add recovery steps here, or point to a recovery doc.
+    }
+  }
+
   private void verifyCreationTime() {
     Long creationTime = getCreationTime();
     Preconditions.checkNotNull(creationTime);
@@ -200,5 +239,4 @@ public class StorageInfo {
   public static String newClusterID() {
     return "CID-" + UUID.randomUUID().toString();
   }
-
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
index 158900b..6da1ec3 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
@@ -21,10 +21,9 @@ package org.apache.hadoop.ozone.upgrade;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
-import java.util.Optional;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
 
@@ -32,22 +31,23 @@ import com.google.common.base.Preconditions;
  * Layout Version Manager containing generic method implementations.
  */
 @SuppressWarnings("visibilitymodifier")
-public abstract class AbstractLayoutVersionManager implements
-    LayoutVersionManager {
+public abstract class AbstractLayoutVersionManager<T extends LayoutFeature>
+    implements LayoutVersionManager {
 
   protected int metadataLayoutVersion; // MLV.
   protected int softwareLayoutVersion; // SLV.
-  protected TreeMap<Integer, LayoutFeature> features = new TreeMap<>();
-  protected Map<String, LayoutFeature> featureMap = new HashMap<>();
+  protected TreeMap<Integer, T> features = new TreeMap<>();
+  protected Map<String, T> featureMap = new HashMap<>();
   protected volatile boolean isInitialized = false;
 
-  protected void init(int version, LayoutFeature[] lfs) throws IOException {
+  protected void init(int version, T[] lfs) throws IOException {
+
     if (!isInitialized) {
       metadataLayoutVersion = version;
       initializeFeatures(lfs);
       softwareLayoutVersion = features.lastKey();
       isInitialized = true;
-      if (metadataLayoutVersion > softwareLayoutVersion) {
+      if (softwareIsBehindMetaData()) {
         throw new IOException(
             String.format("Cannot initialize VersionManager. Metadata " +
                     "layout version (%d) > software layout version (%d)",
@@ -56,7 +56,7 @@ public abstract class AbstractLayoutVersionManager implements
     }
   }
 
-  protected void initializeFeatures(LayoutFeature[] lfs) {
+  private void initializeFeatures(T[] lfs) {
     Arrays.stream(lfs).forEach(f -> {
       Preconditions.checkArgument(!featureMap.containsKey(f.name()));
       Preconditions.checkArgument(!features.containsKey(f.layoutVersion()));
@@ -65,53 +65,74 @@ public abstract class AbstractLayoutVersionManager 
implements
     });
   }
 
+  protected void reset() {
+    metadataLayoutVersion = 0;
+    softwareLayoutVersion = 0;
+    featureMap.clear();
+    features.clear();
+    isInitialized = false;
+  }
+
+  public void finalized(T layoutFeature) {
+    if (layoutFeature.layoutVersion() == metadataLayoutVersion + 1) {
+      metadataLayoutVersion = layoutFeature.layoutVersion();
+    } else {
+      String msgStart = "";
+      if (layoutFeature.layoutVersion() < metadataLayoutVersion) {
+        msgStart = "Finalize attempt on a layoutFeature which has already "
+            + "been finalized.";
+      } else {
+        msgStart = "Finalize attempt on a layoutFeature that is newer than the"
+            + " next feature to be finalized.";
+      }
+
+      throw new IllegalArgumentException(
+          msgStart + "Software Layout version: " + softwareLayoutVersion
+              + " Feature Layout version: " + layoutFeature.layoutVersion());
+    }
+  }
+
+  private boolean softwareIsBehindMetaData() {
+    return metadataLayoutVersion > softwareLayoutVersion;
+  }
+
+  @Override
   public int getMetadataLayoutVersion() {
     return metadataLayoutVersion;
   }
 
+  @Override
   public int getSoftwareLayoutVersion() {
     return softwareLayoutVersion;
   }
 
+  @Override
   public boolean needsFinalization() {
     return metadataLayoutVersion < softwareLayoutVersion;
   }
 
+  @Override
   public boolean isAllowed(LayoutFeature layoutFeature) {
     return layoutFeature.layoutVersion() <= metadataLayoutVersion;
   }
 
+  @Override
   public boolean isAllowed(String featureName) {
     return featureMap.containsKey(featureName) &&
         isAllowed(featureMap.get(featureName));
   }
 
-  public LayoutFeature getFeature(String name) {
+  @Override
+  public T getFeature(String name) {
     return featureMap.get(name);
   }
 
-  public void doFinalize(Object param) {
-    if (needsFinalization()){
-      Iterator<Map.Entry<Integer, LayoutFeature>> iterator = features
-          .tailMap(metadataLayoutVersion + 1).entrySet().iterator();
-      while (iterator.hasNext()) {
-        Map.Entry<Integer, LayoutFeature> f = iterator.next();
-        Optional<? extends LayoutFeature.UpgradeAction> upgradeAction =
-            f.getValue().onFinalizeAction();
-        upgradeAction.ifPresent(action -> action.executeAction(param));
-        // ToDo : Handle shutdown while iterating case (resume from last
-        //  feature).
-        metadataLayoutVersion = f.getKey();
-      }
-      // ToDo : Persist new MLV.
-    }
-  }
-
-  protected void reset() {
-    metadataLayoutVersion = 0;
-    softwareLayoutVersion = 0;
-    featureMap.clear();
-    features.clear();
-    isInitialized = false;
+  @Override
+  public Iterable<T> unfinalizedFeatures() {
+    return features
+        .tailMap(metadataLayoutVersion + 1)
+        .values()
+        .stream()
+        .collect(Collectors.toList());
   }
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutFeature.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutFeature.java
index 05e944e..e704c54 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutFeature.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutFeature.java
@@ -40,6 +40,6 @@ public interface LayoutFeature {
    * @param <T>
    */
   interface UpgradeAction<T> {
-    void executeAction(T arg);
+    void executeAction(T arg) throws Exception;
   }
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutVersionInstanceFactory.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutVersionInstanceFactory.java
index 96463e0..e81b45f 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutVersionInstanceFactory.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutVersionInstanceFactory.java
@@ -180,10 +180,15 @@ public class LayoutVersionInstanceFactory<T> {
   }
 
   /**
-   * To be called on finalization when there is an MLV update.
-   * @param lvm LayoutVersionManager instance.
+   * To be called on finalization of a new LayoutFeature.
+   * Unregisters all the requests handlers that are there for layout versions
+   * before the feature's layout version.
+   * If the feature's layout version does not define a new handler for a
+   * request type, the previously registered handler remains registered.
+   *
+   * @param feature the feature to be finalized.
    */
-  public void onFinalize(LayoutVersionManager lvm) {
+  public void finalizeFeature(LayoutFeature feature) {
     Iterator<Map.Entry<String, PriorityQueue<VersionedInstance<T>>>> iterator =
         instances.entrySet().iterator();
     while (iterator.hasNext()) {
@@ -192,13 +197,13 @@ public class LayoutVersionInstanceFactory<T> {
       PriorityQueue<VersionedInstance<T>> vInstances = next.getValue();
       VersionedInstance<T> prevInstance = null;
       while (!vInstances.isEmpty() &&
-          vInstances.peek().version < lvm.getMetadataLayoutVersion()) {
+          vInstances.peek().version < feature.layoutVersion()) {
         prevInstance = vInstances.poll();
         LOG.info("Unregistering {} from factory. ", prevInstance.instance);
       }
 
       if ((vInstances.isEmpty() ||
-          vInstances.peek().version > lvm.getMetadataLayoutVersion())
+          vInstances.peek().version > feature.layoutVersion())
           && prevInstance != null) {
         vInstances.offer(prevInstance);
       }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutVersionManager.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutVersionManager.java
index 432bd52..7714e80 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutVersionManager.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/LayoutVersionManager.java
@@ -61,4 +61,6 @@ public interface LayoutVersionManager {
    * @return LayoutFeature instance.
    */
   LayoutFeature getFeature(String name);
+
+  Iterable<? extends LayoutFeature> unfinalizedFeatures();
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
new file mode 100644
index 0000000..5568d59
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalizer.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.upgrade;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.annotation.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Interface to define the upgrade finalizer implementations.
+ * The role of this class is to manage the LayoutFeature finalization and
+ * activation, after an upgrade was done.
+ * For different service types, where this has relevance, there should be
+ * an implementation for this interface, that handles the finalization process
+ * in tandem with the corresponding version manager, and Storage.
+ * @param <T> The service type which the implementation is bound to, this
+ *           defines the type that is provided to {@link LayoutFeature}'s
+ *           {@link 
org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction}
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface UpgradeFinalizer<T> {
+
+  Logger LOG = LoggerFactory.getLogger(UpgradeFinalizer.class);
+
+  /**
+   * Represents the current state in which the service is with regards to
+   * finalization after an upgrade.
+   * The state transitions are the following:
+   * ALREADY_FINALIZED - no entry no exit from this status without restart.
+   * After an upgrade:
+   * FINALIZATION_REQUIRED -(finalize)-> STARTING_FINALIZATION
+   * -> FINALIZATION_IN_PROGRESS -> FINALIZATION_DONE from finalization done
+   * there is no more move possible, after a restart the service can end up in:
+   * - FINALIZATION_REQUIRED, if the finalization failed and have not reached
+   * FINALIZATION_DONE,
+   * - or it can be ALREADY_FINALIZED if the finalization was successfully 
done.
+   */
+  enum Status {
+    ALREADY_FINALIZED,
+    STARTING_FINALIZATION,
+    FINALIZATION_IN_PROGRESS,
+    FINALIZATION_DONE,
+    FINALIZATION_REQUIRED
+  }
+
+  /**
+   * A class that holds the current service status, and if the finalization is
+   * ongoing, the messages that should be passed to the initiating client of
+   * finalization.
+   * This translates to a counterpart in the RPC layer.
+   * @see UpgradeFinalizationStatus in OMClientProtocol.proto
+   */
+  class StatusAndMessages {
+    private Status status;
+    private Collection<String> msgs;
+
+    /**
+     * Constructs a StatusAndMessages tuple from the given params.
+     * @param status the finalization status of the service
+     * @param msgs the messages to be transferred to the client
+     */
+    public StatusAndMessages(Status status, Collection<String> msgs) {
+      this.status = status;
+      this.msgs = msgs;
+    }
+
+    /**
+     * Provides the status.
+     * @return the upgrade finalization status.
+     */
+    public Status status() {
+      return status;
+    }
+
+    /**
+     * Provides the messages, or an empty list if there are no messages.
+     * @return a list with possibly multiple messages.
+     */
+    public Collection<String> msgs() {
+      return msgs;
+    }
+  }
+
+  /**
+   * Default message can be used to indicate the starting of finalization.
+   */
+  StatusAndMessages STARTING_MSG = new StatusAndMessages(
+      Status.STARTING_FINALIZATION,
+      Arrays.asList("Starting Finalization")
+  );
+
+  /**
+   * Default message to provide when the service is in ALREADY_FINALIZED state.
+   */
+  StatusAndMessages FINALIZED_MSG = new StatusAndMessages(
+      Status.ALREADY_FINALIZED, Collections.emptyList()
+  );
+
+  /**
+   * Finalize the metadata upgrade.
+   * The provided client ID will be eligible to get the status messages,
+   * the service provided will be provided to the
+   * {@link org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeAction}s of
+   * the {@link LayoutFeature}s being finalized.
+   * @param upgradeClientID the initiating client's identifier.
+   * @param service the service on which we run finalization.
+   * @return the status after running finalization logic, with messages to be
+   *          provided to the client
+   * @throws IOException if the finalization fails at any stage.
+   */
+  StatusAndMessages finalize(String upgradeClientID, T service)
+      throws IOException;
+
+  /**
+   * Gets a status report about the finalization process.
+   * This method has a meaning, when the client polls the server from time to
+   * time for the status, and the server runs the finalization in the
+   * background.
+   * The background finalization can supply the messages back to the polling
+   * client in this method.
+   * @param upgradeClientId the identifier of the client initiated finalization
+   * @param takeover if a new client wants to take over, from the original
+   *                 client, this should be set to true, and in this case, the
+   *                 new client ID will be eligible to get status updates.
+   *                 A finalizer implementation can decide to ignore this
+   *                 parameter, in which case it may return status to any
+   *                 client.
+   * @return the status of the finalization.
+   * @throws IOException if the implementation requires a dedicated client to
+   *          report progress to, and if the client ID is not the initiating
+   *          client ID while takover is not specified to be true.
+   *          Or in any other I/O failure scenario.
+   */
+  StatusAndMessages reportStatus(String upgradeClientId, boolean takeover)
+      throws IOException;
+
+}
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestAbstractLayoutVersionManager.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestAbstractLayoutVersionManager.java
index b7faf74..1a786a2 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestAbstractLayoutVersionManager.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestAbstractLayoutVersionManager.java
@@ -20,51 +20,138 @@ package org.apache.hadoop.ozone.upgrade;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Iterator;
 
-import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
 
 /**
  * Test generic layout management init and APIs.
  */
+@RunWith(MockitoJUnitRunner.class)
 public class TestAbstractLayoutVersionManager {
 
-  private AbstractLayoutVersionManager versionManager =
-      new MockVersionManager();
+  @Spy
+  private AbstractLayoutVersionManager<LayoutFeature> versionManager;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testInitializationWithFeaturesToBeFinalized() throws  Exception {
+    versionManager.init(1, getTestLayoutFeatures(3));
+
+    assertEquals(3, versionManager.features.size());
+    assertEquals(3, versionManager.featureMap.size());
 
-  @Before
-  public void setUp() {
-    versionManager.reset();
+    assertEquals(1, versionManager.getMetadataLayoutVersion());
+    assertEquals(3, versionManager.getSoftwareLayoutVersion());
+
+    assertTrue(versionManager.needsFinalization());
+
+    Iterator<LayoutFeature> it =
+        versionManager.unfinalizedFeatures().iterator();
+    assertNotNull(it.next());
+    assertNotNull(it.next());
   }
 
   @Test
-  public void testInit() {
-    try {
-      versionManager.init(1,
-          getTestLayoutFeatures(2));
-      assertEquals(2, versionManager.features.size());
-      assertEquals(2, versionManager.featureMap.size());
-      assertEquals(1, versionManager.getMetadataLayoutVersion());
-      assertEquals(2, versionManager.getSoftwareLayoutVersion());
-      assertTrue(versionManager.needsFinalization());
-    } catch (IOException e) {
-      // We don't expect it to throw IOException.
-      assertTrue(false);
-    }
+  public void testInitializationWithUpToDateMetadataVersion() throws Exception 
{
+    versionManager.init(2, getTestLayoutFeatures(2));
+
+    assertEquals(2, versionManager.features.size());
+    assertEquals(2, versionManager.featureMap.size());
+
+    assertEquals(2, versionManager.getMetadataLayoutVersion());
+    assertEquals(2, versionManager.getSoftwareLayoutVersion());
+
+    assertFalse(versionManager.needsFinalization());
+    assertFalse(versionManager.unfinalizedFeatures().iterator().hasNext());
   }
 
   @Test
-  public void testNeedsFinalization() {
-    try {
-      versionManager.init(2, getTestLayoutFeatures(2));
-      assertFalse(versionManager.needsFinalization());
-    } catch (IOException e) {
-      // We don't expect it to throw IOException.
-      assertTrue(false);
-    }
+  public void testInitFailsIfNotEnoughLayoutFeaturesForVersion()
+      throws Exception {
+    exception.expect(IOException.class);
+    exception.expectMessage("Cannot initialize VersionManager.");
+
+    versionManager.init(3, getTestLayoutFeatures(2));
+  }
+
+  @Test
+  public void testFeatureFinalization() throws Exception {
+    LayoutFeature[] lfs = getTestLayoutFeatures(3);
+    versionManager.init(1, lfs);
+
+    versionManager.finalized(lfs[1]);
+
+    assertEquals(3, versionManager.features.size());
+    assertEquals(3, versionManager.featureMap.size());
+
+    assertEquals(2, versionManager.getMetadataLayoutVersion());
+    assertEquals(3, versionManager.getSoftwareLayoutVersion());
+
+    assertTrue(versionManager.needsFinalization());
+
+    Iterator<LayoutFeature> it =
+        versionManager.unfinalizedFeatures().iterator();
+    assertNotNull(it.next());
+    assertFalse(it.hasNext());
+  }
+
+  @Test
+  public void testFeatureFinalizationFailsIfTheFinalizedFeatureIsNotTheNext()
+      throws IOException {
+    exception.expect(IllegalArgumentException.class);
+
+    LayoutFeature[] lfs = getTestLayoutFeatures(3);
+    versionManager.init(1, lfs);
+
+    versionManager.finalized(lfs[2]);
+  }
+
+  @Test
+  public void testFeatureFinalizationFailsIfFeatureIsAlreadyFinalized()
+      throws IOException {
+    exception.expect(IllegalArgumentException.class);
+
+    LayoutFeature[] lfs = getTestLayoutFeatures(3);
+    versionManager.init(1, lfs);
+
+    versionManager.finalized(lfs[0]);
+  }
+
+  @Test
+  public void testUnfinalizedFeaturesAreNotAllowed() throws Exception {
+    LayoutFeature[] lfs = getTestLayoutFeatures(3);
+    versionManager.init(1, lfs);
+
+    assertTrue(versionManager.isAllowed(lfs[0].name()));
+    assertTrue(versionManager.isAllowed(lfs[0]));
+
+    assertFalse(versionManager.isAllowed(lfs[1].name()));
+    assertFalse(versionManager.isAllowed(lfs[1]));
+    assertFalse(versionManager.isAllowed(lfs[2].name()));
+    assertFalse(versionManager.isAllowed(lfs[2]));
+
+    versionManager.finalized(lfs[1]);
+
+    assertTrue(versionManager.isAllowed(lfs[0].name()));
+    assertTrue(versionManager.isAllowed(lfs[0]));
+    assertTrue(versionManager.isAllowed(lfs[1].name()));
+    assertTrue(versionManager.isAllowed(lfs[1]));
+
+    assertFalse(versionManager.isAllowed(lfs[2].name()));
+    assertFalse(versionManager.isAllowed(lfs[2]));
   }
 
   private LayoutFeature[] getTestLayoutFeatures(int num) {
@@ -92,6 +179,4 @@ public class TestAbstractLayoutVersionManager {
     return lfs;
   }
 
-  static class MockVersionManager extends AbstractLayoutVersionManager {
-  }
-}
\ No newline at end of file
+}
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestLayoutVersionInstanceFactory.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestLayoutVersionInstanceFactory.java
index e0bb185..5af2b43 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestLayoutVersionInstanceFactory.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/upgrade/TestLayoutVersionInstanceFactory.java
@@ -134,8 +134,9 @@ public class TestLayoutVersionInstanceFactory {
     assertTrue(val instanceof MockClassV1);
 
     // Finalize the layout version.
+    LayoutFeature toFeature = getMockFeatureWithVersion(3);
+    factory.finalizeFeature(toFeature);
     lvm = getMockLvm(3, 3);
-    factory.onFinalize(lvm);
 
     val = factory.get(lvm, getKey("key", null));
     assertTrue(val instanceof MockClassV2);
@@ -146,6 +147,12 @@ public class TestLayoutVersionInstanceFactory {
     assertTrue(val instanceof MockClassV2);
   }
 
+  private LayoutFeature getMockFeatureWithVersion(int layoutVersion) {
+    LayoutFeature feature = mock(LayoutFeature.class);
+    when(feature.layoutVersion()).thenReturn(layoutVersion);
+    return feature;
+  }
+
   private LayoutVersionManager getMockLvm(int mlv, int slv) {
     LayoutVersionManager lvm = mock(LayoutVersionManager.class);
     when(lvm.getMetadataLayoutVersion()).thenReturn(mlv);
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 67bd2a0..91efc49 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -242,6 +242,7 @@ public final class OmUtils {
     case GetAcl:
     case DBUpdates:
     case ListMultipartUploads:
+    case FinalizeUpgradeProgress:
       return true;
     case CreateVolume:
     case SetVolumeProperty:
@@ -271,7 +272,6 @@ public final class OmUtils {
     case AddAcl:
     case PurgeKeys:
     case RecoverTrash:
-    case FinalizeUpgradeProgress:
     case FinalizeUpgrade:
       return false;
     default:
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index a2f4d6a..3c28ae6 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -231,7 +231,11 @@ public class OMException extends IOException {
 
     PARTIAL_RENAME,
 
-    QUOTA_EXCEEDED
+    QUOTA_EXCEEDED,
 
+    PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED,
+    REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED,
+    UPDATE_LAYOUT_VERSION_FAILED,
+    LAYOUT_FEATURE_FINALIZATION_FAILED;
   }
 }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 8c0d686..9fbf483 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -49,9 +49,9 @@ import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UpgradeFinalizationStatus;
 import org.apache.hadoop.ozone.security.OzoneDelegationTokenSelector;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.token.TokenInfo;
 
@@ -341,9 +341,7 @@ public interface OzoneManagerProtocol
    * @throws OMException
    *            when finalization is already in progress.
    */
-  UpgradeFinalizationStatus finalizeUpgrade(
-      String upgradeClientID
-  ) throws IOException;
+  StatusAndMessages finalizeUpgrade(String upgradeClientID) throws IOException;
 
   /**
    * Queries the current status of finalization.
@@ -364,15 +362,15 @@ public interface OzoneManagerProtocol
    *    unless the request is forced by a new client, in which case the new
    *    client takes over the old client and the old client should exit.
    *
+   * @param takeover set force takeover of output monitoring
    * @param upgradeClientID String identifier of the upgrade finalizer client
-   * @param force set force takeover of output monitoring
    * @return the finalization status and status messages.
    * @throws IOException
    *            if there was a problem during the query
    * @throws OMException
    *            if finalization is needed but not yet started
    */
-  UpgradeFinalizationStatus queryUpgradeFinalizationProgress(
+  StatusAndMessages queryUpgradeFinalizationProgress(
       String upgradeClientID, boolean takeover
   ) throws IOException;
 
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 919c622..1dcbf5e 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest;
@@ -147,6 +146,8 @@ import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import 
org.apache.hadoop.ozone.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
 import 
org.apache.hadoop.ozone.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
 import 
org.apache.hadoop.ozone.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -156,6 +157,7 @@ import com.google.protobuf.ByteString;
 
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.*;
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.ACCESS_DENIED;
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.DIRECTORY_ALREADY_EXISTS;
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
@@ -258,7 +260,7 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
         .build();
 
     OMResponse omResponse = submitRequest(omRequest);
-    OzoneManagerProtocolProtos.SetVolumePropertyResponse response =
+    SetVolumePropertyResponse response =
         handleError(omResponse).getSetVolumePropertyResponse();
 
     return response.getResponse();
@@ -1079,9 +1081,8 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
   }
 
   @Override
-  public UpgradeFinalizationStatus finalizeUpgrade(
-      String upgradeClientID
-  ) throws IOException {
+  public StatusAndMessages finalizeUpgrade(String upgradeClientID)
+      throws IOException {
     FinalizeUpgradeRequest req = FinalizeUpgradeRequest.newBuilder()
         .setUpgradeClientId(upgradeClientID)
         .build();
@@ -1093,11 +1094,15 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
     FinalizeUpgradeResponse response =
         handleError(submitRequest(omRequest)).getFinalizeUpgradeResponse();
 
-    return response.getStatus();
+    UpgradeFinalizationStatus status = response.getStatus();
+    return new StatusAndMessages(
+        UpgradeFinalizer.Status.valueOf(status.getStatus().name()),
+        status.getMessagesList()
+    );
   }
 
   @Override
-  public UpgradeFinalizationStatus queryUpgradeFinalizationProgress(
+  public StatusAndMessages queryUpgradeFinalizationProgress(
       String upgradeClientID, boolean takeover
   ) throws IOException {
     FinalizeUpgradeProgressRequest req = FinalizeUpgradeProgressRequest
@@ -1114,7 +1119,12 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
         handleError(submitRequest(omRequest))
             .getFinalizeUpgradeProgressResponse();
 
-    return response.getStatus();
+    UpgradeFinalizationStatus status = response.getStatus();
+
+    return new StatusAndMessages(
+        UpgradeFinalizer.Status.valueOf(status.getStatus().name()),
+        status.getMessagesList()
+    );
   }
 
   /**
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index a2a322f..2ad08c7 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -323,6 +323,10 @@ enum Status {
 
     QUOTA_EXCEEDED = 66;
 
+    PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED = 67;
+    REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED = 68;
+    UPDATE_LAYOUT_VERSION_FAILED = 69;
+    LAYOUT_FEATURE_FINALIZATION_FAILED = 70;
 }
 
 /**
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 3a2555a..968abc7 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -41,7 +41,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Random;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -144,6 +143,7 @@ import 
org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.snapshot.OzoneManagerSnapshotProvider;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManagerImpl;
 import org.apache.hadoop.ozone.om.upgrade.OmLayoutVersionManager;
+import org.apache.hadoop.ozone.om.upgrade.OMUpgradeFinalizer;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
@@ -151,7 +151,6 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRoleI
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserVolumeInfo;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UpgradeFinalizationStatus;
 import 
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager;
@@ -167,6 +166,8 @@ import 
org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType;
 import org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
 import org.apache.hadoop.ozone.util.ExitManager;
 import org.apache.hadoop.ozone.util.OzoneVersionInfo;
 import org.apache.hadoop.security.SecurityUtil;
@@ -277,6 +278,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   private BucketManager bucketManager;
   private KeyManager keyManager;
   private PrefixManagerImpl prefixManager;
+  private UpgradeFinalizer upgradeFinalizer;
 
   private final OMMetrics metrics;
   private final ProtocolMessageMetrics<ProtocolMessageEnum>
@@ -361,6 +363,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     omId = omStorage.getOmId();
 
     versionManager = OMLayoutVersionManagerImpl.initialize(omStorage);
+    upgradeFinalizer = new OMUpgradeFinalizer(versionManager);
 
     // In case of single OM Node Service there will be no OM Node ID
     // specified, set it to value from om storage
@@ -2656,71 +2659,17 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     return new ServiceInfoEx(getServiceList(), caCertPem);
   }
 
-  private final List<String> finalizationMsgs = new ArrayList<>();
-  private UpgradeFinalizationStatus.Status finalizationStatus =
-      UpgradeFinalizationStatus.Status.FINALIZATION_REQUIRED;
-
   @Override
-  public UpgradeFinalizationStatus finalizeUpgrade(String upgradeClientID)
+  public StatusAndMessages finalizeUpgrade(String upgradeClientID)
       throws IOException {
-    if (!finalizationStatus
-        .equals(UpgradeFinalizationStatus.Status.FINALIZATION_REQUIRED)){
-      throw new OMException("Finalization is not needed.", INVALID_REQUEST);
-    }
-    finalizationStatus = 
UpgradeFinalizationStatus.Status.STARTING_FINALIZATION;
-    UpgradeFinalizationStatus status = UpgradeFinalizationStatus.newBuilder()
-        .setStatus(finalizationStatus)
-        .build();
-    LOG.info("FinalizeUpgrade initiated by client: {}.", upgradeClientID);
-    if (isLeader()) {
-      finalizationMsgs.add("Finalization started.");
-      finalizationStatus =
-          UpgradeFinalizationStatus.Status.FINALIZATION_IN_PROGRESS;
-
-      new Thread(() -> {
-        LOG.info("Finalization thread started.");
-        int i = 0;
-        Random random = new Random(0xafaf);
-        while (i < 50) {
-          int rand = random.nextInt(Math.min(10, 50 - i)) + 1;
-          synchronized (finalizationMsgs) {
-            LOG.info("Emitting {} messages", rand);
-            for (int j = 0; j < rand; j++) {
-              LOG.info("Upgrade MSG: {} - added.", "Message " + i + ".");
-              finalizationMsgs.add("Message " + i + ".");
-              i++;
-            }
-          }
-          try {
-            int sleep = random.nextInt(1200);
-            LOG.info("Sleeping {}ms before emit messages again.", sleep);
-            Thread.sleep(sleep);
-          } catch (InterruptedException e) {
-            LOG.info("Finalization thread interrupted.", e);
-            return;
-          }
-        }
-        LOG.info("Finalization done.");
-        finalizationStatus = 
UpgradeFinalizationStatus.Status.FINALIZATION_DONE;
-      }, "Finalization-Thread").start();
-    }
-    return status;
+    return upgradeFinalizer.finalize(upgradeClientID, this);
   }
 
   @Override
-  public UpgradeFinalizationStatus queryUpgradeFinalizationProgress(
+  public StatusAndMessages queryUpgradeFinalizationProgress(
       String upgradeClientID, boolean takeover
   ) throws IOException {
-    UpgradeFinalizationStatus.Builder builder =
-        UpgradeFinalizationStatus.newBuilder();
-    builder.setStatus(finalizationStatus);
-    List<String> msgs = new ArrayList<>();
-    synchronized (finalizationMsgs) {
-      msgs.addAll(finalizationMsgs);
-      finalizationMsgs.clear();
-    }
-    builder.addAllMessages(msgs);
-    return builder.build();
+    return upgradeFinalizer.reportStatus(upgradeClientID, takeover);
   }
 
   @Override
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeProgressRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeProgressRequest.java
deleted file mode 100644
index 9a8d56e..0000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeProgressRequest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package org.apache.hadoop.ozone.om.request.upgrade;
-
-import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.FinalizeUpgradeProgress;
-
-import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
-import org.apache.hadoop.ozone.om.request.OMClientRequest;
-import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
-import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import 
org.apache.hadoop.ozone.om.response.upgrade.OMFinalizeUpgradeProgressResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.FinalizeUpgradeProgressRequest;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.FinalizeUpgradeProgressResponse;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UpgradeFinalizationStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Handles finalizeUpgradeProgress request that serves to query the status
- * of the async finalization progress.
- */
-public class OMFinalizeUpgradeProgressRequest extends OMClientRequest {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(OMFinalizeUpgradeProgressRequest.class);
-
-  public OMFinalizeUpgradeProgressRequest(OMRequest omRequest) {
-    super(omRequest);
-  }
-
-  @Override public OMClientResponse validateAndUpdateCache(
-      OzoneManager ozoneManager, long transactionLogIndex,
-      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
-
-    LOG.info("Finalization progress check's validateAndUpdateCache"
-        + "called and started.");
-    LOG.trace("Request: {}", getOmRequest());
-    OzoneManagerProtocolProtos.OMResponse.Builder responseBuilder =
-        OmResponseUtil.getOMResponseBuilder(getOmRequest());
-    responseBuilder
-        .setCmdType(OzoneManagerProtocolProtos.Type.FinalizeUpgradeProgress);
-    OMClientResponse response = null;
-
-    try {
-      FinalizeUpgradeProgressRequest finalizeUpgradeProgressRequest =
-          getOmRequest().getFinalizeUpgradeProgressRequest();
-      String upgradeClientID =
-          finalizeUpgradeProgressRequest.getUpgradeClientId();
-      boolean takeover = finalizeUpgradeProgressRequest.getTakeover();
-
-      UpgradeFinalizationStatus status =
-          ozoneManager
-              .queryUpgradeFinalizationProgress(upgradeClientID, takeover);
-
-      FinalizeUpgradeProgressResponse omResponse =
-          FinalizeUpgradeProgressResponse.newBuilder()
-              .setStatus(status)
-              .build();
-
-      responseBuilder.setFinalizeUpgradeProgressResponse(omResponse);
-      response = new 
OMFinalizeUpgradeProgressResponse(responseBuilder.build());
-      LOG.trace("Returning response: {}", response);
-    } catch (IOException e) {
-      response = new OMFinalizeUpgradeProgressResponse(
-          createErrorOMResponse(responseBuilder, e));
-    }
-
-    return response;
-  }
-
-  public static String getRequestType() {
-    return FinalizeUpgradeProgress.name();
-  }
-}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeRequest.java
index 1b1897a..d720957 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMFinalizeUpgradeRequest.java
@@ -30,6 +30,7 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Finaliz
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UpgradeFinalizationStatus;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +51,6 @@ public class OMFinalizeUpgradeRequest extends OMClientRequest 
{
   public OMClientResponse validateAndUpdateCache(
       OzoneManager ozoneManager, long transactionLogIndex,
       OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
-    LOG.info("Finalization's validateAndUpdateCache called and started.");
     LOG.trace("Request: {}", getOmRequest());
     OMResponse.Builder responseBuilder =
         OmResponseUtil.getOMResponseBuilder(getOmRequest());
@@ -63,11 +63,20 @@ public class OMFinalizeUpgradeRequest extends 
OMClientRequest {
 
       String upgradeClientID = request.getUpgradeClientId();
 
-      UpgradeFinalizationStatus status =
+      StatusAndMessages omStatus =
           ozoneManager.finalizeUpgrade(upgradeClientID);
 
+      UpgradeFinalizationStatus.Status protoStatus =
+          UpgradeFinalizationStatus.Status.valueOf(omStatus.status().name());
+      UpgradeFinalizationStatus responseStatus =
+          UpgradeFinalizationStatus.newBuilder()
+              .setStatus(protoStatus)
+              .build();
+
       FinalizeUpgradeResponse omResponse =
-          FinalizeUpgradeResponse.newBuilder().setStatus(status).build();
+          FinalizeUpgradeResponse.newBuilder()
+              .setStatus(responseStatus)
+              .build();
       responseBuilder.setFinalizeUpgradeResponse(omResponse);
       response = new OMFinalizeUpgradeResponse(responseBuilder.build());
       LOG.trace("Returning response: {}", response);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/upgrade/OMFinalizeUpgradeProgressResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/upgrade/OMFinalizeUpgradeProgressResponse.java
deleted file mode 100644
index f07e275..0000000
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/upgrade/OMFinalizeUpgradeProgressResponse.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package org.apache.hadoop.ozone.om.response.upgrade;
-
-import org.apache.hadoop.hdds.utils.db.BatchOperation;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
-import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
-import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-
-import java.io.IOException;
-
-/**
- * Response for finalizeUpgradeProgress request.
- */
-// yepp this will not be a write request, adding a table here to the annotation
-// just to pass tests related to this annotation.
-@CleanupTableInfo(cleanupTables = { OmMetadataManagerImpl.USER_TABLE })
-public class OMFinalizeUpgradeProgressResponse extends OMClientResponse {
-  public OMFinalizeUpgradeProgressResponse(
-      OzoneManagerProtocolProtos.OMResponse omResponse) {
-    super(omResponse);
-  }
-
-  @Override protected void addToDBBatch(OMMetadataManager omMetadataManager,
-      BatchOperation batchOperation) throws IOException {
-
-  }
-}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutVersionManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutVersionManagerImpl.java
index 3533dc8..a37862b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutVersionManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutVersionManagerImpl.java
@@ -28,7 +28,6 @@ import java.util.Set;
 
 import org.apache.hadoop.ozone.common.Storage;
 import org.apache.hadoop.ozone.om.OMStorage;
-import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.upgrade.AbstractLayoutVersionManager;
@@ -45,7 +44,8 @@ import com.google.common.annotations.VisibleForTesting;
  * Class to manage layout versions and features for Ozone Manager.
  */
 public final class OMLayoutVersionManagerImpl
-    extends AbstractLayoutVersionManager implements OmLayoutVersionManager {
+    extends AbstractLayoutVersionManager<OMLayoutFeature>
+    implements OmLayoutVersionManager {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(OMLayoutVersionManagerImpl.class);
@@ -94,15 +94,15 @@ public final class OMLayoutVersionManagerImpl
     try {
       init(storage.getLayoutVersion(), OMLayoutFeature.values());
     } catch (IOException e) {
-      throw new OMException(String.format(e.getMessage()),
+      throw new OMException(
+          String.format("Cannot initialize VersionManager. Metadata " +
+                  "layout version (%d) > software layout version (%d)",
+              metadataLayoutVersion, softwareLayoutVersion),
+          e,
           NOT_SUPPORTED_OPERATION);
     }
-    registerOzoneManagerRequests();
-  }
 
-  public void doFinalize(OzoneManager om) {
-    super.doFinalize(om);
-    requestFactory.onFinalize(this);
+    registerOzoneManagerRequests();
   }
 
   @VisibleForTesting
@@ -162,14 +162,19 @@ public final class OMLayoutVersionManagerImpl
 
   /**
    * Given a type and version, get the corresponding request class type.
-   * @param requestType type string
-   * @param version version
+   * @param type type string
    * @return class type.
    */
   @Override
-  public Class< ? extends OMClientRequest> getRequestHandler(String type) {
+  public Class<? extends OMClientRequest> getRequestHandler(String type) {
     VersionFactoryKey versionFactoryKey = new VersionFactoryKey.Builder()
         .key(type).build();
     return requestFactory.get(this, versionFactoryKey);
   }
+
+  @Override
+  public void finalized(OMLayoutFeature layoutFeature) {
+    super.finalized(layoutFeature);
+    requestFactory.finalizeFeature(layoutFeature);
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
new file mode 100644
index 0000000..85e1382
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.upgrade;
+
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.*;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.UPDATE_LAYOUT_VERSION_FAILED;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS;
+
+/**
+ * UpgradeFinalizer implementation for the Ozone Manager service.
+ */
+public class OMUpgradeFinalizer implements UpgradeFinalizer<OzoneManager> {
+
+  private  static final OmUpgradeAction NOOP = a -> {};
+
+  private OMLayoutVersionManagerImpl versionManager;
+  private String clientID;
+
+  private Queue<String> msgs = new ConcurrentLinkedQueue<>();
+  private boolean isDone = false;
+
+  public OMUpgradeFinalizer(OMLayoutVersionManagerImpl versionManager) {
+    this.versionManager = versionManager;
+  }
+
+  @Override
+  public StatusAndMessages finalize(String upgradeClientID, OzoneManager om)
+      throws IOException {
+    if (!versionManager.needsFinalization()) {
+      return FINALIZED_MSG;
+    }
+    clientID = upgradeClientID;
+
+// This requires some more investigation on how to do it properly while
+// requests are on the fly, and post finalize features one by one.
+// Until that is done, monitoring is not really doing anything meaningful
+// but this is a tradoff we can take for the first iteration either if needed,
+// as the finalization of the first few features should not take that long.
+// Follow up JIRA is in HDDS-4286
+//    String threadName = "OzoneManager-Upgrade-Finalizer";
+//    ExecutorService executor =
+//        Executors.newSingleThreadExecutor(r -> new Thread(threadName));
+//    executor.submit(new Worker(om));
+    new Worker(om).call();
+    return STARTING_MSG;
+  }
+
+  @Override
+  public synchronized StatusAndMessages reportStatus(
+      String upgradeClientID, boolean takeover
+  ) throws IOException {
+    if (takeover) {
+      clientID = upgradeClientID;
+    }
+    assertClientId(upgradeClientID);
+    List<String> returningMsgs = new ArrayList<>(msgs.size()+10);
+    Status status = isDone ? FINALIZATION_DONE : FINALIZATION_IN_PROGRESS;
+    while (msgs.size() > 0) {
+      returningMsgs.add(msgs.poll());
+    }
+    return new StatusAndMessages(status, returningMsgs);
+  }
+
+  private void assertClientId(String id) throws OMException {
+    if (!this.clientID.equals(id)) {
+      throw new OMException("Unknown client tries to get finalization 
status.\n"
+          + "The requestor is not the initiating client of the finalization,"
+          + " if you want to take over, and get unsent status messages, check"
+          + " -takeover option.", INVALID_REQUEST);
+    }
+  }
+
+  /**
+   * This class implements the finalization logic applied to every
+   * LayoutFeature that needs to be finalized.
+   *
+   * For the first approach this happens synchronously within the state machine
+   * during the FinalizeUpgrade request, but ideally this has to be moved to
+   * individual calls that are going into the StateMaching one by one.
+   * The prerequisits for this to happen in the background are the following:
+   * - more fine grained control for LayoutFeatures to prepare the
+   *    finalization outside the state machine, do the switch from old to new
+   *    logic inside the statemachine and apply the finalization, and then do
+   *    any cleanup necessary outside the state machine
+   * - a way to post a request to the state machine that is not part of the
+   *    client API, so basically not an OMRequest, but preferably an internal
+   *    request, which is posted from the leader OM to the follower OMs only.
+   * - ensure that there is a possibility to implement a rollback logic if
+   *    something goes wrong inside the state machine, to avoid OM stuck in an
+   *    intermediate state due to an error.
+   */
+  private class Worker implements Callable<Void> {
+    private OzoneManager ozoneManager;
+
+    /**
+     * Initiates the Worker, for the specified OM instance.
+     * @param om the OzoneManager instance on which to finalize the new
+     *           LayoutFeatures.
+     */
+    Worker(OzoneManager om) {
+      ozoneManager = om;
+    }
+
+    @Override
+    public Void call() throws OMException {
+      try {
+        emitStartingMsg();
+
+        for (OMLayoutFeature f : versionManager.unfinalizedFeatures()) {
+          finalizeFeature(f);
+          updateLayoutVersionInVersionFile(f);
+          versionManager.finalized(f);
+        }
+
+        emitFinishedMsg();
+      } finally {
+        isDone = true;
+      }
+      return null;
+    }
+
+    private void finalizeFeature(OMLayoutFeature feature)
+        throws OMException {
+      OmUpgradeAction action = feature.onFinalizeAction().orElse(NOOP);
+
+      if (action == NOOP) {
+        emitNOOPMsg(feature.name());
+        return;
+      }
+
+      putFinalizationMarkIntoVersionFile(feature);
+
+      emitStartingFinalizationActionMsg(feature.name());
+      try {
+        action.executeAction(ozoneManager);
+      } catch (Exception e) {
+        logFinalizationFailureAndThrow(e, feature.name());
+      }
+      emitFinishFinalizationActionMsg(feature.name());
+
+      removeFinalizationMarkFromVersionFile(feature);
+    }
+
+    private void updateLayoutVersionInVersionFile(OMLayoutFeature feature)
+        throws OMException {
+      int prevLayoutVersion = currentStoredLayoutVersion();
+
+      updateStorageLayoutVersion(feature.layoutVersion());
+      try {
+        persistStorage();
+      } catch (IOException e) {
+        updateStorageLayoutVersion(prevLayoutVersion);
+        logLayoutVersionUpdateFailureAndThrow(e);
+      }
+    }
+
+    private void putFinalizationMarkIntoVersionFile(OMLayoutFeature feature)
+        throws OMException {
+      try {
+        emitUpgradeToLayoutVersionPersistingMsg(feature.name());
+
+        setUpgradeToLayoutVersionInStorage(feature.layoutVersion());
+        persistStorage();
+
+        emitUpgradeToLayoutVersionPersistedMsg();
+      } catch (IOException e) {
+        logUpgradeToLayoutVersionPersistingFailureAndThrow(feature.name(), e);
+      }
+    }
+
+    private void removeFinalizationMarkFromVersionFile(OMLayoutFeature feature)
+        throws OMException {
+      try {
+        emitRemovingUpgradeToLayoutVersionMsg(feature.name());
+
+        unsetUpgradeToLayoutVersionInStorage();
+        persistStorage();
+
+        emitRemovedUpgradeToLayoutVersionMsg();
+      } catch (IOException e) {
+        logUpgradeToLayoutVersionRemovalFailureAndThrow(feature.name(), e);
+      }
+    }
+
+
+
+
+
+    private void setUpgradeToLayoutVersionInStorage(int version) {
+      ozoneManager.getOmStorage().setUpgradeToLayoutVersion(version);
+    }
+
+    private void unsetUpgradeToLayoutVersionInStorage() {
+      ozoneManager.getOmStorage().unsetUpgradeToLayoutVersion();
+    }
+
+    private int currentStoredLayoutVersion() {
+      return ozoneManager.getOmStorage().getLayoutVersion();
+    }
+
+    private void updateStorageLayoutVersion(int version) {
+      ozoneManager.getOmStorage().setLayoutVersion(version);
+    }
+
+    private void persistStorage() throws IOException {
+      ozoneManager.getOmStorage().persistCurrentState();
+    }
+
+    private void emitNOOPMsg(String feature) {
+      String msg = "No finalization work defined for feature: " + feature + 
".";
+      String msg2 = "Skipping.";
+
+      logAndEmit(msg);
+      logAndEmit(msg2);
+    }
+
+    private void emitStartingMsg() {
+      String msg = "Finalization started.";
+      logAndEmit(msg);
+    }
+
+    private void emitFinishedMsg() {
+      String msg = "Finalization is done.";
+      logAndEmit(msg);
+    }
+
+    private void emitStartingFinalizationActionMsg(String feature) {
+      String msg = "Executing finalization of feature: " + feature + ".";
+      logAndEmit(msg);
+    }
+
+    private void emitFinishFinalizationActionMsg(String feature) {
+      String msg = "The feature " + feature + " is finalized.";
+      logAndEmit(msg);
+    }
+
+    private void emitUpgradeToLayoutVersionPersistingMsg(String feature) {
+      String msg = "Mark finalization of " + feature + " in VERSION file.";
+      logAndEmit(msg);
+    }
+
+    private void emitUpgradeToLayoutVersionPersistedMsg() {
+      String msg = "Finalization mark placed.";
+      logAndEmit(msg);
+    }
+
+    private void emitRemovingUpgradeToLayoutVersionMsg(String feature) {
+      String msg = "Remove finalization mark of " + feature
+          + " feature from VERSION file.";
+      logAndEmit(msg);
+    }
+
+    private void emitRemovedUpgradeToLayoutVersionMsg() {
+      String msg = "Finalization mark removed.";
+      logAndEmit(msg);
+    }
+
+    private void logAndEmit(String msg) {
+      LOG.info(msg);
+      msgs.offer(msg);
+    }
+
+    private void logFinalizationFailureAndThrow(Exception e, String feature)
+        throws OMException {
+      String msg = "Error during finalization of " + feature + ".";
+      logAndThrow(e, msg, LAYOUT_FEATURE_FINALIZATION_FAILED);
+    }
+
+    private void logLayoutVersionUpdateFailureAndThrow(IOException e)
+        throws OMException {
+      String msg = "Updating the LayoutVersion in the VERSION file failed.";
+      logAndThrow(e, msg, UPDATE_LAYOUT_VERSION_FAILED);
+    }
+
+    private void logUpgradeToLayoutVersionPersistingFailureAndThrow(
+        String feature, IOException e
+    ) throws OMException {
+      String msg = "Failed to update VERSION file with the upgrading feature: "
+          + feature + ".";
+      logAndThrow(e, msg, PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED);
+    }
+
+    private void logUpgradeToLayoutVersionRemovalFailureAndThrow(
+        String feature, IOException e) throws OMException {
+      String msg =
+          "Failed to unmark finalization of " + feature + " LayoutFeature.";
+      logAndThrow(e, msg, REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED);
+    }
+
+    private void logAndThrow(Exception e, String msg, ResultCodes resultCode)
+        throws OMException {
+      LOG.error(msg, e);
+      throw new OMException(msg, e, resultCode);
+    }
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index dd9e704..dcf3e14 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -50,6 +50,8 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Allocat
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.FinalizeUpgradeProgressRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.FinalizeUpgradeProgressResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoBucketRequest;
@@ -75,6 +77,7 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Service
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UpgradeFinalizationStatus;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 
 import com.google.common.collect.Lists;
@@ -93,6 +96,7 @@ import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartInfo;
 
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -204,6 +208,12 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
             getAcl(request.getGetAclRequest());
         responseBuilder.setGetAclResponse(getAclResponse);
         break;
+      case FinalizeUpgradeProgress:
+        FinalizeUpgradeProgressResponse upgradeProgressResponse =
+            reportUpgradeProgress(request.getFinalizeUpgradeProgressRequest());
+        responseBuilder
+            .setFinalizeUpgradeProgressResponse(upgradeProgressResponse);
+        break;
       default:
         responseBuilder.setSuccess(false);
         responseBuilder.setMessage("Unrecognized Command Type: " + cmdType);
@@ -263,6 +273,7 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
   }
 
   // Convert and exception to corresponding status code
+
   protected Status exceptionToResponseStatus(IOException ex) {
     if (ex instanceof OMException) {
       return Status.values()[((OMException) ex).getResult().ordinal()];
@@ -273,7 +284,6 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
       return Status.INTERNAL_ERROR;
     }
   }
-
   /**
    * Validates that the incoming OM request has required parameters.
    * TODO: Add more validation checks before writing the request to Ratis log.
@@ -591,6 +601,28 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
     return listStatusResponseBuilder.build();
   }
 
+  private FinalizeUpgradeProgressResponse reportUpgradeProgress(
+      FinalizeUpgradeProgressRequest request) throws IOException {
+    String upgradeClientId = request.getUpgradeClientId();
+    boolean takeover = request.getTakeover();
+
+    StatusAndMessages progress =
+        impl.queryUpgradeFinalizationProgress(upgradeClientId, takeover);
+
+    UpgradeFinalizationStatus.Status protoStatus =
+        UpgradeFinalizationStatus.Status.valueOf(progress.status().name());
+
+    UpgradeFinalizationStatus response =
+        UpgradeFinalizationStatus.newBuilder()
+            .setStatus(protoStatus)
+            .addAllMessages(progress.msgs())
+            .build();
+
+    return FinalizeUpgradeProgressResponse.newBuilder()
+        .setStatus(response)
+        .build();
+  }
+
   protected OzoneManager getOzoneManager() {
     return impl;
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMUpgradeFinalizer.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMUpgradeFinalizer.java
new file mode 100644
index 0000000..fbb7ce9
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMUpgradeFinalizer.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.upgrade;
+
+import org.apache.hadoop.ozone.om.OMStorage;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.mockito.verification.VerificationMode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.LAYOUT_FEATURE_FINALIZATION_FAILED;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static 
org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.STARTING_FINALIZATION;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * {@link OMUpgradeFinalizer} tests.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestOMUpgradeFinalizer {
+
+  private static final String CLIENT_ID = "clientID";
+  private static final String OTHER_CLIENT_ID = "otherClientID";
+
+  @Mock
+  private OMLayoutVersionManagerImpl versionManager;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testEmitsFinalizedStatusIfAlreadyFinalized() throws Exception {
+    when(versionManager.needsFinalization()).thenReturn(false);
+
+    OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
+    StatusAndMessages ret = finalizer.finalize(CLIENT_ID, null);
+
+    assertEquals(ALREADY_FINALIZED, ret.status());
+  }
+
+  @Test
+  public void testEmitsStartingStatusOnFinalization() throws Exception {
+    Iterable<OMLayoutFeature> lfs = mockFeatures(3, "feature-3", "feature-4");
+    setupVersionManagerMockToFinalize(lfs);
+
+    OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
+    StatusAndMessages ret = finalizer.finalize(CLIENT_ID, mockOzoneManager(2));
+
+    assertEquals(STARTING_FINALIZATION, ret.status());
+  }
+
+  /*
+   * This test ensures that whenever finalize() is called, we finish all
+   * finalization step, and getting the report gives back a FINALIZATION_DONE
+   * status. This has to be revisited as soon as we change the behaviour to
+   * post the finalization steps to the state machine from bg thread one by 
one.
+   * This also means that FINALIZATION_IN_PROGRESS status related tests
+   * has to be added at the same time.
+   */
+  @Test
+  public void testReportStatusResultsInFinalizationDone()
+      throws Exception {
+    Iterable<OMLayoutFeature> lfs = mockFeatures(3, "feature-3", "feature-4");
+    setupVersionManagerMockToFinalize(lfs);
+
+    OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
+    finalizer.finalize(CLIENT_ID, mockOzoneManager(2));
+
+    StatusAndMessages ret = finalizer.reportStatus(CLIENT_ID, false);
+
+    assertEquals(UpgradeFinalizer.Status.FINALIZATION_DONE, ret.status());
+  }
+
+  @Test
+  public void testReportStatusAllowsTakeover()
+      throws Exception {
+    Iterable<OMLayoutFeature> lfs = mockFeatures(3, "feature-3", "feature-4");
+    setupVersionManagerMockToFinalize(lfs);
+
+    OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
+    finalizer.finalize(CLIENT_ID, mockOzoneManager(2));
+
+    StatusAndMessages ret = finalizer.reportStatus(OTHER_CLIENT_ID, true);
+
+    assertEquals(UpgradeFinalizer.Status.FINALIZATION_DONE, ret.status());
+  }
+
+  @Test
+  public void testReportStatusFailsFromNewClientIfRequestIsNotATakeover()
+      throws Exception {
+    Iterable<OMLayoutFeature> lfs = mockFeatures(3, "feature-3", "feature-4");
+    setupVersionManagerMockToFinalize(lfs);
+
+    OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
+    finalizer.finalize(CLIENT_ID, mockOzoneManager(2));
+
+    exception.expect(OMException.class);
+    exception.expectMessage("Unknown client");
+
+    finalizer.reportStatus(OTHER_CLIENT_ID, false);
+  }
+
+  @Test
+  public void testFinalizationWithUpgradeAction() throws Exception {
+    Optional<OmUpgradeAction> action = Optional.of(om -> om.getVersion());
+    OzoneManager om = mockOzoneManager(0);
+    Iterable<OMLayoutFeature> lfs = mockFeatures("feature-1", "feature-2");
+    when(lfs.iterator().next().onFinalizeAction()).thenReturn(action);
+    setupVersionManagerMockToFinalize(lfs);
+
+    OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
+    finalizer.finalize(CLIENT_ID, om);
+
+
+    Iterator<OMLayoutFeature> it = lfs.iterator();
+    OMLayoutFeature f = it.next();
+
+    // the first feature has an upgrade action, it calls the setUpgradeToLV
+    // method, and the action execution is checked by verifying on 
om.getVersion
+    verify(om.getOmStorage(), once())
+        .setUpgradeToLayoutVersion(f.layoutVersion());
+    verify(om.getOmStorage(), once())
+        .setLayoutVersion(f.layoutVersion());
+    verify(om.getOmStorage(), once())
+        .unsetUpgradeToLayoutVersion();
+    verify(om, once()).getVersion();
+
+    // the second feature has a NOOP it should not call the setUpgradeToLV
+    // method, but should update the LV.
+    f = it.next();
+    verify(om.getOmStorage(), never())
+        .setUpgradeToLayoutVersion(f.layoutVersion());
+    verify(om.getOmStorage(), once())
+        .setLayoutVersion(f.layoutVersion());
+
+    StatusAndMessages status = finalizer.reportStatus(CLIENT_ID, false);
+    assertEquals(FINALIZATION_DONE, status.status());
+    assertFalse(status.msgs().isEmpty());
+  }
+
+  @Test
+  public void testFinalizationWithFailingUpgradeAction() throws Exception {
+    Optional<OmUpgradeAction> action = Optional.of(
+        ignore -> {
+          throw new IOException("Fail.");
+        }
+    );
+
+    OzoneManager om = mockOzoneManager(0);
+    Iterable<OMLayoutFeature> lfs = mockFeatures("feature-1", "feature-2");
+    when(lfs.iterator().next().onFinalizeAction()).thenReturn(action);
+    setupVersionManagerMockToFinalize(lfs);
+
+    OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(versionManager);
+    try {
+      finalizer.finalize(CLIENT_ID, om);
+      fail();
+    } catch (Exception e) {
+      assertThat(e, instanceOf(OMException.class));
+      assertThat(e.getMessage(), containsString(lfs.iterator().next().name()));
+      assertEquals(
+          ((OMException) e).getResult(),
+          LAYOUT_FEATURE_FINALIZATION_FAILED
+      );
+    }
+
+    // Verify that we have never removed the upgradeToLV from the storage
+    // as finalization of the first feature in the list fails.
+    // Also verify that we have never updated the LV.
+    Iterator<OMLayoutFeature> it = lfs.iterator();
+    OMLayoutFeature f = it.next();
+    verify(om.getOmStorage(), once())
+        .setUpgradeToLayoutVersion(f.layoutVersion());
+    verify(om.getOmStorage(), never())
+        .setLayoutVersion(f.layoutVersion());
+    verify(om.getOmStorage(), never())
+        .unsetUpgradeToLayoutVersion();
+
+    // Verify that we never got to the second feature.
+    f = it.next();
+    verify(om.getOmStorage(), never())
+        .setUpgradeToLayoutVersion(f.layoutVersion());
+    verify(om.getOmStorage(), never())
+        .setLayoutVersion(f.layoutVersion());
+
+    StatusAndMessages status = finalizer.reportStatus(CLIENT_ID, false);
+    assertEquals(FINALIZATION_DONE, status.status());
+    assertFalse(status.msgs().isEmpty());
+  }
+
+
+
+  private VerificationMode once() {
+    return times(1);
+  }
+
+  private void setupVersionManagerMockToFinalize(
+      Iterable<OMLayoutFeature> lfs
+  ) {
+    when(versionManager.needsFinalization()).thenReturn(true);
+    when(versionManager.unfinalizedFeatures()).thenReturn(lfs);
+  }
+
+  private OMLayoutFeature mockFeature(String name, int version) {
+    OMLayoutFeature f = mock(OMLayoutFeature.class);
+    when(f.name()).thenReturn(name);
+    when(f.layoutVersion()).thenReturn(version);
+    return f;
+  }
+
+  private Iterable<OMLayoutFeature> mockFeatures(String... names) {
+    return mockFeatures(1, names);
+  }
+
+  private Iterable<OMLayoutFeature> mockFeatures(
+      int startFromLV, String... names
+  ) {
+    int i=startFromLV;
+    List<OMLayoutFeature> ret = new ArrayList<>();
+    for (String name : names) {
+      ret.add(mockFeature(name, i));
+      i++;
+    }
+    return ret;
+  }
+
+  private int storedLayoutVersion = 0;
+
+  private OzoneManager mockOzoneManager(int initialLayoutVersion) {
+    OzoneManager mock = mock(OzoneManager.class);
+    OMStorage st = mock(OMStorage.class);
+    storedLayoutVersion = initialLayoutVersion;
+
+    doAnswer(
+        (Answer<Void>) inv -> {
+          storedLayoutVersion = inv.getArgument(0, Integer.class);
+          return null;
+        }).when(st).setLayoutVersion(anyInt());
+
+    when(st.getLayoutVersion())
+        .thenAnswer((Answer<Integer>) ignore -> storedLayoutVersion);
+
+    when(mock.getOmStorage()).thenReturn(st);
+    return mock;
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
index 9116e59..95dae66 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOMVersionManager.java
@@ -52,11 +52,15 @@ public class TestOMVersionManager {
     when(omStorage.getLayoutVersion()).thenReturn(0);
     OMLayoutVersionManagerImpl omVersionManager =
         OMLayoutVersionManagerImpl.initialize(omStorage);
+    OzoneManager om = mock(OzoneManager.class);
+    when(om.getOmStorage()).thenReturn(omStorage);
+
     assertTrue(omVersionManager.isAllowed(INITIAL_VERSION));
     assertFalse(omVersionManager.isAllowed(CREATE_EC));
     assertEquals(0, omVersionManager.getMetadataLayoutVersion());
     assertTrue(omVersionManager.needsFinalization());
-    omVersionManager.doFinalize(mock(OzoneManager.class));
+    OMUpgradeFinalizer finalizer = new OMUpgradeFinalizer(omVersionManager);
+    finalizer.finalize("random", om);
     assertFalse(omVersionManager.needsFinalization());
     assertEquals(2, omVersionManager.getMetadataLayoutVersion());
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOmVersionManagerRequestFactory.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOmVersionManagerRequestFactory.java
index dffddd1..df6f6bf 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOmVersionManagerRequestFactory.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/upgrade/TestOmVersionManagerRequestFactory.java
@@ -47,12 +47,15 @@ import org.reflections.Reflections;
 public class TestOmVersionManagerRequestFactory {
 
   private static OMLayoutVersionManagerImpl omVersionManager;
+  private static OzoneManager om;
 
   @BeforeClass
   public static void setup() throws OMException {
     OMStorage omStorage = mock(OMStorage.class);
     when(omStorage.getLayoutVersion()).thenReturn(0);
     omVersionManager = OMLayoutVersionManagerImpl.initialize(omStorage);
+    om = mock(OzoneManager.class);
+    when(om.getOmStorage()).thenReturn(omStorage);
   }
 
   @Test
@@ -64,7 +67,8 @@ public class TestOmVersionManagerRequestFactory {
     Assert.assertEquals(requestType, OMKeyCreateRequest.class);
 
     // Finalize the version manager.
-    omVersionManager.doFinalize(mock(OzoneManager.class));
+    OMUpgradeFinalizer f = new OMUpgradeFinalizer(omVersionManager);
+    f.finalize("random", om);
 
     // Try getting 'CreateKey' again. Should return CreateECKey.
     requestType = omVersionManager.getRequestHandler(CreateKey.name());
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/FinalizeUpgradeSubCommand.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/FinalizeUpgradeSubCommand.java
index b35c621..7bf1484 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/FinalizeUpgradeSubCommand.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/FinalizeUpgradeSubCommand.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.admin.om;
 import org.apache.hadoop.hdds.cli.HddsVersionProvider;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UpgradeFinalizationStatus;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer;
 import picocli.CommandLine;
 
 import java.io.IOException;
@@ -78,16 +78,16 @@ public class FinalizeUpgradeSubCommand implements 
Callable<Void> {
         parent.createOmClient(omServiceId, omHost, forceHA);
     String upgradeClientID = "Upgrade-Client-" + UUID.randomUUID().toString();
     try {
-      UpgradeFinalizationStatus status =
+      UpgradeFinalizer.StatusAndMessages finalizationResponse =
           client.finalizeUpgrade(upgradeClientID);
-      if (isFinalized(status)){
+      if (isFinalized(finalizationResponse.status())){
         System.out.println("Upgrade has already been finalized.");
         emitExitMsg();
         return null;
-      } else if (!isStarting(status)){
+      } else if (!isStarting(finalizationResponse.status())){
         System.err.println("Invalid response from Ozone Manager.");
         System.err.println(
-            "Current finalization status is: " + status.getStatus()
+            "Current finalization status is: " + finalizationResponse.status()
         );
         throw new IOException("Exiting...");
       }
@@ -159,19 +159,19 @@ public class FinalizeUpgradeSubCommand implements 
Callable<Void> {
         Thread.sleep(500);
         // do not check for exceptions, if one happens during monitoring we
         // should report it and exit.
-        UpgradeFinalizationStatus status =
+        UpgradeFinalizer.StatusAndMessages progress =
             client.queryUpgradeFinalizationProgress(upgradeClientID, force);
         // this can happen after trying to takeover the request after the fact
         // when there is already nothing to take over.
-        if (isFinalized(status)) {
+        if (isFinalized(progress.status())) {
           System.out.println("Finalization already finished.");
           emitExitMsg();
           return null;
         }
-        if (isInprogress(status) || isDone(status)) {
-          
status.getMessagesList().stream().forEachOrdered(System.out::println);
+        if (isInprogress(progress.status()) || isDone(progress.status())) {
+          progress.msgs().stream().forEachOrdered(System.out::println);
         }
-        if (isDone(status)) {
+        if (isDone(progress.status())) {
           emitExitMsg();
           finished = true;
         }
@@ -184,24 +184,20 @@ public class FinalizeUpgradeSubCommand implements 
Callable<Void> {
     System.out.println("Exiting...");
   }
 
-  private static boolean isFinalized(UpgradeFinalizationStatus status) {
-    return status.getStatus()
-        .equals(UpgradeFinalizationStatus.Status.ALREADY_FINALIZED);
+  private static boolean isFinalized(UpgradeFinalizer.Status status) {
+    return status.equals(UpgradeFinalizer.Status.ALREADY_FINALIZED);
   }
 
-  private static boolean isDone(UpgradeFinalizationStatus status) {
-    return status.getStatus()
-        .equals(UpgradeFinalizationStatus.Status.FINALIZATION_DONE);
+  private static boolean isDone(UpgradeFinalizer.Status status) {
+    return status.equals(UpgradeFinalizer.Status.FINALIZATION_DONE);
   }
 
-  private static boolean isInprogress(UpgradeFinalizationStatus status) {
-    return status.getStatus()
-        .equals(UpgradeFinalizationStatus.Status.FINALIZATION_IN_PROGRESS);
+  private static boolean isInprogress(UpgradeFinalizer.Status status) {
+    return status.equals(UpgradeFinalizer.Status.FINALIZATION_IN_PROGRESS);
   }
 
-  private static boolean isStarting(UpgradeFinalizationStatus status) {
-    return status.getStatus()
-        .equals(UpgradeFinalizationStatus.Status.STARTING_FINALIZATION);
+  private static boolean isStarting(UpgradeFinalizer.Status status) {
+    return status.equals(UpgradeFinalizer.Status.STARTING_FINALIZATION);
   }
 
   private static void emitGeneralErrorMsg() {


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org

Reply via email to