This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7593 by this push:
     new c45449c17f HDDS-9638. [hsync] File recovery support in OM (#5847)
c45449c17f is described below

commit c45449c17f7622ba2380587a31568a068c2a3580
Author: Sammi Chen <[email protected]>
AuthorDate: Tue Jan 9 10:10:34 2024 +0800

    HDDS-9638. [hsync] File recovery support in OM (#5847)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   1 +
 .../hadoop/ozone/om/exceptions/OMException.java    |   4 +-
 .../ozone/om/protocol/OzoneManagerProtocol.java    |  19 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  38 ++-
 .../debug/ozone-debug-lease-recovery.robot         |   4 +-
 .../apache/hadoop/fs/ozone/TestLeaseRecovery.java  |   3 +-
 .../src/main/proto/OmClientProtocol.proto          |   6 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   5 +-
 .../om/request/file/OMRecoverLeaseRequest.java     | 107 +++----
 .../om/request/key/OMAllocateBlockRequest.java     |   7 +-
 .../request/key/OMAllocateBlockRequestWithFSO.java |   6 +-
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  71 +++--
 .../om/request/key/OMKeyCommitRequestWithFSO.java  |  60 ++--
 .../om/response/file/OMRecoverLeaseResponse.java   |  21 +-
 .../ozone/om/response/key/OMKeyCommitResponse.java |   2 +-
 .../response/key/OMKeyCommitResponseWithFSO.java   |   8 +-
 .../ozone/om/request/OMRequestTestUtils.java       |   2 +-
 .../om/request/file/TestOMRecoverLeaseRequest.java | 325 ++++++++++++++++++---
 .../om/response/key/TestOMKeyCommitResponse.java   |   2 +-
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |  12 +-
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   |  12 +-
 .../apache/hadoop/fs/ozone/OzoneClientAdapter.java |   6 +-
 .../java/org/apache/hadoop/fs/ozone/Statistic.java |   5 +-
 .../apache/hadoop/fs/ozone/OzoneFileSystem.java    |  14 +-
 .../hadoop/fs/ozone/RootedOzoneFileSystem.java     |  16 +-
 .../apache/hadoop/fs/ozone/OzoneFileSystem.java    |  14 +-
 .../hadoop/fs/ozone/RootedOzoneFileSystem.java     |  14 +-
 27 files changed, 573 insertions(+), 211 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 01fa7c8a52..25690ed105 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -412,6 +412,7 @@ public final class OzoneConsts {
 
   /** Metadata stored in OmKeyInfo. */
   public static final String HSYNC_CLIENT_ID = "hsyncClientId";
+  public static final String LEASE_RECOVERY = "leaseRecovery";
 
   //GDPR
   public static final String GDPR_FLAG = "gdprEnabled";
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index 20b4fb1ed0..c5cf619e37 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -267,6 +267,8 @@ public class OMException extends IOException {
 
     S3_SECRET_ALREADY_EXISTS,
     
-    INVALID_PATH
+    INVALID_PATH,
+    KEY_UNDER_LEASE_RECOVERY,
+    KEY_ALREADY_CLOSED
   }
 }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index ad394bf4d1..824346058d 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.UUID;
 
 import javax.annotation.Nonnull;
+
 import org.apache.hadoop.fs.SafeModeAction;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.ozone.OzoneAcl;
@@ -269,6 +270,19 @@ public interface OzoneManagerProtocol
             "this to be implemented, as write requests use a new approach.");
   }
 
+  /**
+   * Recover and commit a key. This will make the change from the client 
visible. The client
+   * is identified by the clientID.
+   *
+   * @param args the key to commit
+   * @param clientID the client identification
+   * @throws IOException
+   */
+  default void recoverKey(OmKeyArgs args, long clientID)
+      throws IOException {
+    throw new UnsupportedOperationException("OzoneManager does not require " +
+        "this to be implemented, as write requests use a new approach.");
+  }
 
   /**
    * Allocate a new block, it is assumed that the client is having an open key
@@ -1086,11 +1100,10 @@ public interface OzoneManagerProtocol
    * @param volumeName - The volume name.
    * @param bucketName - The bucket name.
    * @param keyName - The key user want to recover.
-   * @return true if the file is already closed
+   * @return a list holding the key info followed by the open key info of the file under recovery
    * @throws IOException if an error occurs
    */
-  boolean recoverLease(String volumeName, String bucketName,
-                              String keyName) throws IOException;
+  List<OmKeyInfo> recoverLease(String volumeName, String bucketName, String 
keyName) throws IOException;
 
   /**
    * Update modification time and access time of a file.
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index a179ca5c40..f9572b3bdc 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -777,13 +777,19 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
   @Override
   public void hsyncKey(OmKeyArgs args, long clientId)
           throws IOException {
-    updateKey(args, clientId, true);
+    updateKey(args, clientId, true, false);
   }
 
   @Override
   public void commitKey(OmKeyArgs args, long clientId)
           throws IOException {
-    updateKey(args, clientId, false);
+    updateKey(args, clientId, false, false);
+  }
+
+  @Override
+  public void recoverKey(OmKeyArgs args, long clientId)
+      throws IOException {
+    updateKey(args, clientId, false, true);
   }
 
   public static void setReplicationConfig(ReplicationConfig replication,
@@ -799,7 +805,7 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
     b.setType(replication.getReplicationType());
   }
 
-  private void updateKey(OmKeyArgs args, long clientId, boolean hsync)
+  private void updateKey(OmKeyArgs args, long clientId, boolean hsync, boolean 
recovery)
       throws IOException {
     CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder();
     List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList();
@@ -820,15 +826,13 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
     req.setKeyArgs(keyArgsBuilder.build());
     req.setClientID(clientId);
     req.setHsync(hsync);
-
+    req.setRecovery(recovery);
 
     OMRequest omRequest = createOMRequest(Type.CommitKey)
         .setCommitKeyRequest(req)
         .build();
 
     handleError(submitRequest(omRequest));
-
-
   }
 
   @Override
@@ -2440,21 +2444,23 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
   }
 
   @Override
-  public boolean recoverLease(String volumeName, String bucketName,
-                              String keyName) throws IOException {
+  public List<OmKeyInfo> recoverLease(String volumeName, String bucketName, 
String keyName) throws IOException {
     RecoverLeaseRequest recoverLeaseRequest =
-            RecoverLeaseRequest.newBuilder()
-                    .setVolumeName(volumeName)
-                    .setBucketName(bucketName)
-                    .setKeyName(keyName)
-                    .build();
+        RecoverLeaseRequest.newBuilder()
+            .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setKeyName(keyName)
+            .build();
 
     OMRequest omRequest = createOMRequest(Type.RecoverLease)
-            .setRecoverLeaseRequest(recoverLeaseRequest).build();
+        .setRecoverLeaseRequest(recoverLeaseRequest).build();
 
     RecoverLeaseResponse recoverLeaseResponse =
-            handleError(submitRequest(omRequest)).getRecoverLeaseResponse();
-    return recoverLeaseResponse.getResponse();
+        handleError(submitRequest(omRequest)).getRecoverLeaseResponse();
+    ArrayList<OmKeyInfo> list = new ArrayList();
+    list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getKeyInfo()));
+    list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getOpenKeyInfo()));
+    return list;
   }
 
   @Override
diff --git 
a/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-lease-recovery.robot 
b/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-lease-recovery.robot
index a721f2acbb..1f9646579d 100644
--- 
a/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-lease-recovery.robot
+++ 
b/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-lease-recovery.robot
@@ -36,12 +36,12 @@ Create volume bucket and put key
 *** Test Cases ***
 Test ozone debug recover for o3fs
     ${result} =              Execute Lease recovery cli    
o3fs://${BUCKET}.${VOLUME}.om/${TESTFILE}
-    Should Contain    ${result}   Lease recovery SUCCEEDED
+    Should Contain    ${result}  Key: ${TESTFILE} is already closed
     ${result} =              Execute Lease recovery cli    
o3fs://${BUCKET}.${VOLUME}.om/randomfile
     Should Contain    ${result}    not found
 
 Test ozone debug recover for ofs
     ${result} =              Execute Lease recovery cli    
ofs://om/${VOLUME}/${BUCKET}/${TESTFILE}
-    Should Contain    ${result}   Lease recovery SUCCEEDED
+    Should Contain    ${result}  Key: ${TESTFILE} is already closed
     ${result} =              Execute Lease recovery cli    
ofs://om/${VOLUME}/${BUCKET}/randomfile
     Should Contain    ${result}    not found
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
index 2b8a8cb02b..1cd8a7e2b4 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
@@ -149,8 +149,7 @@ public class TestLeaseRecovery {
         Thread.sleep(1000);
       }
       // The lease should have been recovered.
-      assertTrue("File should be closed", fs.recoverLease(file));
-      assertTrue(fs.isFileClosed(file));
+      assertTrue("File should be closed", fs.isFileClosed(file));
     } finally {
       closeIgnoringKeyNotFound(stream);
     }
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 54cafbc0ad..737bf166c3 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -524,6 +524,8 @@ enum Status {
     S3_SECRET_ALREADY_EXISTS = 92;
     
     INVALID_PATH = 93;
+    KEY_UNDER_LEASE_RECOVERY = 94;
+    KEY_ALREADY_CLOSED = 95;
 }
 
 /**
@@ -1413,6 +1415,7 @@ message CommitKeyRequest {
     required KeyArgs keyArgs = 1;
     required uint64 clientID = 2;
     optional bool hsync = 3;
+    optional bool recovery = 4;
 }
 
 message CommitKeyResponse {
@@ -2070,7 +2073,8 @@ message RecoverLeaseRequest {
 }
 
 message RecoverLeaseResponse {
-  optional bool response = 1;
+  optional KeyInfo keyInfo = 1;
+  optional KeyInfo openKeyInfo = 2;
 }
 
 message SetTimesRequest {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 5fb3872d88..3c6085828f 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -4603,9 +4603,8 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   }
 
   @Override
-  public boolean recoverLease(String volumeName, String bucketName,
-                              String keyName) {
-    return false;
+  public List<OmKeyInfo> recoverLease(String volumeName, String bucketName, 
String keyName) {
+    return null;
   }
 
   @Override
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
index 4855d7f7ab..7b920f9949 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.om.request.file;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -34,14 +36,11 @@ import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.response.file.OMRecoverLeaseResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-        .OMResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-        .OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-        .RecoverLeaseRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-        .RecoverLeaseResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseResponse;
+import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RecoverLease;
 
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
@@ -58,10 +57,9 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_ALREADY_CLOSED;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
-import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-        .Type.RecoverLease;
 
 /**
  * Perform actions for RecoverLease requests.
@@ -75,6 +73,8 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
   private String keyName;
   private OmKeyInfo keyInfo;
   private String dbFileKey;
+  private OmKeyInfo openKeyInfo;
+  private String dbOpenFileKey;
 
   private OMMetadataManager omMetadataManager;
 
@@ -141,28 +141,21 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
       acquiredLock = getOmLockDetails().isLockAcquired();
       validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
 
-      String openKeyEntryName = doWork(ozoneManager, transactionLogIndex);
+      RecoverLeaseResponse recoverLeaseResponse = doWork(ozoneManager, 
transactionLogIndex);
 
       // Prepare response
       boolean responseCode = true;
-      omResponse
-          .setRecoverLeaseResponse(
-              RecoverLeaseResponse.newBuilder()
-                  .setResponse(responseCode)
-                  .build())
-          .setCmdType(RecoverLease);
-      omClientResponse =
-          new OMRecoverLeaseResponse(omResponse.build(), getBucketLayout(),
-              keyInfo, dbFileKey, openKeyEntryName);
+      
omResponse.setRecoverLeaseResponse(recoverLeaseResponse).setCmdType(RecoverLease);
+      omClientResponse = new OMRecoverLeaseResponse(omResponse.build(), 
getBucketLayout(),
+          dbOpenFileKey, openKeyInfo);
       omMetrics.incNumRecoverLease();
-      LOG.debug("Key recovered. Volume:{}, Bucket:{}, Key:{}", volumeName,
-          bucketName, keyName);
+      LOG.debug("Key recovered. Volume:{}, Bucket:{}, Key:{}",
+          volumeName, bucketName, keyName);
     } catch (IOException | InvalidPathException ex) {
       LOG.error("Fail for recovering lease. Volume:{}, Bucket:{}, Key:{}",
           volumeName, bucketName, keyName, ex);
       exception = ex;
       omMetrics.incNumRecoverLeaseFails();
-      omResponse.setCmdType(RecoverLease);
       omClientResponse = new OMRecoverLeaseResponse(
           createErrorOMResponse(omResponse, exception), getBucketLayout());
     } finally {
@@ -186,60 +179,56 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
     return omClientResponse;
   }
 
-  private String doWork(OzoneManager ozoneManager, long transactionLogIndex)
-      throws IOException {
-
+  private RecoverLeaseResponse doWork(OzoneManager ozoneManager,
+      long transactionLogIndex) throws IOException {
     final long volumeId = omMetadataManager.getVolumeId(volumeName);
-    final long bucketId = omMetadataManager.getBucketId(
-        volumeName, bucketName);
+    final long bucketId = omMetadataManager.getBucketId(volumeName, 
bucketName);
     Iterator<Path> pathComponents = Paths.get(keyName).iterator();
     long parentID = OMFileRequest.getParentID(volumeId, bucketId,
         pathComponents, keyName, omMetadataManager,
         "Cannot recover file : " + keyName
             + " as parent directory doesn't exist");
     String fileName = OzoneFSUtils.getFileName(keyName);
-    dbFileKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
-        parentID, fileName);
+    dbFileKey = omMetadataManager.getOzonePathKey(volumeId, bucketId, 
parentID, fileName);
 
     keyInfo = getKey(dbFileKey);
     if (keyInfo == null) {
-      throw new OMException("Key:" + keyName + " not found", KEY_NOT_FOUND);
+      throw new OMException("Key:" + keyName + " not found in keyTable", 
KEY_NOT_FOUND);
     }
-    final String clientId = keyInfo.getMetadata().remove(
-        OzoneConsts.HSYNC_CLIENT_ID);
-    if (clientId == null) {
+
+    final String writerId = 
keyInfo.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
+    if (writerId == null) {
       // if file is closed, do nothing and return right away.
-      LOG.warn("Key:" + keyName + " is already closed");
-      return null;
+      throw new OMException("Key: " + keyName + " is already closed", 
KEY_ALREADY_CLOSED);
     }
-    String openFileDBKey = omMetadataManager.getOpenFileName(
-            volumeId, bucketId, parentID, fileName, Long.parseLong(clientId));
-    if (openFileDBKey != null) {
-      commitKey(dbFileKey, keyInfo, fileName, ozoneManager,
-          transactionLogIndex);
-      removeOpenKey(openFileDBKey, fileName, transactionLogIndex);
+
+    dbOpenFileKey = omMetadataManager.getOpenFileName(
+        volumeId, bucketId, parentID, fileName, Long.parseLong(writerId));
+    openKeyInfo = 
omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenFileKey);
+    if (openKeyInfo == null) {
+      throw new OMException("Open Key " + dbOpenFileKey + " not found in 
openKeyTable", KEY_NOT_FOUND);
+    }
+
+    if (openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) {
+      LOG.debug("Key: " + keyName + " is already under recovery");
+    } else {
+      openKeyInfo.getMetadata().put(OzoneConsts.LEASE_RECOVERY, "true");
+      openKeyInfo.setUpdateID(transactionLogIndex, 
ozoneManager.isRatisEnabled());
+      openKeyInfo.setModificationTime(Time.now());
+      // Add to cache.
+      omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
+          new CacheKey<>(dbOpenFileKey), CacheValue.get(transactionLogIndex, 
openKeyInfo));
     }
+    keyInfo.setKeyName(keyName);
+    openKeyInfo.setKeyName(keyName);
+    RecoverLeaseResponse.Builder rb = RecoverLeaseResponse.newBuilder()
+        
.setOpenKeyInfo(openKeyInfo.getNetworkProtobuf(getOmRequest().getVersion(), 
true))
+        .setKeyInfo(keyInfo.getNetworkProtobuf(getOmRequest().getVersion(), 
true));
 
-    return openFileDBKey;
+    return rb.build();
   }
 
   private OmKeyInfo getKey(String dbOzoneKey) throws IOException {
     return omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
   }
-
-  private void commitKey(String dbOzoneKey, OmKeyInfo omKeyInfo,
-      String fileName, OzoneManager ozoneManager,
-      long transactionLogIndex) throws IOException {
-    omKeyInfo.setModificationTime(Time.now());
-    omKeyInfo.setUpdateID(transactionLogIndex, ozoneManager.isRatisEnabled());
-
-    OMFileRequest.addFileTableCacheEntry(omMetadataManager, dbOzoneKey,
-        omKeyInfo, fileName, transactionLogIndex);
-  }
-
-  private void removeOpenKey(String openKeyName, String fileName,
-      long transactionLogIndex) {
-    OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
-        openKeyName, null, fileName, transactionLogIndex);
-  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
index 254aafcb50..ababd28c0a 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 
 /**
@@ -138,7 +139,7 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
     newAllocatedBlockRequest.setKeyLocation(
         omKeyLocationInfoList.get(0).getProtobuf(getOmRequest().getVersion()));
 
-    return getOmRequest().toBuilder().setUserInfo(getUserInfo())
+    return 
getOmRequest().toBuilder().setUserInfo(getUserIfNotExists(ozoneManager))
         .setAllocateBlockRequest(newAllocatedBlockRequest).build();
 
   }
@@ -206,6 +207,10 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
             KEY_NOT_FOUND);
       }
 
+      if (openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) {
+        throw new OMException("Open Key " + openKeyName + " is under lease 
recovery",
+            KEY_UNDER_LEASE_RECOVERY);
+      }
       List<OmKeyLocationInfo> newLocationList = Collections.singletonList(
           OmKeyLocationInfo.getFromProtobuf(blockLocation));
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
index eb5a9ccb53..9fd8fe0ffa 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
@@ -59,6 +59,7 @@ import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 
 /**
@@ -136,7 +137,10 @@ public class OMAllocateBlockRequestWithFSO extends 
OMAllocateBlockRequest {
         throw new OMException("Open Key not found " + openKeyName,
                 KEY_NOT_FOUND);
       }
-
+      if (openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) {
+        throw new OMException("Open Key " + openKeyName + " is under lease 
recovery",
+            KEY_UNDER_LEASE_RECOVERY);
+      }
       List<OmKeyLocationInfo> newLocationList = Collections.singletonList(
               OmKeyLocationInfo.getFromProtobuf(blockLocation));
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index ebcb1a040a..56d7b72e62 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -68,7 +68,9 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.util.Time;
 
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_ALREADY_CLOSED;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION;
 import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
@@ -102,12 +104,15 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       OmUtils.validateKeyName(StringUtils.removeEnd(keyArgs.getKeyName(),
               OzoneConsts.FS_FILE_COPYING_TEMP_SUFFIX));
     }
-    boolean isHsync = commitKeyRequest.hasHsync() &&
-        commitKeyRequest.getHsync();
+    boolean isHsync = commitKeyRequest.hasHsync() && 
commitKeyRequest.getHsync();
+    boolean isRecovery = commitKeyRequest.hasRecovery() && 
commitKeyRequest.getRecovery();
     boolean enableHsync = ozoneManager.getConfiguration().getBoolean(
         OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED,
         OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED_DEFAULT);
-    if (isHsync && !enableHsync) {
+
+    // A file counts as hsynced only if hsync has been called on it; otherwise 
it is not hsynced.
+    // Currently, file lease recovery by design only supports recovering hsynced 
files.
+    if ((isHsync || isRecovery) && !enableHsync) {
       throw new OMException("Hsync is not enabled. To enable, " +
           "set ozone.fs.hsync.enabled = true", NOT_SUPPORTED_OPERATION);
     }
@@ -156,17 +161,20 @@ public class OMKeyCommitRequest extends OMKeyRequest {
 
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
 
-    boolean isHSync = commitKeyRequest.hasHsync() &&
-            commitKeyRequest.getHsync();
-
+    boolean isHSync = commitKeyRequest.hasHsync() && 
commitKeyRequest.getHsync();
+    boolean isRecovery = commitKeyRequest.hasRecovery() && 
commitKeyRequest.getRecovery();
+    // isHsync = true: a commit request resulting from a client-side hsync call.
+    // isRecovery = true: a commit request resulting from a client-side 
recoverLease call.
+    // Neither isHsync nor isRecovery is true: a commit request resulting from a 
client-side normal
+    // outputStream#close call.
     if (isHSync) {
       omMetrics.incNumKeyHSyncs();
     } else {
       omMetrics.incNumKeyCommits();
     }
 
-    LOG.debug("isHSync = {}, volumeName = {}, bucketName = {}, keyName = {}",
-        isHSync, volumeName, bucketName, keyName);
+    LOG.debug("isHSync = {}, isRecovery = {}, volumeName = {}, bucketName = 
{}, keyName = {}",
+        isHSync, isRecovery, volumeName, bucketName, keyName);
 
     try {
       commitKeyArgs = resolveBucketLink(ozoneManager, commitKeyArgs, auditMap);
@@ -179,10 +187,7 @@ public class OMKeyCommitRequest extends OMKeyRequest {
           commitKeyRequest.getClientID());
 
       String dbOzoneKey =
-          omMetadataManager.getOzoneKey(volumeName, bucketName,
-              keyName);
-      String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName,
-          keyName, commitKeyRequest.getClientID());
+          omMetadataManager.getOzoneKey(volumeName, bucketName, keyName);
 
       List<OmKeyLocationInfo>
           locationInfoList = getOmKeyLocationInfos(ozoneManager, 
commitKeyArgs);
@@ -225,9 +230,18 @@ public class OMKeyCommitRequest extends OMKeyRequest {
       Map<String, RepeatedOmKeyInfo> oldKeyVersionsToDeleteMap = null;
       OmKeyInfo keyToDelete =
           omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
+      long writerClientId = commitKeyRequest.getClientID();
+      if (isRecovery && keyToDelete != null) {
+        String clientId = 
keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
+        if (clientId == null) {
+          throw new OMException("Failed to recovery key, as " +
+              dbOzoneKey + " is already closed", KEY_ALREADY_CLOSED);
+        }
+        writerClientId = Long.parseLong(clientId);
+      }
+
+      final String clientIdString = String.valueOf(writerClientId);
       if (null != keyToDelete) {
-        final String clientIdString
-            = String.valueOf(commitKeyRequest.getClientID());
         isPreviousCommitHsync = java.util.Optional.ofNullable(keyToDelete)
             .map(WithMetadata::getMetadata)
             .map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
@@ -235,21 +249,30 @@ public class OMKeyCommitRequest extends OMKeyRequest {
             .isPresent();
       }
 
+      String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName,
+          keyName, writerClientId);
       omKeyInfo =
           omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenKey);
       if (omKeyInfo == null) {
-        String action = "commit";
-        if (isHSync) {
-          action = "hsync";
-        }
+        String action = isRecovery ? "recovery" : isHSync ? "hsync" : "commit";
         throw new OMException("Failed to " + action + " key, as " + dbOpenKey +
             "entry is not found in the OpenKey table", KEY_NOT_FOUND);
       }
+      if (omKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY) &&
+          omKeyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID)) {
+        if (!isRecovery) {
+          throw new OMException("Cannot commit key " + dbOpenKey + " with " + 
OzoneConsts.LEASE_RECOVERY +
+              " metadata while recovery flag is not set in request", 
KEY_UNDER_LEASE_RECOVERY);
+        }
+      }
       omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
           commitKeyArgs.getMetadataList()));
+
       if (isHSync) {
-        omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
-            String.valueOf(commitKeyRequest.getClientID()));
+        omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, 
clientIdString);
+      } else if (isRecovery) {
+        omKeyInfo.getMetadata().remove(OzoneConsts.HSYNC_CLIENT_ID);
+        omKeyInfo.getMetadata().remove(OzoneConsts.LEASE_RECOVERY);
       }
       omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
       omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
@@ -314,7 +337,7 @@ public class OMKeyCommitRequest extends OMKeyRequest {
 
       // Add to cache of open key table and key table.
       if (!isHSync) {
-        // If isHSync = false, put a tombstone in OpenKeyTable cache,
+        // If !isHSync = true, put a tombstone in OpenKeyTable cache,
         // indicating the key is removed from OpenKeyTable.
         // So that this key can't be committed again.
         omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
@@ -350,12 +373,12 @@ public class OMKeyCommitRequest extends OMKeyRequest {
     }
 
     // Debug logging for any key commit operation, successful or not
-    LOG.debug("Key commit {} with isHSync = {}, omKeyInfo = {}",
-        result == Result.SUCCESS ? "succeeded" : "failed", isHSync, omKeyInfo);
+    LOG.debug("Key commit {} with isHSync = {}, isRecovery = {}, omKeyInfo = {}",
+        result == Result.SUCCESS ? "succeeded" : "failed", isHSync, isRecovery, omKeyInfo);
 
     if (!isHSync) {
       auditLog(auditLogger, buildAuditMessage(OMAction.COMMIT_KEY, auditMap,
-              exception, getOmRequest().getUserInfo()));
+          exception, getOmRequest().getUserInfo()));
       processResult(commitKeyRequest, volumeName, bucketName, keyName,
           omMetrics, exception, omKeyInfo, result);
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index a4b0b9fa0b..5ffa7b87fb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -57,7 +57,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_ALREADY_CLOSED;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
 
 /**
@@ -102,17 +104,20 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
     OMClientResponse omClientResponse = null;
     boolean bucketLockAcquired = false;
     Result result;
-    boolean isHSync = commitKeyRequest.hasHsync() &&
-        commitKeyRequest.getHsync();
-
+    boolean isHSync = commitKeyRequest.hasHsync() && commitKeyRequest.getHsync();
+    boolean isRecovery = commitKeyRequest.hasRecovery() && commitKeyRequest.getRecovery();
+    // isHsync = true, a commit request as a result of client side hsync call
+    // isRecovery = true, a commit request as a result of client side recoverLease call
+    // none of isHsync and isRecovery is true, a commit request as a result of client side normal
+    // outputStream#close call.
     if (isHSync) {
       omMetrics.incNumKeyHSyncs();
     } else {
       omMetrics.incNumKeyCommits();
     }
 
-    LOG.debug("isHSync = {}, volumeName = {}, bucketName = {}, keyName = {}",
-        isHSync, volumeName, bucketName, keyName);
+    LOG.debug("isHSync = {}, isRecovery = {}, volumeName = {}, bucketName = {}, keyName = {}",
+        isHSync, isRecovery, volumeName, bucketName, keyName);
 
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
 
@@ -126,7 +131,6 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
               keyName, IAccessAuthorizer.ACLType.WRITE,
               commitKeyRequest.getClientID());
 
-
       Iterator<Path> pathComponents = Paths.get(keyName).iterator();
       String dbOpenFileKey = null;
 
@@ -150,29 +154,46 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
               + " as parent directory doesn't exist");
       String dbFileKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
               parentID, fileName);
-      dbOpenFileKey = omMetadataManager.getOpenFileName(volumeId, bucketId,
-              parentID, fileName, commitKeyRequest.getClientID());
+      OmKeyInfo keyToDelete =
+          omMetadataManager.getKeyTable(getBucketLayout()).get(dbFileKey);
+      long writerClientId = commitKeyRequest.getClientID();
+      if (isRecovery && keyToDelete != null) {
+        String clientId = keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
+        if (clientId == null) {
+          throw new OMException("Failed to recovery key, as " +
+              dbFileKey + " is already closed", KEY_ALREADY_CLOSED);
+        }
+        writerClientId = Long.parseLong(clientId);
+      }
 
+      dbOpenFileKey = omMetadataManager.getOpenFileName(volumeId, bucketId,
+          parentID, fileName, writerClientId);
       omKeyInfo = OMFileRequest.getOmKeyInfoFromFileTable(true,
               omMetadataManager, dbOpenFileKey, keyName);
       if (omKeyInfo == null) {
-        String action = "commit";
-        if (isHSync) {
-          action = "hsync";
-        }
+        String action = isRecovery ? "recovery" : isHSync ? "hsync" : "commit";
         throw new OMException("Failed to " + action + " key, as " +
-                dbOpenFileKey + "entry is not found in the OpenKey table",
-                KEY_NOT_FOUND);
+            dbOpenFileKey + " entry is not found in the OpenKey table", KEY_NOT_FOUND);
+      }
+
+      if (omKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY) &&
+          omKeyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID)) {
+        if (!isRecovery) {
+          throw new OMException("Cannot commit key " + dbOpenFileKey + " with " + OzoneConsts.LEASE_RECOVERY +
+              " metadata while recovery flag is not set in request", KEY_UNDER_LEASE_RECOVERY);
+        }
       }
+
       omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
           commitKeyArgs.getMetadataList()));
       if (isHSync) {
-        omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
-            String.valueOf(commitKeyRequest.getClientID()));
+        omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, String.valueOf(writerClientId));
+      } else if (isRecovery) {
+        omKeyInfo.getMetadata().remove(OzoneConsts.HSYNC_CLIENT_ID);
+        omKeyInfo.getMetadata().remove(OzoneConsts.LEASE_RECOVERY);
       }
 
       omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
-
       omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
 
       List<OmKeyLocationInfo> uncommitted =
@@ -187,11 +208,8 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
       // creation after the knob turned on.
       boolean isPreviousCommitHsync = false;
       Map<String, RepeatedOmKeyInfo> oldKeyVersionsToDeleteMap = null;
-      OmKeyInfo keyToDelete =
-          omMetadataManager.getKeyTable(getBucketLayout()).get(dbFileKey);
       if (null != keyToDelete) {
-        final String clientIdString
-            = String.valueOf(commitKeyRequest.getClientID());
+        final String clientIdString = String.valueOf(writerClientId);
         isPreviousCommitHsync = java.util.Optional.ofNullable(keyToDelete)
             .map(WithMetadata::getMetadata)
             .map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMRecoverLeaseResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMRecoverLeaseResponse.java
index ef73ab5c82..fcefa473ff 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMRecoverLeaseResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMRecoverLeaseResponse.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.response.key.OmKeyResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-        .OMResponse;
+    .OMResponse;
 
 import javax.annotation.Nonnull;
 import java.io.IOException;
@@ -39,16 +39,14 @@ import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
 @CleanupTableInfo(cleanupTables = {FILE_TABLE, OPEN_FILE_TABLE})
 public class OMRecoverLeaseResponse extends OmKeyResponse {
 
-  private OmKeyInfo keyInfo;
-  private String dbFileKey;
+  private OmKeyInfo openKeyInfo;
   private String openKeyName;
+
   public OMRecoverLeaseResponse(@Nonnull OMResponse omResponse,
-      BucketLayout bucketLayout, OmKeyInfo keyInfo, String dbFileKey,
-      String openKeyName) {
+      BucketLayout bucketLayout, String openKeyName, OmKeyInfo openKeyInfo) {
     super(omResponse, bucketLayout);
-    this.keyInfo = keyInfo;
-    this.dbFileKey = dbFileKey;
     this.openKeyName = openKeyName;
+    this.openKeyInfo = openKeyInfo;
   }
 
   /**
@@ -64,12 +62,11 @@ public class OMRecoverLeaseResponse extends OmKeyResponse {
   @Override
   protected void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
-    // Delete from OpenKey table
+    // Update OpenKey table
     if (openKeyName != null) {
-      omMetadataManager.getOpenKeyTable(getBucketLayout()).deleteWithBatch(
-          batchOperation, openKeyName);
-      omMetadataManager.getKeyTable(getBucketLayout())
-          .putWithBatch(batchOperation, dbFileKey, keyInfo);
+      // In INIT stage, update the keyInfo in openKeyTable
+      omMetadataManager.getOpenKeyTable(getBucketLayout()).putWithBatch(
+          batchOperation, openKeyName, openKeyInfo);
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
index bc41b6aa0c..c4f90958c7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
@@ -84,7 +84,7 @@ public class OMKeyCommitResponse extends OmKeyResponse {
     // Delete from OpenKey table
     if (!isHSync()) {
       omMetadataManager.getOpenKeyTable(getBucketLayout())
-              .deleteWithBatch(batchOperation, openKeyName);
+          .deleteWithBatch(batchOperation, openKeyName);
     }
 
     omMetadataManager.getKeyTable(getBucketLayout())
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
index ff267f7c07..6073632e55 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
@@ -77,18 +77,18 @@ public class OMKeyCommitResponseWithFSO extends OMKeyCommitResponse {
     // Delete from OpenKey table if commit
     if (!this.isHSync()) {
       omMetadataManager.getOpenKeyTable(getBucketLayout())
-              .deleteWithBatch(batchOperation, getOpenKeyName());
+          .deleteWithBatch(batchOperation, getOpenKeyName());
     }
 
     OMFileRequest.addToFileTable(omMetadataManager, batchOperation,
-            getOmKeyInfo(), volumeId, getOmBucketInfo().getObjectID());
+        getOmKeyInfo(), volumeId, getOmBucketInfo().getObjectID());
 
     updateDeletedTable(omMetadataManager, batchOperation);
 
     // update bucket usedBytes.
     omMetadataManager.getBucketTable().putWithBatch(batchOperation,
-            omMetadataManager.getBucketKey(getOmBucketInfo().getVolumeName(),
-                    getOmBucketInfo().getBucketName()), getOmBucketInfo());
+        omMetadataManager.getBucketKey(getOmBucketInfo().getVolumeName(),
+            getOmBucketInfo().getBucketName()), getOmBucketInfo());
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
index 45209258f7..f52af51919 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
@@ -1571,7 +1571,7 @@ public final class OMRequestTestUtils {
               OMRequestTestUtils.createOmDirectoryInfo(pathElement, ++objectId,
                       parentId);
       OMRequestTestUtils.addDirKeyToDirTable(true, omDirInfo,
-              volumeName, bucketName, txnID, omMetaMgr);
+              volumeName, bucketName, ++txnID, omMetaMgr);
       parentId = omDirInfo.getObjectID();
     }
     return parentId;
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMRecoverLeaseRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMRecoverLeaseRequest.java
index 5c10c0821d..476e2b2f93 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMRecoverLeaseRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMRecoverLeaseRequest.java
@@ -18,39 +18,45 @@
 
 package org.apache.hadoop.ozone.om.request.file;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.om.request.key.OMAllocateBlockRequestWithFSO;
+import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequestWithFSO;
 import org.apache.hadoop.ozone.om.request.key.TestOMKeyRequest;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .KeyLocation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .RecoverLeaseRequest;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest;
 import org.apache.hadoop.util.Time;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB.setReplicationConfig;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests OMRecoverLeaseRequest.
@@ -71,34 +77,190 @@ public class TestOMRecoverLeaseRequest extends TestOMKeyRequest {
    */
   @Test
   public void testRecoverHsyncFile() throws Exception {
-    when(ozoneManager.getAclsEnabled()).thenReturn(true);
-    when(ozoneManager.getVolumeOwner(
-        anyString(),
-        any(IAccessAuthorizer.ACLType.class), any(
-        OzoneObj.ResourceType.class)))
-        .thenReturn("user");
-    InetSocketAddress address = new InetSocketAddress("localhost", 10000);
-    when(ozoneManager.getOmRpcServerAddr()).thenReturn(address);
-    populateNamespace(true, true);
+    populateNamespace(true, true, true, true);
 
     OMClientResponse omClientResponse = validateAndUpdateCache();
+    OMResponse omResponse = omClientResponse.getOMResponse();
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus());
+    RecoverLeaseResponse recoverLeaseResponse = omResponse.getRecoverLeaseResponse();
+    KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo();
+    Assertions.assertNotNull(keyInfo);
+    OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo);
 
-    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+    omClientResponse = validateAndUpdateCacheForCommit(getNewKeyArgs(omKeyInfo, 0));
+    omResponse = omClientResponse.getOMResponse();
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus());
+
+    verifyTables(true, false);
+  }
+
+  /**
+   * Verify that RecoverLease request is idempotent.
+   * @throws Exception
+   */
+  @Test
+  public void testInitStageIdempotent() throws Exception {
+    populateNamespace(true, true, true, true);
+
+    // call recovery first time
+    OMClientResponse omClientResponse = validateAndUpdateCache();
+    OMResponse omResponse = omClientResponse.getOMResponse();
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus());
+    RecoverLeaseResponse recoverLeaseResponse = omResponse.getRecoverLeaseResponse();
+    KeyInfo keyInfo1 = recoverLeaseResponse.getKeyInfo();
+    Assertions.assertNotNull(keyInfo1);
+
+    // call recovery second time
+    omClientResponse = validateAndUpdateCache();
+    omResponse = omClientResponse.getOMResponse();
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus());
+    recoverLeaseResponse = omResponse.getRecoverLeaseResponse();
+    KeyInfo keyInfo2 = recoverLeaseResponse.getKeyInfo();
+    Assertions.assertNotNull(keyInfo2);
+    Assertions.assertEquals(keyInfo1.getKeyName(), keyInfo2.getKeyName());
+  }
+
+  /**
+   * Verify that COMMIT request for recovery is not idempotent.
+   * @throws Exception
+   */
+  @Test
+  public void testCommitStageNotIdempotent() throws Exception {
+    populateNamespace(true, true, true, true);
+
+    // call recovery
+    OMClientResponse omClientResponse = validateAndUpdateCache();
+    OMResponse omResponse = omClientResponse.getOMResponse();
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus());
+    RecoverLeaseResponse recoverLeaseResponse = omResponse.getRecoverLeaseResponse();
+    KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo();
+    Assertions.assertNotNull(keyInfo);
+    OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo);
+
+    KeyArgs newKeyArgs = getNewKeyArgs(omKeyInfo, 0);
+
+    // call commit first time
+    omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs);
+    omResponse = omClientResponse.getOMResponse();
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus());
+
+    // call commit second time
+    omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs);
+    omResponse = omClientResponse.getOMResponse();
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.KEY_ALREADY_CLOSED, omResponse.getStatus());
+  }
+
+  /**
+   * Verify that RecoverLease COMMIT request has a new file length.
+   * @throws Exception
+   */
+  @Test
+  public void testRecoverWithNewFileLength() throws Exception {
+    populateNamespace(true, true, true, true);
+
+    // call recovery
+    OMClientResponse omClientResponse = validateAndUpdateCache();
+    OMResponse omResponse = omClientResponse.getOMResponse();
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus());
+    RecoverLeaseResponse recoverLeaseResponse = omResponse.getRecoverLeaseResponse();
+    KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo();
+    Assertions.assertNotNull(keyInfo);
+    OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo);
+
+    // call commit
+    long deltaLength = 100;
+    KeyArgs newKeyArgs = getNewKeyArgs(omKeyInfo, deltaLength);
+    omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs);
+    omResponse = omClientResponse.getOMResponse();
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus());
+
+    // get file length and check the length is as expected
+    String ozoneKey = getFileName();
+    OmKeyInfo omKeyInfoFetched = omMetadataManager.getKeyTable(getBucketLayout()).get(ozoneKey);
+    Assertions.assertEquals(omKeyInfo.getDataSize(), omKeyInfoFetched.getDataSize());
+
+    // check the final block length is as expected
+    List<OmKeyLocationInfo> locationInfoListFetched =
+        omKeyInfoFetched.getLatestVersionLocations().getBlocksLatestVersionOnly();
+    List<OmKeyLocationInfo> omKeyLocationInfos = omKeyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
+    Assertions.assertEquals(omKeyLocationInfos.get(omKeyLocationInfos.size() - 1).getLength(),
+        locationInfoListFetched.get(locationInfoListFetched.size() - 1).getLength());
+
+    // check the committed file doesn't have HSYNC_CLIENT_ID and LEASE_RECOVERY metadata
+    Assertions.assertNull(omKeyInfoFetched.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID));
+    Assertions.assertNull(omKeyInfoFetched.getMetadata().get(OzoneConsts.LEASE_RECOVERY));
+  }
+
+  /**
+   * Verify that RecoverLease COMMIT request has a new client ID.
+   * @throws Exception
+   */
+  @Test
+  public void testRecoverWithNewClientID() throws Exception {
+    populateNamespace(true, true, true, true);
+
+    // call recovery
+    OMClientResponse omClientResponse = validateAndUpdateCache();
+    OMResponse omResponse = omClientResponse.getOMResponse();
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus());
+    RecoverLeaseResponse recoverLeaseResponse = omResponse.getRecoverLeaseResponse();
+    KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo();
+    Assertions.assertNotNull(keyInfo);
+    OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo);
+
+    // call commit
+    KeyArgs newKeyArgs = getNewKeyArgs(omKeyInfo, 0);
+    omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs, true, true);
+    omResponse = omClientResponse.getOMResponse();
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus());
+  }
+
+  /**
+   * Verify that an under recovery file will reject allocate block and further hsync call(commit).
+   * @throws Exception
+   */
+  @Test
+  public void testRejectAllocateBlockAndHsync() throws Exception {
+    populateNamespace(true, true, true, true);
+
+    // call recovery
+    OMClientResponse omClientResponse = validateAndUpdateCache();
+    OMResponse omResponse = omClientResponse.getOMResponse();
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK, omResponse.getStatus());
+    RecoverLeaseResponse recoverLeaseResponse = omResponse.getRecoverLeaseResponse();
+    KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo();
+    Assertions.assertNotNull(keyInfo);
+    OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo);
+
+    // call allocate block
+    OMRequest request = createAllocateBlockRequest(volumeName, bucketName, keyName);
+    OMAllocateBlockRequestWithFSO omAllocateBlockRequest =
+        new OMAllocateBlockRequestWithFSO(request, getBucketLayout());
+    request = omAllocateBlockRequest.preExecute(ozoneManager);
+    assertNotNull(request.getUserInfo());
+    omAllocateBlockRequest = new OMAllocateBlockRequestWithFSO(request, getBucketLayout());
+    omClientResponse = omAllocateBlockRequest.validateAndUpdateCache(
+        ozoneManager, 100L, ozoneManagerDoubleBufferHelper);
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.KEY_UNDER_LEASE_RECOVERY,
         omClientResponse.getOMResponse().getStatus());
 
-    verifyTables(true, true);
+    // call commit(hsync calls commit)
+    KeyArgs newKeyArgs = getNewKeyArgs(omKeyInfo, 0);
+    omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs, false, false);
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.KEY_UNDER_LEASE_RECOVERY,
+        omClientResponse.getOMResponse().getStatus());
   }
 
   /**
-   * verify that recover a closed file should be allowed (essentially no-op).
-    */
+   * verify that recover a closed file.
+   **/
   @Test
   public void testRecoverClosedFile() throws Exception {
-    populateNamespace(true, false);
+    populateNamespace(true, false, false, false);
 
     OMClientResponse omClientResponse = validateAndUpdateCache();
 
-    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+    Assertions.assertEquals(OzoneManagerProtocolProtos.Status.KEY_ALREADY_CLOSED,
         omClientResponse.getOMResponse().getStatus());
 
     verifyTables(true, false);
@@ -109,7 +271,7 @@ public class TestOMRecoverLeaseRequest extends TestOMKeyRequest {
     */
   @Test
   public void testRecoverOpenFile() throws Exception {
-    populateNamespace(false, true);
+    populateNamespace(false, false, true, false);
 
     OMClientResponse omClientResponse = validateAndUpdateCache();
 
@@ -125,7 +287,7 @@ public class TestOMRecoverLeaseRequest extends TestOMKeyRequest {
    */
   @Test
   public void testRecoverAbsentFile() throws Exception {
-    populateNamespace(false, false);
+    populateNamespace(false, false, false, false);
 
     OMClientResponse omClientResponse = validateAndUpdateCache();
 
@@ -135,8 +297,37 @@ public class TestOMRecoverLeaseRequest extends TestOMKeyRequest {
     verifyTables(false, false);
   }
 
-  private void populateNamespace(boolean addKeyTable, boolean addOpenKeyTable)
-      throws Exception {
+  private KeyArgs getNewKeyArgs(OmKeyInfo omKeyInfo, long deltaLength) throws IOException {
+    OmKeyLocationInfoGroup omKeyLocationInfoGroup = omKeyInfo.getLatestVersionLocations();
+    List<OmKeyLocationInfo> omKeyLocationInfoList = omKeyLocationInfoGroup.getBlocksLatestVersionOnly();
+    long lastBlockLength = omKeyLocationInfoList.get(omKeyLocationInfoList.size() - 1).getLength();
+    omKeyLocationInfoList.get(omKeyLocationInfoList.size() - 1).setLength(lastBlockLength + deltaLength);
+
+    long fileLength = omKeyLocationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
+    omKeyInfo.setDataSize(fileLength);
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(omKeyInfo.getVolumeName())
+        .setBucketName(omKeyInfo.getBucketName()).setKeyName(omKeyInfo.getKeyName())
+        .setReplicationConfig(omKeyInfo.getReplicationConfig()).setDataSize(fileLength)
+        .setLocationInfoList(omKeyLocationInfoList).setLatestVersionLocation(true)
+        .build();
+
+    List<OmKeyLocationInfo> locationInfoList = keyArgs.getLocationInfoList();
+    Preconditions.checkNotNull(locationInfoList);
+    KeyArgs.Builder keyArgsBuilder = KeyArgs.newBuilder()
+        .setVolumeName(keyArgs.getVolumeName())
+        .setBucketName(keyArgs.getBucketName())
+        .setKeyName(keyArgs.getKeyName())
+        .setDataSize(keyArgs.getDataSize())
+        .addAllMetadata(KeyValueUtil.toProtobuf(keyArgs.getMetadata()))
+        .addAllKeyLocations(locationInfoList.stream()
+            .map(info -> info.getProtobuf(ClientVersion.CURRENT_VERSION))
+            .collect(Collectors.toList()));
+    setReplicationConfig(keyArgs.getReplicationConfig(), keyArgsBuilder);
+    return keyArgsBuilder.build();
+  }
+
+  private void populateNamespace(boolean addKeyTable, boolean keyInfoWithHsyncFlag,
+      boolean addOpenKeyTable, boolean openKeyInfoWithHsyncFlag) throws Exception {
     String parentDir = "c/d/e";
     String fileName = "f";
     keyName = parentDir + "/" + fileName;
@@ -151,14 +342,14 @@ public class TestOMRecoverLeaseRequest extends TestOMKeyRequest {
 
     OmKeyInfo omKeyInfo;
     if (addKeyTable) {
-      String ozoneKey = addToFileTable(allocatedLocationList);
+      String ozoneKey = addToFileTable(allocatedLocationList, keyInfoWithHsyncFlag);
       omKeyInfo = omMetadataManager.getKeyTable(getBucketLayout())
           .get(ozoneKey);
       assertNotNull(omKeyInfo);
     }
 
     if (addOpenKeyTable) {
-      String openKey = addToOpenFileTable(allocatedLocationList);
+      String openKey = addToOpenFileTable(allocatedLocationList, openKeyInfoWithHsyncFlag);
 
       omKeyInfo = omMetadataManager.getOpenKeyTable(getBucketLayout())
           .get(openKey);
@@ -166,22 +357,33 @@ public class TestOMRecoverLeaseRequest extends TestOMKeyRequest {
     }
   }
 
+  protected OMRequest createAllocateBlockRequest(String volumeName, String bucketName, String keyName) {
+    KeyArgs keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(volumeName).setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setFactor(replicationFactor).setType(replicationType)
+        .build();
+
+    AllocateBlockRequest allocateBlockRequest =
+        AllocateBlockRequest.newBuilder().setClientID(clientID).setKeyArgs(keyArgs).build();
+
+    return OMRequest.newBuilder()
+          .setCmdType(OzoneManagerProtocolProtos.Type.AllocateBlock)
+          .setClientId(UUID.randomUUID().toString())
+          .setAllocateBlockRequest(allocateBlockRequest).build();
+  }
+
   @NotNull
   protected OMRequest createRecoverLeaseRequest(
       String volumeName, String bucketName, String keyName) {
-
-    RecoverLeaseRequest recoverLeaseRequest = RecoverLeaseRequest.newBuilder()
-        .setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setKeyName(keyName).build();
-
+    RecoverLeaseRequest.Builder rb = RecoverLeaseRequest.newBuilder();
+    rb.setVolumeName(volumeName).setBucketName(bucketName).setKeyName(keyName);
     return OMRequest.newBuilder()
         .setCmdType(OzoneManagerProtocolProtos.Type.RecoverLease)
         .setClientId(UUID.randomUUID().toString())
-        .setRecoverLeaseRequest(recoverLeaseRequest).build();
+        .setRecoverLeaseRequest(rb.build()).build();
   }
 
-
   private OMClientResponse validateAndUpdateCache() throws Exception {
     OMRequest modifiedOmRequest = doPreExecute(createRecoverLeaseRequest(
         volumeName, bucketName, keyName));
@@ -196,6 +398,34 @@ public class TestOMRecoverLeaseRequest extends TestOMKeyRequest {
     return omClientResponse;
   }
 
+  @NotNull
+  protected OMRequest createKeyCommitRequest(KeyArgs keyArgs, boolean newClientID, boolean recovery) {
+    CommitKeyRequest.Builder rb =
+        CommitKeyRequest.newBuilder().setKeyArgs(keyArgs).setRecovery(recovery);
+    rb.setClientID(newClientID ? clientID + 1 : clientID);
+    return OMRequest.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.CommitKey)
+        .setClientId(UUID.randomUUID().toString())
+        .setCommitKeyRequest(rb.build()).build();
+  }
+
+  private OMClientResponse validateAndUpdateCacheForCommit(KeyArgs keyArgs) throws Exception {
+    return validateAndUpdateCacheForCommit(keyArgs, false, true);
+  }
+
+  private OMClientResponse validateAndUpdateCacheForCommit(KeyArgs keyArgs, boolean newClientID,
+      boolean recovery) throws Exception {
+    OMRequest omRequest = createKeyCommitRequest(keyArgs, newClientID, recovery);
+    OMKeyCommitRequestWithFSO omKeyCommitRequest = new OMKeyCommitRequestWithFSO(omRequest, getBucketLayout());
+    OMRequest modifiedOmRequest = omKeyCommitRequest.preExecute(ozoneManager);
+    assertNotNull(modifiedOmRequest.getUserInfo());
+
+    omKeyCommitRequest = new OMKeyCommitRequestWithFSO(modifiedOmRequest, getBucketLayout());
+    OMClientResponse omClientResponse =
+        omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L, ozoneManagerDoubleBufferHelper);
+    return omClientResponse;
+  }
+
   private void verifyTables(boolean hasKey, boolean hasOpenKey)
       throws IOException {
     // Now entry should be created in key Table.
@@ -238,8 +468,7 @@ public class TestOMRecoverLeaseRequest extends TestOMKeyRequest {
         fileName);
   }
 
-  protected OMRecoverLeaseRequest getOmRecoverLeaseRequest(
-      OMRequest omRequest) {
+  protected OMRecoverLeaseRequest getOmRecoverLeaseRequest(OMRequest omRequest) {
     return new OMRecoverLeaseRequest(omRequest);
   }
 
@@ -269,14 +498,16 @@ public class TestOMRecoverLeaseRequest extends TestOMKeyRequest {
     return modifiedOmRequest;
   }
 
-  String addToOpenFileTable(List<OmKeyLocationInfo> locationList)
+  String addToOpenFileTable(List<OmKeyLocationInfo> locationList, boolean hsyncFlag)
       throws Exception {
     OmKeyInfo omKeyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
         bucketName, keyName, replicationType, replicationFactor, 0, parentId,
         0, Time.now(), version);
     omKeyInfo.appendNewBlocks(locationList, false);
-    omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
-        String.valueOf(clientID));
+    if (hsyncFlag) {
+      omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
+          String.valueOf(clientID));
+    }
 
     OMRequestTestUtils.addFileToKeyTable(
         true, false, omKeyInfo.getFileName(),
@@ -291,12 +522,16 @@ public class TestOMRecoverLeaseRequest extends TestOMKeyRequest {
         omKeyInfo.getParentObjectID(), omKeyInfo.getFileName(), clientID);
   }
 
-  String addToFileTable(List<OmKeyLocationInfo> locationList)
+  String addToFileTable(List<OmKeyLocationInfo> locationList, boolean hsyncFlag)
       throws Exception {
     OmKeyInfo omKeyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
         bucketName, keyName, replicationType, replicationFactor, 0, parentId,
         0, Time.now(), version);
     omKeyInfo.appendNewBlocks(locationList, false);
+    if (hsyncFlag) {
+      omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
+          String.valueOf(clientID));
+    }
 
     OMRequestTestUtils.addFileToKeyTable(
         false, false, omKeyInfo.getFileName(),
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
index a72756d010..40fccb8fda 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
@@ -161,6 +161,6 @@ public class TestOMKeyCommitResponse extends TestOMKeyResponse {
           new RepeatedOmKeyInfo(e)));
     }
     return new OMKeyCommitResponse(omResponse, omKeyInfo, ozoneKey, openKey,
-            omBucketInfo, deleteKeyMap, isHSync);
+        omBucketInfo, deleteKeyMap, isHSync);
   }
 }
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index a60134d4f6..e510d9841b 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
@@ -733,13 +734,20 @@ public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
   }
 
   @Override
-  public boolean recoverLease(final String pathStr) throws IOException {
-    incrementCounter(Statistic.INVOCATION_RECOVER_LEASE, 1);
+  public List<OmKeyInfo> recoverFilePrepare(final String pathStr) throws IOException {
+    incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);
 
     return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
         volume.getName(), bucket.getName(), pathStr);
   }
 
+  @Override
+  public void recoverFile(OmKeyArgs keyArgs) throws IOException {
+    incrementCounter(Statistic.INVOCATION_RECOVER_FILE, 1);
+
+    ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
+  }
+
   @Override
   public void setTimes(String key, long mtime, long atime) throws IOException {
     incrementCounter(Statistic.INVOCATION_SET_TIMES, 1);
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 085a2300fd..567004bc81 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.client.BucketArgs;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
@@ -1403,8 +1404,8 @@ public class BasicRootedOzoneClientAdapterImpl
   }
 
   @Override
-  public boolean recoverLease(final String pathStr) throws IOException {
-    incrementCounter(Statistic.INVOCATION_RECOVER_LEASE, 1);
+  public List<OmKeyInfo> recoverFilePrepare(final String pathStr) throws IOException {
+    incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);
     OFSPath ofsPath = new OFSPath(pathStr, config);
 
     OzoneVolume volume = objectStore.getVolume(ofsPath.getVolumeName());
@@ -1413,6 +1414,13 @@ public class BasicRootedOzoneClientAdapterImpl
             volume.getName(), bucket.getName(), ofsPath.getKeyName());
   }
 
+  @Override
+  public void recoverFile(OmKeyArgs keyArgs) throws IOException {
+    incrementCounter(Statistic.INVOCATION_RECOVER_FILE, 1);
+
+    ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
+  }
+
   @Override
   public void setTimes(String key, long mtime, long atime) throws IOException {
     incrementCounter(Statistic.INVOCATION_SET_TIMES, 1);
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
index c48f1a6366..dbce02dd56 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
@@ -95,7 +97,9 @@ public interface OzoneClientAdapter {
       String fromSnapshot, String toSnapshot)
       throws IOException, InterruptedException;
 
-  boolean recoverLease(String pathStr) throws IOException;
+  List<OmKeyInfo> recoverFilePrepare(String pathStr) throws IOException;
+
+  void recoverFile(OmKeyArgs keyArgs) throws IOException;
 
   void setTimes(String key, long mtime, long atime) throws IOException;
 
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
index aae71e9c4c..10abc57091 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
@@ -77,8 +77,9 @@ public enum Statistic {
       "Calls of setTimes()"),
   INVOCATION_IS_FILE_CLOSED("op_is_file_closed",
       "Calls of isFileClosed()"),
-  INVOCATION_RECOVER_LEASE("op_recover_lease",
-      "Calls of recoverLease()"),
+  INVOCATION_RECOVER_FILE_PREPARE("op_recover_file_prepare",
+      "Calls of recoverFilePrepare()"),
+  INVOCATION_RECOVER_FILE("op_recover_file", "Calls of recoverFile()"),
   INVOCATION_SET_SAFE_MODE("op_set_safe_mode",
       "Calls of setSafeMode()");
 
diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index 203e7ed373..8f977167a2 100644
--- a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.ozone;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.List;
 
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
@@ -33,6 +34,8 @@ import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
 /**
@@ -129,7 +132,16 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
     LOG.trace("recoverLease() path:{}", f);
     Path qualifiedPath = makeQualified(f);
     String key = pathToKey(qualifiedPath);
-    return getAdapter().recoverLease(key);
+    List<OmKeyInfo> infoList = getAdapter().recoverFilePrepare(key);
+    // TODO: query DN to get the final block length
+    OmKeyInfo keyInfo = infoList.get(0);
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
+        .setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
+        .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
+        .setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+        .build();
+    getAdapter().recoverFile(keyArgs);
+    return true;
   }
 
   @Override
diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index 9b1596c05b..0530b606b2 100644
--- a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -29,11 +29,14 @@ import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.List;
 
 /**
  * The Rooted Ozone Filesystem (OFS) implementation.
@@ -131,17 +134,24 @@ public class RootedOzoneFileSystem extends BasicRootedOzoneFileSystem
    */
   @Override
   public boolean recoverLease(final Path f) throws IOException {
-    incrementCounter(Statistic.INVOCATION_RECOVER_LEASE, 1);
     statistics.incrementWriteOps(1);
     LOG.trace("recoverLease() path:{}", f);
     Path qualifiedPath = makeQualified(f);
     String key = pathToKey(qualifiedPath);
-    return getAdapter().recoverLease(key);
+    List<OmKeyInfo> infoList = getAdapter().recoverFilePrepare(key);
+    // TODO: query DN to get the final block length
+    OmKeyInfo keyInfo = infoList.get(0);
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
+        .setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
+        .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
+        .setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+        .build();
+    getAdapter().recoverFile(keyArgs);
+    return true;
   }
 
   @Override
   public boolean isFileClosed(Path f) throws IOException {
-    incrementCounter(Statistic.INVOCATION_IS_FILE_CLOSED, 1);
     statistics.incrementReadOps(1);
     LOG.trace("isFileClosed() path:{}", f);
     Path qualifiedPath = makeQualified(f);
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index 71f01e4414..a31a07cf67 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.ozone;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.List;
 
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
@@ -33,6 +34,8 @@ import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
 /**
@@ -129,7 +132,16 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
     LOG.trace("isFileClosed() path:{}", f);
     Path qualifiedPath = makeQualified(f);
     String key = pathToKey(qualifiedPath);
-    return getAdapter().recoverLease(key);
+    List<OmKeyInfo> infoList = getAdapter().recoverFilePrepare(key);
+    // TODO: query DN to get the final block length
+    OmKeyInfo keyInfo = infoList.get(0);
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
+        .setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
+        .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
+        .setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+        .build();
+    getAdapter().recoverFile(keyArgs);
+    return true;
   }
 
   @Override
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index 7561e20a87..ed62f608b4 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -29,11 +29,14 @@ import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.List;
 
 /**
  * The Rooted Ozone Filesystem (OFS) implementation.
@@ -128,7 +131,16 @@ public class RootedOzoneFileSystem extends BasicRootedOzoneFileSystem
     LOG.trace("recoverLease() path:{}", f);
     Path qualifiedPath = makeQualified(f);
     String key = pathToKey(qualifiedPath);
-    return getAdapter().recoverLease(key);
+    List<OmKeyInfo> infoList = getAdapter().recoverFilePrepare(key);
+    // TODO: query DN to get the final block length
+    OmKeyInfo keyInfo = infoList.get(0);
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
+        .setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
+        .setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
+        .setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+        .build();
+    getAdapter().recoverFile(keyArgs);
+    return true;
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to