This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new c45449c17f HDDS-9638. [hsync] File recovery support in OM (#5847)
c45449c17f is described below
commit c45449c17f7622ba2380587a31568a068c2a3580
Author: Sammi Chen <[email protected]>
AuthorDate: Tue Jan 9 10:10:34 2024 +0800
HDDS-9638. [hsync] File recovery support in OM (#5847)
---
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 1 +
.../hadoop/ozone/om/exceptions/OMException.java | 4 +-
.../ozone/om/protocol/OzoneManagerProtocol.java | 19 +-
...OzoneManagerProtocolClientSideTranslatorPB.java | 38 ++-
.../debug/ozone-debug-lease-recovery.robot | 4 +-
.../apache/hadoop/fs/ozone/TestLeaseRecovery.java | 3 +-
.../src/main/proto/OmClientProtocol.proto | 6 +-
.../org/apache/hadoop/ozone/om/OzoneManager.java | 5 +-
.../om/request/file/OMRecoverLeaseRequest.java | 107 +++----
.../om/request/key/OMAllocateBlockRequest.java | 7 +-
.../request/key/OMAllocateBlockRequestWithFSO.java | 6 +-
.../ozone/om/request/key/OMKeyCommitRequest.java | 71 +++--
.../om/request/key/OMKeyCommitRequestWithFSO.java | 60 ++--
.../om/response/file/OMRecoverLeaseResponse.java | 21 +-
.../ozone/om/response/key/OMKeyCommitResponse.java | 2 +-
.../response/key/OMKeyCommitResponseWithFSO.java | 8 +-
.../ozone/om/request/OMRequestTestUtils.java | 2 +-
.../om/request/file/TestOMRecoverLeaseRequest.java | 325 ++++++++++++++++++---
.../om/response/key/TestOMKeyCommitResponse.java | 2 +-
.../fs/ozone/BasicOzoneClientAdapterImpl.java | 12 +-
.../ozone/BasicRootedOzoneClientAdapterImpl.java | 12 +-
.../apache/hadoop/fs/ozone/OzoneClientAdapter.java | 6 +-
.../java/org/apache/hadoop/fs/ozone/Statistic.java | 5 +-
.../apache/hadoop/fs/ozone/OzoneFileSystem.java | 14 +-
.../hadoop/fs/ozone/RootedOzoneFileSystem.java | 16 +-
.../apache/hadoop/fs/ozone/OzoneFileSystem.java | 14 +-
.../hadoop/fs/ozone/RootedOzoneFileSystem.java | 14 +-
27 files changed, 573 insertions(+), 211 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 01fa7c8a52..25690ed105 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -412,6 +412,7 @@ public final class OzoneConsts {
/** Metadata stored in OmKeyInfo. */
public static final String HSYNC_CLIENT_ID = "hsyncClientId";
+ public static final String LEASE_RECOVERY = "leaseRecovery";
//GDPR
public static final String GDPR_FLAG = "gdprEnabled";
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index 20b4fb1ed0..c5cf619e37 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -267,6 +267,8 @@ public class OMException extends IOException {
S3_SECRET_ALREADY_EXISTS,
- INVALID_PATH
+ INVALID_PATH,
+ KEY_UNDER_LEASE_RECOVERY,
+ KEY_ALREADY_CLOSED
}
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index ad394bf4d1..824346058d 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.UUID;
import javax.annotation.Nonnull;
+
import org.apache.hadoop.fs.SafeModeAction;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.ozone.OzoneAcl;
@@ -269,6 +270,19 @@ public interface OzoneManagerProtocol
"this to be implemented, as write requests use a new approach.");
}
+ /**
+ * Recovery and commit a key. This will make the change from the client
visible. The client
+ * is identified by the clientID.
+ *
+ * @param args the key to commit
+ * @param clientID the client identification
+ * @throws IOException
+ */
+ default void recoverKey(OmKeyArgs args, long clientID)
+ throws IOException {
+ throw new UnsupportedOperationException("OzoneManager does not require " +
+ "this to be implemented, as write requests use a new approach.");
+ }
/**
* Allocate a new block, it is assumed that the client is having an open key
@@ -1086,11 +1100,10 @@ public interface OzoneManagerProtocol
* @param volumeName - The volume name.
* @param bucketName - The bucket name.
* @param keyName - The key user want to recover.
- * @return true if the file is already closed
+ * @return OmKeyInfo KeyInfo is file under recovery
* @throws IOException if an error occurs
*/
- boolean recoverLease(String volumeName, String bucketName,
- String keyName) throws IOException;
+ List<OmKeyInfo> recoverLease(String volumeName, String bucketName, String
keyName) throws IOException;
/**
* Update modification time and access time of a file.
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index a179ca5c40..f9572b3bdc 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -777,13 +777,19 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
@Override
public void hsyncKey(OmKeyArgs args, long clientId)
throws IOException {
- updateKey(args, clientId, true);
+ updateKey(args, clientId, true, false);
}
@Override
public void commitKey(OmKeyArgs args, long clientId)
throws IOException {
- updateKey(args, clientId, false);
+ updateKey(args, clientId, false, false);
+ }
+
+ @Override
+ public void recoverKey(OmKeyArgs args, long clientId)
+ throws IOException {
+ updateKey(args, clientId, false, true);
}
public static void setReplicationConfig(ReplicationConfig replication,
@@ -799,7 +805,7 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
b.setType(replication.getReplicationType());
}
- private void updateKey(OmKeyArgs args, long clientId, boolean hsync)
+ private void updateKey(OmKeyArgs args, long clientId, boolean hsync, boolean
recovery)
throws IOException {
CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder();
List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList();
@@ -820,15 +826,13 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
req.setKeyArgs(keyArgsBuilder.build());
req.setClientID(clientId);
req.setHsync(hsync);
-
+ req.setRecovery(recovery);
OMRequest omRequest = createOMRequest(Type.CommitKey)
.setCommitKeyRequest(req)
.build();
handleError(submitRequest(omRequest));
-
-
}
@Override
@@ -2440,21 +2444,23 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
}
@Override
- public boolean recoverLease(String volumeName, String bucketName,
- String keyName) throws IOException {
+ public List<OmKeyInfo> recoverLease(String volumeName, String bucketName,
String keyName) throws IOException {
RecoverLeaseRequest recoverLeaseRequest =
- RecoverLeaseRequest.newBuilder()
- .setVolumeName(volumeName)
- .setBucketName(bucketName)
- .setKeyName(keyName)
- .build();
+ RecoverLeaseRequest.newBuilder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .build();
OMRequest omRequest = createOMRequest(Type.RecoverLease)
- .setRecoverLeaseRequest(recoverLeaseRequest).build();
+ .setRecoverLeaseRequest(recoverLeaseRequest).build();
RecoverLeaseResponse recoverLeaseResponse =
- handleError(submitRequest(omRequest)).getRecoverLeaseResponse();
- return recoverLeaseResponse.getResponse();
+ handleError(submitRequest(omRequest)).getRecoverLeaseResponse();
+ ArrayList<OmKeyInfo> list = new ArrayList();
+ list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getKeyInfo()));
+ list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getOpenKeyInfo()));
+ return list;
}
@Override
diff --git
a/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-lease-recovery.robot
b/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-lease-recovery.robot
index a721f2acbb..1f9646579d 100644
---
a/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-lease-recovery.robot
+++
b/hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-lease-recovery.robot
@@ -36,12 +36,12 @@ Create volume bucket and put key
*** Test Cases ***
Test ozone debug recover for o3fs
${result} = Execute Lease recovery cli
o3fs://${BUCKET}.${VOLUME}.om/${TESTFILE}
- Should Contain ${result} Lease recovery SUCCEEDED
+ Should Contain ${result} Key: ${TESTFILE} is already closed
${result} = Execute Lease recovery cli
o3fs://${BUCKET}.${VOLUME}.om/randomfile
Should Contain ${result} not found
Test ozone debug recover for ofs
${result} = Execute Lease recovery cli
ofs://om/${VOLUME}/${BUCKET}/${TESTFILE}
- Should Contain ${result} Lease recovery SUCCEEDED
+ Should Contain ${result} Key: ${TESTFILE} is already closed
${result} = Execute Lease recovery cli
ofs://om/${VOLUME}/${BUCKET}/randomfile
Should Contain ${result} not found
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
index 2b8a8cb02b..1cd8a7e2b4 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
@@ -149,8 +149,7 @@ public class TestLeaseRecovery {
Thread.sleep(1000);
}
// The lease should have been recovered.
- assertTrue("File should be closed", fs.recoverLease(file));
- assertTrue(fs.isFileClosed(file));
+ assertTrue("File should be closed", fs.isFileClosed(file));
} finally {
closeIgnoringKeyNotFound(stream);
}
diff --git
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 54cafbc0ad..737bf166c3 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -524,6 +524,8 @@ enum Status {
S3_SECRET_ALREADY_EXISTS = 92;
INVALID_PATH = 93;
+ KEY_UNDER_LEASE_RECOVERY = 94;
+ KEY_ALREADY_CLOSED = 95;
}
/**
@@ -1413,6 +1415,7 @@ message CommitKeyRequest {
required KeyArgs keyArgs = 1;
required uint64 clientID = 2;
optional bool hsync = 3;
+ optional bool recovery = 4;
}
message CommitKeyResponse {
@@ -2070,7 +2073,8 @@ message RecoverLeaseRequest {
}
message RecoverLeaseResponse {
- optional bool response = 1;
+ optional KeyInfo keyInfo = 1;
+ optional KeyInfo openKeyInfo = 2;
}
message SetTimesRequest {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 5fb3872d88..3c6085828f 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -4603,9 +4603,8 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
}
@Override
- public boolean recoverLease(String volumeName, String bucketName,
- String keyName) {
- return false;
+ public List<OmKeyInfo> recoverLease(String volumeName, String bucketName,
String keyName) {
+ return null;
}
@Override
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
index 4855d7f7ab..7b920f9949 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.om.request.file;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -34,14 +36,11 @@ import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.response.file.OMRecoverLeaseResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .OMResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .RecoverLeaseRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .RecoverLeaseResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseResponse;
+import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RecoverLease;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
@@ -58,10 +57,9 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
+import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_ALREADY_CLOSED;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
-import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .Type.RecoverLease;
/**
* Perform actions for RecoverLease requests.
@@ -75,6 +73,8 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
private String keyName;
private OmKeyInfo keyInfo;
private String dbFileKey;
+ private OmKeyInfo openKeyInfo;
+ private String dbOpenFileKey;
private OMMetadataManager omMetadataManager;
@@ -141,28 +141,21 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
acquiredLock = getOmLockDetails().isLockAcquired();
validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
- String openKeyEntryName = doWork(ozoneManager, transactionLogIndex);
+ RecoverLeaseResponse recoverLeaseResponse = doWork(ozoneManager,
transactionLogIndex);
// Prepare response
boolean responseCode = true;
- omResponse
- .setRecoverLeaseResponse(
- RecoverLeaseResponse.newBuilder()
- .setResponse(responseCode)
- .build())
- .setCmdType(RecoverLease);
- omClientResponse =
- new OMRecoverLeaseResponse(omResponse.build(), getBucketLayout(),
- keyInfo, dbFileKey, openKeyEntryName);
+
omResponse.setRecoverLeaseResponse(recoverLeaseResponse).setCmdType(RecoverLease);
+ omClientResponse = new OMRecoverLeaseResponse(omResponse.build(),
getBucketLayout(),
+ dbOpenFileKey, openKeyInfo);
omMetrics.incNumRecoverLease();
- LOG.debug("Key recovered. Volume:{}, Bucket:{}, Key:{}", volumeName,
- bucketName, keyName);
+ LOG.debug("Key recovered. Volume:{}, Bucket:{}, Key:{}",
+ volumeName, bucketName, keyName);
} catch (IOException | InvalidPathException ex) {
LOG.error("Fail for recovering lease. Volume:{}, Bucket:{}, Key:{}",
volumeName, bucketName, keyName, ex);
exception = ex;
omMetrics.incNumRecoverLeaseFails();
- omResponse.setCmdType(RecoverLease);
omClientResponse = new OMRecoverLeaseResponse(
createErrorOMResponse(omResponse, exception), getBucketLayout());
} finally {
@@ -186,60 +179,56 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
return omClientResponse;
}
- private String doWork(OzoneManager ozoneManager, long transactionLogIndex)
- throws IOException {
-
+ private RecoverLeaseResponse doWork(OzoneManager ozoneManager,
+ long transactionLogIndex) throws IOException {
final long volumeId = omMetadataManager.getVolumeId(volumeName);
- final long bucketId = omMetadataManager.getBucketId(
- volumeName, bucketName);
+ final long bucketId = omMetadataManager.getBucketId(volumeName,
bucketName);
Iterator<Path> pathComponents = Paths.get(keyName).iterator();
long parentID = OMFileRequest.getParentID(volumeId, bucketId,
pathComponents, keyName, omMetadataManager,
"Cannot recover file : " + keyName
+ " as parent directory doesn't exist");
String fileName = OzoneFSUtils.getFileName(keyName);
- dbFileKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
- parentID, fileName);
+ dbFileKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
parentID, fileName);
keyInfo = getKey(dbFileKey);
if (keyInfo == null) {
- throw new OMException("Key:" + keyName + " not found", KEY_NOT_FOUND);
+ throw new OMException("Key:" + keyName + " not found in keyTable",
KEY_NOT_FOUND);
}
- final String clientId = keyInfo.getMetadata().remove(
- OzoneConsts.HSYNC_CLIENT_ID);
- if (clientId == null) {
+
+ final String writerId =
keyInfo.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
+ if (writerId == null) {
// if file is closed, do nothing and return right away.
- LOG.warn("Key:" + keyName + " is already closed");
- return null;
+ throw new OMException("Key: " + keyName + " is already closed",
KEY_ALREADY_CLOSED);
}
- String openFileDBKey = omMetadataManager.getOpenFileName(
- volumeId, bucketId, parentID, fileName, Long.parseLong(clientId));
- if (openFileDBKey != null) {
- commitKey(dbFileKey, keyInfo, fileName, ozoneManager,
- transactionLogIndex);
- removeOpenKey(openFileDBKey, fileName, transactionLogIndex);
+
+ dbOpenFileKey = omMetadataManager.getOpenFileName(
+ volumeId, bucketId, parentID, fileName, Long.parseLong(writerId));
+ openKeyInfo =
omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenFileKey);
+ if (openKeyInfo == null) {
+ throw new OMException("Open Key " + dbOpenFileKey + " not found in
openKeyTable", KEY_NOT_FOUND);
+ }
+
+ if (openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) {
+ LOG.debug("Key: " + keyName + " is already under recovery");
+ } else {
+ openKeyInfo.getMetadata().put(OzoneConsts.LEASE_RECOVERY, "true");
+ openKeyInfo.setUpdateID(transactionLogIndex,
ozoneManager.isRatisEnabled());
+ openKeyInfo.setModificationTime(Time.now());
+ // Add to cache.
+ omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
+ new CacheKey<>(dbOpenFileKey), CacheValue.get(transactionLogIndex,
openKeyInfo));
}
+ keyInfo.setKeyName(keyName);
+ openKeyInfo.setKeyName(keyName);
+ RecoverLeaseResponse.Builder rb = RecoverLeaseResponse.newBuilder()
+
.setOpenKeyInfo(openKeyInfo.getNetworkProtobuf(getOmRequest().getVersion(),
true))
+ .setKeyInfo(keyInfo.getNetworkProtobuf(getOmRequest().getVersion(),
true));
- return openFileDBKey;
+ return rb.build();
}
private OmKeyInfo getKey(String dbOzoneKey) throws IOException {
return omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
}
-
- private void commitKey(String dbOzoneKey, OmKeyInfo omKeyInfo,
- String fileName, OzoneManager ozoneManager,
- long transactionLogIndex) throws IOException {
- omKeyInfo.setModificationTime(Time.now());
- omKeyInfo.setUpdateID(transactionLogIndex, ozoneManager.isRatisEnabled());
-
- OMFileRequest.addFileTableCacheEntry(omMetadataManager, dbOzoneKey,
- omKeyInfo, fileName, transactionLogIndex);
- }
-
- private void removeOpenKey(String openKeyName, String fileName,
- long transactionLogIndex) {
- OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
- openKeyName, null, fileName, transactionLogIndex);
- }
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
index 254aafcb50..ababd28c0a 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY;
import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
/**
@@ -138,7 +139,7 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
newAllocatedBlockRequest.setKeyLocation(
omKeyLocationInfoList.get(0).getProtobuf(getOmRequest().getVersion()));
- return getOmRequest().toBuilder().setUserInfo(getUserInfo())
+ return
getOmRequest().toBuilder().setUserInfo(getUserIfNotExists(ozoneManager))
.setAllocateBlockRequest(newAllocatedBlockRequest).build();
}
@@ -206,6 +207,10 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
KEY_NOT_FOUND);
}
+ if (openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) {
+ throw new OMException("Open Key " + openKeyName + " is under lease
recovery",
+ KEY_UNDER_LEASE_RECOVERY);
+ }
List<OmKeyLocationInfo> newLocationList = Collections.singletonList(
OmKeyLocationInfo.getFromProtobuf(blockLocation));
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
index eb5a9ccb53..9fd8fe0ffa 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequestWithFSO.java
@@ -59,6 +59,7 @@ import java.util.List;
import java.util.Map;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY;
import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
/**
@@ -136,7 +137,10 @@ public class OMAllocateBlockRequestWithFSO extends
OMAllocateBlockRequest {
throw new OMException("Open Key not found " + openKeyName,
KEY_NOT_FOUND);
}
-
+ if (openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) {
+ throw new OMException("Open Key " + openKeyName + " is under lease
recovery",
+ KEY_UNDER_LEASE_RECOVERY);
+ }
List<OmKeyLocationInfo> newLocationList = Collections.singletonList(
OmKeyLocationInfo.getFromProtobuf(blockLocation));
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index ebcb1a040a..56d7b72e62 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -68,7 +68,9 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.util.Time;
+import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_ALREADY_CLOSED;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_A_FILE;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NOT_SUPPORTED_OPERATION;
import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
@@ -102,12 +104,15 @@ public class OMKeyCommitRequest extends OMKeyRequest {
OmUtils.validateKeyName(StringUtils.removeEnd(keyArgs.getKeyName(),
OzoneConsts.FS_FILE_COPYING_TEMP_SUFFIX));
}
- boolean isHsync = commitKeyRequest.hasHsync() &&
- commitKeyRequest.getHsync();
+ boolean isHsync = commitKeyRequest.hasHsync() &&
commitKeyRequest.getHsync();
+ boolean isRecovery = commitKeyRequest.hasRecovery() &&
commitKeyRequest.getRecovery();
boolean enableHsync = ozoneManager.getConfiguration().getBoolean(
OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED,
OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED_DEFAULT);
- if (isHsync && !enableHsync) {
+
+ // If hsynced is called for a file, then this file is hsynced, otherwise
it's not hsynced.
+ // Currently, file lease recovery by design only supports recover hsynced
file
+ if ((isHsync || isRecovery) && !enableHsync) {
throw new OMException("Hsync is not enabled. To enable, " +
"set ozone.fs.hsync.enabled = true", NOT_SUPPORTED_OPERATION);
}
@@ -156,17 +161,20 @@ public class OMKeyCommitRequest extends OMKeyRequest {
OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
- boolean isHSync = commitKeyRequest.hasHsync() &&
- commitKeyRequest.getHsync();
-
+ boolean isHSync = commitKeyRequest.hasHsync() &&
commitKeyRequest.getHsync();
+ boolean isRecovery = commitKeyRequest.hasRecovery() &&
commitKeyRequest.getRecovery();
+ // isHsync = true, a commit request as a result of client side hsync call
+ // isRecovery = true, a commit request as a result of client side
recoverLease call
+ // none of isHsync and isRecovery is true, a commit request as a result of
client side normal
+ // outputStream#close call.
if (isHSync) {
omMetrics.incNumKeyHSyncs();
} else {
omMetrics.incNumKeyCommits();
}
- LOG.debug("isHSync = {}, volumeName = {}, bucketName = {}, keyName = {}",
- isHSync, volumeName, bucketName, keyName);
+ LOG.debug("isHSync = {}, isRecovery = {}, volumeName = {}, bucketName =
{}, keyName = {}",
+ isHSync, isRecovery, volumeName, bucketName, keyName);
try {
commitKeyArgs = resolveBucketLink(ozoneManager, commitKeyArgs, auditMap);
@@ -179,10 +187,7 @@ public class OMKeyCommitRequest extends OMKeyRequest {
commitKeyRequest.getClientID());
String dbOzoneKey =
- omMetadataManager.getOzoneKey(volumeName, bucketName,
- keyName);
- String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName,
- keyName, commitKeyRequest.getClientID());
+ omMetadataManager.getOzoneKey(volumeName, bucketName, keyName);
List<OmKeyLocationInfo>
locationInfoList = getOmKeyLocationInfos(ozoneManager,
commitKeyArgs);
@@ -225,9 +230,18 @@ public class OMKeyCommitRequest extends OMKeyRequest {
Map<String, RepeatedOmKeyInfo> oldKeyVersionsToDeleteMap = null;
OmKeyInfo keyToDelete =
omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
+ long writerClientId = commitKeyRequest.getClientID();
+ if (isRecovery && keyToDelete != null) {
+ String clientId =
keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
+ if (clientId == null) {
+ throw new OMException("Failed to recovery key, as " +
+ dbOzoneKey + " is already closed", KEY_ALREADY_CLOSED);
+ }
+ writerClientId = Long.parseLong(clientId);
+ }
+
+ final String clientIdString = String.valueOf(writerClientId);
if (null != keyToDelete) {
- final String clientIdString
- = String.valueOf(commitKeyRequest.getClientID());
isPreviousCommitHsync = java.util.Optional.ofNullable(keyToDelete)
.map(WithMetadata::getMetadata)
.map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
@@ -235,21 +249,30 @@ public class OMKeyCommitRequest extends OMKeyRequest {
.isPresent();
}
+ String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName,
+ keyName, writerClientId);
omKeyInfo =
omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenKey);
if (omKeyInfo == null) {
- String action = "commit";
- if (isHSync) {
- action = "hsync";
- }
+ String action = isRecovery ? "recovery" : isHSync ? "hsync" : "commit";
throw new OMException("Failed to " + action + " key, as " + dbOpenKey +
"entry is not found in the OpenKey table", KEY_NOT_FOUND);
}
+ if (omKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY) &&
+ omKeyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID)) {
+ if (!isRecovery) {
+ throw new OMException("Cannot commit key " + dbOpenKey + " with " +
OzoneConsts.LEASE_RECOVERY +
+ " metadata while recovery flag is not set in request",
KEY_UNDER_LEASE_RECOVERY);
+ }
+ }
omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
commitKeyArgs.getMetadataList()));
+
if (isHSync) {
- omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
- String.valueOf(commitKeyRequest.getClientID()));
+ omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
clientIdString);
+ } else if (isRecovery) {
+ omKeyInfo.getMetadata().remove(OzoneConsts.HSYNC_CLIENT_ID);
+ omKeyInfo.getMetadata().remove(OzoneConsts.LEASE_RECOVERY);
}
omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
@@ -314,7 +337,7 @@ public class OMKeyCommitRequest extends OMKeyRequest {
// Add to cache of open key table and key table.
if (!isHSync) {
- // If isHSync = false, put a tombstone in OpenKeyTable cache,
+ // If !isHSync = true, put a tombstone in OpenKeyTable cache,
// indicating the key is removed from OpenKeyTable.
// So that this key can't be committed again.
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
@@ -350,12 +373,12 @@ public class OMKeyCommitRequest extends OMKeyRequest {
}
// Debug logging for any key commit operation, successful or not
- LOG.debug("Key commit {} with isHSync = {}, omKeyInfo = {}",
- result == Result.SUCCESS ? "succeeded" : "failed", isHSync, omKeyInfo);
+ LOG.debug("Key commit {} with isHSync = {}, isRecovery = {}, omKeyInfo =
{}",
+ result == Result.SUCCESS ? "succeeded" : "failed", isHSync,
isRecovery, omKeyInfo);
if (!isHSync) {
auditLog(auditLogger, buildAuditMessage(OMAction.COMMIT_KEY, auditMap,
- exception, getOmRequest().getUserInfo()));
+ exception, getOmRequest().getUserInfo()));
processResult(commitKeyRequest, volumeName, bucketName, keyName,
omMetrics, exception, omKeyInfo, result);
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index a4b0b9fa0b..5ffa7b87fb 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -57,7 +57,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_ALREADY_CLOSED;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY;
import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
/**
@@ -102,17 +104,20 @@ public class OMKeyCommitRequestWithFSO extends
OMKeyCommitRequest {
OMClientResponse omClientResponse = null;
boolean bucketLockAcquired = false;
Result result;
- boolean isHSync = commitKeyRequest.hasHsync() &&
- commitKeyRequest.getHsync();
-
+ boolean isHSync = commitKeyRequest.hasHsync() &&
commitKeyRequest.getHsync();
+ boolean isRecovery = commitKeyRequest.hasRecovery() &&
commitKeyRequest.getRecovery();
+ // isHsync = true, a commit request as a result of client side hsync call
+ // isRecovery = true, a commit request as a result of client side
recoverLease call
+ // none of isHsync and isRecovery is true, a commit request as a result of
client side normal
+ // outputStream#close call.
if (isHSync) {
omMetrics.incNumKeyHSyncs();
} else {
omMetrics.incNumKeyCommits();
}
- LOG.debug("isHSync = {}, volumeName = {}, bucketName = {}, keyName = {}",
- isHSync, volumeName, bucketName, keyName);
+ LOG.debug("isHSync = {}, isRecovery = {}, volumeName = {}, bucketName =
{}, keyName = {}",
+ isHSync, isRecovery, volumeName, bucketName, keyName);
OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
@@ -126,7 +131,6 @@ public class OMKeyCommitRequestWithFSO extends
OMKeyCommitRequest {
keyName, IAccessAuthorizer.ACLType.WRITE,
commitKeyRequest.getClientID());
-
Iterator<Path> pathComponents = Paths.get(keyName).iterator();
String dbOpenFileKey = null;
@@ -150,29 +154,46 @@ public class OMKeyCommitRequestWithFSO extends
OMKeyCommitRequest {
+ " as parent directory doesn't exist");
String dbFileKey = omMetadataManager.getOzonePathKey(volumeId, bucketId,
parentID, fileName);
- dbOpenFileKey = omMetadataManager.getOpenFileName(volumeId, bucketId,
- parentID, fileName, commitKeyRequest.getClientID());
+ OmKeyInfo keyToDelete =
+ omMetadataManager.getKeyTable(getBucketLayout()).get(dbFileKey);
+ long writerClientId = commitKeyRequest.getClientID();
+ if (isRecovery && keyToDelete != null) {
+ String clientId =
keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
+ if (clientId == null) {
+ throw new OMException("Failed to recovery key, as " +
+ dbFileKey + " is already closed", KEY_ALREADY_CLOSED);
+ }
+ writerClientId = Long.parseLong(clientId);
+ }
+ dbOpenFileKey = omMetadataManager.getOpenFileName(volumeId, bucketId,
+ parentID, fileName, writerClientId);
omKeyInfo = OMFileRequest.getOmKeyInfoFromFileTable(true,
omMetadataManager, dbOpenFileKey, keyName);
if (omKeyInfo == null) {
- String action = "commit";
- if (isHSync) {
- action = "hsync";
- }
+ String action = isRecovery ? "recovery" : isHSync ? "hsync" : "commit";
throw new OMException("Failed to " + action + " key, as " +
- dbOpenFileKey + "entry is not found in the OpenKey table",
- KEY_NOT_FOUND);
+ dbOpenFileKey + " entry is not found in the OpenKey table",
KEY_NOT_FOUND);
+ }
+
+ if (omKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY) &&
+ omKeyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID)) {
+ if (!isRecovery) {
+ throw new OMException("Cannot commit key " + dbOpenFileKey + " with
" + OzoneConsts.LEASE_RECOVERY +
+ " metadata while recovery flag is not set in request",
KEY_UNDER_LEASE_RECOVERY);
+ }
}
+
omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
commitKeyArgs.getMetadataList()));
if (isHSync) {
- omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
- String.valueOf(commitKeyRequest.getClientID()));
+ omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
String.valueOf(writerClientId));
+ } else if (isRecovery) {
+ omKeyInfo.getMetadata().remove(OzoneConsts.HSYNC_CLIENT_ID);
+ omKeyInfo.getMetadata().remove(OzoneConsts.LEASE_RECOVERY);
}
omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
-
omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
List<OmKeyLocationInfo> uncommitted =
@@ -187,11 +208,8 @@ public class OMKeyCommitRequestWithFSO extends
OMKeyCommitRequest {
// creation after the knob turned on.
boolean isPreviousCommitHsync = false;
Map<String, RepeatedOmKeyInfo> oldKeyVersionsToDeleteMap = null;
- OmKeyInfo keyToDelete =
- omMetadataManager.getKeyTable(getBucketLayout()).get(dbFileKey);
if (null != keyToDelete) {
- final String clientIdString
- = String.valueOf(commitKeyRequest.getClientID());
+ final String clientIdString = String.valueOf(writerClientId);
isPreviousCommitHsync = java.util.Optional.ofNullable(keyToDelete)
.map(WithMetadata::getMetadata)
.map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMRecoverLeaseResponse.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMRecoverLeaseResponse.java
index ef73ab5c82..fcefa473ff 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMRecoverLeaseResponse.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/file/OMRecoverLeaseResponse.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.response.key.OmKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .OMResponse;
+ .OMResponse;
import javax.annotation.Nonnull;
import java.io.IOException;
@@ -39,16 +39,14 @@ import static
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_FILE_TABLE;
@CleanupTableInfo(cleanupTables = {FILE_TABLE, OPEN_FILE_TABLE})
public class OMRecoverLeaseResponse extends OmKeyResponse {
- private OmKeyInfo keyInfo;
- private String dbFileKey;
+ private OmKeyInfo openKeyInfo;
private String openKeyName;
+
public OMRecoverLeaseResponse(@Nonnull OMResponse omResponse,
- BucketLayout bucketLayout, OmKeyInfo keyInfo, String dbFileKey,
- String openKeyName) {
+ BucketLayout bucketLayout, String openKeyName, OmKeyInfo openKeyInfo) {
super(omResponse, bucketLayout);
- this.keyInfo = keyInfo;
- this.dbFileKey = dbFileKey;
this.openKeyName = openKeyName;
+ this.openKeyInfo = openKeyInfo;
}
/**
@@ -64,12 +62,11 @@ public class OMRecoverLeaseResponse extends OmKeyResponse {
@Override
protected void addToDBBatch(OMMetadataManager omMetadataManager,
BatchOperation batchOperation) throws IOException {
- // Delete from OpenKey table
+ // Update OpenKey table
if (openKeyName != null) {
- omMetadataManager.getOpenKeyTable(getBucketLayout()).deleteWithBatch(
- batchOperation, openKeyName);
- omMetadataManager.getKeyTable(getBucketLayout())
- .putWithBatch(batchOperation, dbFileKey, keyInfo);
+ // In INIT stage, update the keyInfo in openKeyTable
+ omMetadataManager.getOpenKeyTable(getBucketLayout()).putWithBatch(
+ batchOperation, openKeyName, openKeyInfo);
}
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
index bc41b6aa0c..c4f90958c7 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponse.java
@@ -84,7 +84,7 @@ public class OMKeyCommitResponse extends OmKeyResponse {
// Delete from OpenKey table
if (!isHSync()) {
omMetadataManager.getOpenKeyTable(getBucketLayout())
- .deleteWithBatch(batchOperation, openKeyName);
+ .deleteWithBatch(batchOperation, openKeyName);
}
omMetadataManager.getKeyTable(getBucketLayout())
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
index ff267f7c07..6073632e55 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyCommitResponseWithFSO.java
@@ -77,18 +77,18 @@ public class OMKeyCommitResponseWithFSO extends
OMKeyCommitResponse {
// Delete from OpenKey table if commit
if (!this.isHSync()) {
omMetadataManager.getOpenKeyTable(getBucketLayout())
- .deleteWithBatch(batchOperation, getOpenKeyName());
+ .deleteWithBatch(batchOperation, getOpenKeyName());
}
OMFileRequest.addToFileTable(omMetadataManager, batchOperation,
- getOmKeyInfo(), volumeId, getOmBucketInfo().getObjectID());
+ getOmKeyInfo(), volumeId, getOmBucketInfo().getObjectID());
updateDeletedTable(omMetadataManager, batchOperation);
// update bucket usedBytes.
omMetadataManager.getBucketTable().putWithBatch(batchOperation,
- omMetadataManager.getBucketKey(getOmBucketInfo().getVolumeName(),
- getOmBucketInfo().getBucketName()), getOmBucketInfo());
+ omMetadataManager.getBucketKey(getOmBucketInfo().getVolumeName(),
+ getOmBucketInfo().getBucketName()), getOmBucketInfo());
}
@Override
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
index 45209258f7..f52af51919 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
@@ -1571,7 +1571,7 @@ public final class OMRequestTestUtils {
OMRequestTestUtils.createOmDirectoryInfo(pathElement, ++objectId,
parentId);
OMRequestTestUtils.addDirKeyToDirTable(true, omDirInfo,
- volumeName, bucketName, txnID, omMetaMgr);
+ volumeName, bucketName, ++txnID, omMetaMgr);
parentId = omDirInfo.getObjectID();
}
return parentId;
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMRecoverLeaseRequest.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMRecoverLeaseRequest.java
index 5c10c0821d..476e2b2f93 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMRecoverLeaseRequest.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/file/TestOMRecoverLeaseRequest.java
@@ -18,39 +18,45 @@
package org.apache.hadoop.ozone.om.request.file;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.om.request.key.OMAllocateBlockRequestWithFSO;
+import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequestWithFSO;
import org.apache.hadoop.ozone.om.request.key.TestOMKeyRequest;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .KeyLocation;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
- .RecoverLeaseRequest;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest;
import org.apache.hadoop.util.Time;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
+import static
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB.setReplicationConfig;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.when;
/**
* Tests OMRecoverLeaseRequest.
@@ -71,34 +77,190 @@ public class TestOMRecoverLeaseRequest extends
TestOMKeyRequest {
*/
@Test
public void testRecoverHsyncFile() throws Exception {
- when(ozoneManager.getAclsEnabled()).thenReturn(true);
- when(ozoneManager.getVolumeOwner(
- anyString(),
- any(IAccessAuthorizer.ACLType.class), any(
- OzoneObj.ResourceType.class)))
- .thenReturn("user");
- InetSocketAddress address = new InetSocketAddress("localhost", 10000);
- when(ozoneManager.getOmRpcServerAddr()).thenReturn(address);
- populateNamespace(true, true);
+ populateNamespace(true, true, true, true);
OMClientResponse omClientResponse = validateAndUpdateCache();
+ OMResponse omResponse = omClientResponse.getOMResponse();
+ Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omResponse.getStatus());
+ RecoverLeaseResponse recoverLeaseResponse =
omResponse.getRecoverLeaseResponse();
+ KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo();
+ Assertions.assertNotNull(keyInfo);
+ OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo);
- Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+ omClientResponse =
validateAndUpdateCacheForCommit(getNewKeyArgs(omKeyInfo, 0));
+ omResponse = omClientResponse.getOMResponse();
+ Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omResponse.getStatus());
+
+ verifyTables(true, false);
+ }
+
+ /**
+ * Verify that RecoverLease request is idempotent.
+ * @throws Exception
+ */
+ @Test
+ public void testInitStageIdempotent() throws Exception {
+ populateNamespace(true, true, true, true);
+
+ // call recovery first time
+ OMClientResponse omClientResponse = validateAndUpdateCache();
+ OMResponse omResponse = omClientResponse.getOMResponse();
+ Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omResponse.getStatus());
+ RecoverLeaseResponse recoverLeaseResponse =
omResponse.getRecoverLeaseResponse();
+ KeyInfo keyInfo1 = recoverLeaseResponse.getKeyInfo();
+ Assertions.assertNotNull(keyInfo1);
+
+ // call recovery second time
+ omClientResponse = validateAndUpdateCache();
+ omResponse = omClientResponse.getOMResponse();
+ Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omResponse.getStatus());
+ recoverLeaseResponse = omResponse.getRecoverLeaseResponse();
+ KeyInfo keyInfo2 = recoverLeaseResponse.getKeyInfo();
+ Assertions.assertNotNull(keyInfo2);
+ Assertions.assertEquals(keyInfo1.getKeyName(), keyInfo2.getKeyName());
+ }
+
+ /**
+ * Verify that COMMIT request for recovery is not idempotent.
+ * @throws Exception
+ */
+ @Test
+ public void testCommitStageNotIdempotent() throws Exception {
+ populateNamespace(true, true, true, true);
+
+ // call recovery
+ OMClientResponse omClientResponse = validateAndUpdateCache();
+ OMResponse omResponse = omClientResponse.getOMResponse();
+ Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omResponse.getStatus());
+ RecoverLeaseResponse recoverLeaseResponse =
omResponse.getRecoverLeaseResponse();
+ KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo();
+ Assertions.assertNotNull(keyInfo);
+ OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo);
+
+ KeyArgs newKeyArgs = getNewKeyArgs(omKeyInfo, 0);
+
+ // call commit first time
+ omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs);
+ omResponse = omClientResponse.getOMResponse();
+ Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omResponse.getStatus());
+
+ // call commit second time
+ omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs);
+ omResponse = omClientResponse.getOMResponse();
+
Assertions.assertEquals(OzoneManagerProtocolProtos.Status.KEY_ALREADY_CLOSED,
omResponse.getStatus());
+ }
+
+ /**
+ * Verify that RecoverLease COMMIT request has a new file length.
+ * @throws Exception
+ */
+ @Test
+ public void testRecoverWithNewFileLength() throws Exception {
+ populateNamespace(true, true, true, true);
+
+ // call recovery
+ OMClientResponse omClientResponse = validateAndUpdateCache();
+ OMResponse omResponse = omClientResponse.getOMResponse();
+ Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omResponse.getStatus());
+ RecoverLeaseResponse recoverLeaseResponse =
omResponse.getRecoverLeaseResponse();
+ KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo();
+ Assertions.assertNotNull(keyInfo);
+ OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo);
+
+ // call commit
+ long deltaLength = 100;
+ KeyArgs newKeyArgs = getNewKeyArgs(omKeyInfo, deltaLength);
+ omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs);
+ omResponse = omClientResponse.getOMResponse();
+ Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omResponse.getStatus());
+
+ // get file length and check the length is as expected
+ String ozoneKey = getFileName();
+ OmKeyInfo omKeyInfoFetched =
omMetadataManager.getKeyTable(getBucketLayout()).get(ozoneKey);
+ Assertions.assertEquals(omKeyInfo.getDataSize(),
omKeyInfoFetched.getDataSize());
+
+ // check the final block length is as expected
+ List<OmKeyLocationInfo> locationInfoListFetched =
+
omKeyInfoFetched.getLatestVersionLocations().getBlocksLatestVersionOnly();
+ List<OmKeyLocationInfo> omKeyLocationInfos =
omKeyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
+ Assertions.assertEquals(omKeyLocationInfos.get(omKeyLocationInfos.size() -
1).getLength(),
+ locationInfoListFetched.get(locationInfoListFetched.size() -
1).getLength());
+
+ // check the committed file doesn't have HSYNC_CLIENT_ID and
LEASE_RECOVERY metadata
+
Assertions.assertNull(omKeyInfoFetched.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID));
+
Assertions.assertNull(omKeyInfoFetched.getMetadata().get(OzoneConsts.LEASE_RECOVERY));
+ }
+
+ /**
+ * Verify that RecoverLease COMMIT request has a new client ID.
+ * @throws Exception
+ */
+ @Test
+ public void testRecoverWithNewClientID() throws Exception {
+ populateNamespace(true, true, true, true);
+
+ // call recovery
+ OMClientResponse omClientResponse = validateAndUpdateCache();
+ OMResponse omResponse = omClientResponse.getOMResponse();
+ Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omResponse.getStatus());
+ RecoverLeaseResponse recoverLeaseResponse =
omResponse.getRecoverLeaseResponse();
+ KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo();
+ Assertions.assertNotNull(keyInfo);
+ OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo);
+
+ // call commit
+ KeyArgs newKeyArgs = getNewKeyArgs(omKeyInfo, 0);
+ omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs, true, true);
+ omResponse = omClientResponse.getOMResponse();
+ Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omResponse.getStatus());
+ }
+
+ /**
+ * Verify that an under recovery file will reject allocate block and further
hsync call(commit).
+ * @throws Exception
+ */
+ @Test
+ public void testRejectAllocateBlockAndHsync() throws Exception {
+ populateNamespace(true, true, true, true);
+
+ // call recovery
+ OMClientResponse omClientResponse = validateAndUpdateCache();
+ OMResponse omResponse = omClientResponse.getOMResponse();
+ Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
omResponse.getStatus());
+ RecoverLeaseResponse recoverLeaseResponse =
omResponse.getRecoverLeaseResponse();
+ KeyInfo keyInfo = recoverLeaseResponse.getKeyInfo();
+ Assertions.assertNotNull(keyInfo);
+ OmKeyInfo omKeyInfo = OmKeyInfo.getFromProtobuf(keyInfo);
+
+ // call allocate block
+ OMRequest request = createAllocateBlockRequest(volumeName, bucketName,
keyName);
+ OMAllocateBlockRequestWithFSO omAllocateBlockRequest =
+ new OMAllocateBlockRequestWithFSO(request, getBucketLayout());
+ request = omAllocateBlockRequest.preExecute(ozoneManager);
+ assertNotNull(request.getUserInfo());
+ omAllocateBlockRequest = new OMAllocateBlockRequestWithFSO(request,
getBucketLayout());
+ omClientResponse = omAllocateBlockRequest.validateAndUpdateCache(
+ ozoneManager, 100L, ozoneManagerDoubleBufferHelper);
+
Assertions.assertEquals(OzoneManagerProtocolProtos.Status.KEY_UNDER_LEASE_RECOVERY,
omClientResponse.getOMResponse().getStatus());
- verifyTables(true, true);
+ // call commit(hsync calls commit)
+ KeyArgs newKeyArgs = getNewKeyArgs(omKeyInfo, 0);
+ omClientResponse = validateAndUpdateCacheForCommit(newKeyArgs, false,
false);
+
Assertions.assertEquals(OzoneManagerProtocolProtos.Status.KEY_UNDER_LEASE_RECOVERY,
+ omClientResponse.getOMResponse().getStatus());
}
/**
- * verify that recover a closed file should be allowed (essentially no-op).
- */
+ * verify that recover a closed file.
+ **/
@Test
public void testRecoverClosedFile() throws Exception {
- populateNamespace(true, false);
+ populateNamespace(true, false, false, false);
OMClientResponse omClientResponse = validateAndUpdateCache();
- Assertions.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+
Assertions.assertEquals(OzoneManagerProtocolProtos.Status.KEY_ALREADY_CLOSED,
omClientResponse.getOMResponse().getStatus());
verifyTables(true, false);
@@ -109,7 +271,7 @@ public class TestOMRecoverLeaseRequest extends
TestOMKeyRequest {
*/
@Test
public void testRecoverOpenFile() throws Exception {
- populateNamespace(false, true);
+ populateNamespace(false, false, true, false);
OMClientResponse omClientResponse = validateAndUpdateCache();
@@ -125,7 +287,7 @@ public class TestOMRecoverLeaseRequest extends
TestOMKeyRequest {
*/
@Test
public void testRecoverAbsentFile() throws Exception {
- populateNamespace(false, false);
+ populateNamespace(false, false, false, false);
OMClientResponse omClientResponse = validateAndUpdateCache();
@@ -135,8 +297,37 @@ public class TestOMRecoverLeaseRequest extends
TestOMKeyRequest {
verifyTables(false, false);
}
- private void populateNamespace(boolean addKeyTable, boolean addOpenKeyTable)
- throws Exception {
+ private KeyArgs getNewKeyArgs(OmKeyInfo omKeyInfo, long deltaLength) throws
IOException {
+ OmKeyLocationInfoGroup omKeyLocationInfoGroup =
omKeyInfo.getLatestVersionLocations();
+ List<OmKeyLocationInfo> omKeyLocationInfoList =
omKeyLocationInfoGroup.getBlocksLatestVersionOnly();
+ long lastBlockLength =
omKeyLocationInfoList.get(omKeyLocationInfoList.size() - 1).getLength();
+ omKeyLocationInfoList.get(omKeyLocationInfoList.size() -
1).setLength(lastBlockLength + deltaLength);
+
+ long fileLength =
omKeyLocationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
+ omKeyInfo.setDataSize(fileLength);
+ OmKeyArgs keyArgs = new
OmKeyArgs.Builder().setVolumeName(omKeyInfo.getVolumeName())
+
.setBucketName(omKeyInfo.getBucketName()).setKeyName(omKeyInfo.getKeyName())
+
.setReplicationConfig(omKeyInfo.getReplicationConfig()).setDataSize(fileLength)
+
.setLocationInfoList(omKeyLocationInfoList).setLatestVersionLocation(true)
+ .build();
+
+ List<OmKeyLocationInfo> locationInfoList = keyArgs.getLocationInfoList();
+ Preconditions.checkNotNull(locationInfoList);
+ KeyArgs.Builder keyArgsBuilder = KeyArgs.newBuilder()
+ .setVolumeName(keyArgs.getVolumeName())
+ .setBucketName(keyArgs.getBucketName())
+ .setKeyName(keyArgs.getKeyName())
+ .setDataSize(keyArgs.getDataSize())
+ .addAllMetadata(KeyValueUtil.toProtobuf(keyArgs.getMetadata()))
+ .addAllKeyLocations(locationInfoList.stream()
+ .map(info -> info.getProtobuf(ClientVersion.CURRENT_VERSION))
+ .collect(Collectors.toList()));
+ setReplicationConfig(keyArgs.getReplicationConfig(), keyArgsBuilder);
+ return keyArgsBuilder.build();
+ }
+
+ private void populateNamespace(boolean addKeyTable, boolean
keyInfoWithHsyncFlag,
+ boolean addOpenKeyTable, boolean openKeyInfoWithHsyncFlag) throws
Exception {
String parentDir = "c/d/e";
String fileName = "f";
keyName = parentDir + "/" + fileName;
@@ -151,14 +342,14 @@ public class TestOMRecoverLeaseRequest extends
TestOMKeyRequest {
OmKeyInfo omKeyInfo;
if (addKeyTable) {
- String ozoneKey = addToFileTable(allocatedLocationList);
+ String ozoneKey = addToFileTable(allocatedLocationList,
keyInfoWithHsyncFlag);
omKeyInfo = omMetadataManager.getKeyTable(getBucketLayout())
.get(ozoneKey);
assertNotNull(omKeyInfo);
}
if (addOpenKeyTable) {
- String openKey = addToOpenFileTable(allocatedLocationList);
+ String openKey = addToOpenFileTable(allocatedLocationList,
openKeyInfoWithHsyncFlag);
omKeyInfo = omMetadataManager.getOpenKeyTable(getBucketLayout())
.get(openKey);
@@ -166,22 +357,33 @@ public class TestOMRecoverLeaseRequest extends
TestOMKeyRequest {
}
}
+ protected OMRequest createAllocateBlockRequest(String volumeName, String
bucketName, String keyName) {
+ KeyArgs keyArgs = KeyArgs.newBuilder()
+ .setVolumeName(volumeName).setBucketName(bucketName)
+ .setKeyName(keyName)
+ .setFactor(replicationFactor).setType(replicationType)
+ .build();
+
+ AllocateBlockRequest allocateBlockRequest =
+
AllocateBlockRequest.newBuilder().setClientID(clientID).setKeyArgs(keyArgs).build();
+
+ return OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.AllocateBlock)
+ .setClientId(UUID.randomUUID().toString())
+ .setAllocateBlockRequest(allocateBlockRequest).build();
+ }
+
@NotNull
protected OMRequest createRecoverLeaseRequest(
String volumeName, String bucketName, String keyName) {
-
- RecoverLeaseRequest recoverLeaseRequest = RecoverLeaseRequest.newBuilder()
- .setVolumeName(volumeName)
- .setBucketName(bucketName)
- .setKeyName(keyName).build();
-
+ RecoverLeaseRequest.Builder rb = RecoverLeaseRequest.newBuilder();
+ rb.setVolumeName(volumeName).setBucketName(bucketName).setKeyName(keyName);
return OMRequest.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.RecoverLease)
.setClientId(UUID.randomUUID().toString())
- .setRecoverLeaseRequest(recoverLeaseRequest).build();
+ .setRecoverLeaseRequest(rb.build()).build();
}
-
private OMClientResponse validateAndUpdateCache() throws Exception {
OMRequest modifiedOmRequest = doPreExecute(createRecoverLeaseRequest(
volumeName, bucketName, keyName));
@@ -196,6 +398,34 @@ public class TestOMRecoverLeaseRequest extends
TestOMKeyRequest {
return omClientResponse;
}
+ @NotNull
+ protected OMRequest createKeyCommitRequest(KeyArgs keyArgs, boolean
newClientID, boolean recovery) {
+ CommitKeyRequest.Builder rb =
+
CommitKeyRequest.newBuilder().setKeyArgs(keyArgs).setRecovery(recovery);
+ rb.setClientID(newClientID ? clientID + 1 : clientID);
+ return OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.CommitKey)
+ .setClientId(UUID.randomUUID().toString())
+ .setCommitKeyRequest(rb.build()).build();
+ }
+
+ private OMClientResponse validateAndUpdateCacheForCommit(KeyArgs keyArgs)
throws Exception {
+ return validateAndUpdateCacheForCommit(keyArgs, false, true);
+ }
+
+ private OMClientResponse validateAndUpdateCacheForCommit(KeyArgs keyArgs,
boolean newClientID,
+ boolean recovery) throws Exception {
+ OMRequest omRequest = createKeyCommitRequest(keyArgs, newClientID,
recovery);
+ OMKeyCommitRequestWithFSO omKeyCommitRequest = new
OMKeyCommitRequestWithFSO(omRequest, getBucketLayout());
+ OMRequest modifiedOmRequest = omKeyCommitRequest.preExecute(ozoneManager);
+ assertNotNull(modifiedOmRequest.getUserInfo());
+
+ omKeyCommitRequest = new OMKeyCommitRequestWithFSO(modifiedOmRequest,
getBucketLayout());
+ OMClientResponse omClientResponse =
+ omKeyCommitRequest.validateAndUpdateCache(ozoneManager, 100L,
ozoneManagerDoubleBufferHelper);
+ return omClientResponse;
+ }
+
private void verifyTables(boolean hasKey, boolean hasOpenKey)
throws IOException {
// Now entry should be created in key Table.
@@ -238,8 +468,7 @@ public class TestOMRecoverLeaseRequest extends
TestOMKeyRequest {
fileName);
}
- protected OMRecoverLeaseRequest getOmRecoverLeaseRequest(
- OMRequest omRequest) {
+ protected OMRecoverLeaseRequest getOmRecoverLeaseRequest(OMRequest
omRequest) {
return new OMRecoverLeaseRequest(omRequest);
}
@@ -269,14 +498,16 @@ public class TestOMRecoverLeaseRequest extends
TestOMKeyRequest {
return modifiedOmRequest;
}
- String addToOpenFileTable(List<OmKeyLocationInfo> locationList)
+ String addToOpenFileTable(List<OmKeyLocationInfo> locationList, boolean
hsyncFlag)
throws Exception {
OmKeyInfo omKeyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
bucketName, keyName, replicationType, replicationFactor, 0, parentId,
0, Time.now(), version);
omKeyInfo.appendNewBlocks(locationList, false);
- omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
- String.valueOf(clientID));
+ if (hsyncFlag) {
+ omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
+ String.valueOf(clientID));
+ }
OMRequestTestUtils.addFileToKeyTable(
true, false, omKeyInfo.getFileName(),
@@ -291,12 +522,16 @@ public class TestOMRecoverLeaseRequest extends
TestOMKeyRequest {
omKeyInfo.getParentObjectID(), omKeyInfo.getFileName(), clientID);
}
- String addToFileTable(List<OmKeyLocationInfo> locationList)
+ String addToFileTable(List<OmKeyLocationInfo> locationList, boolean
hsyncFlag)
throws Exception {
OmKeyInfo omKeyInfo = OMRequestTestUtils.createOmKeyInfo(volumeName,
bucketName, keyName, replicationType, replicationFactor, 0, parentId,
0, Time.now(), version);
omKeyInfo.appendNewBlocks(locationList, false);
+ if (hsyncFlag) {
+ omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
+ String.valueOf(clientID));
+ }
OMRequestTestUtils.addFileToKeyTable(
false, false, omKeyInfo.getFileName(),
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
index a72756d010..40fccb8fda 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyCommitResponse.java
@@ -161,6 +161,6 @@ public class TestOMKeyCommitResponse extends
TestOMKeyResponse {
new RepeatedOmKeyInfo(e)));
}
return new OMKeyCommitResponse(omResponse, omKeyInfo, ozoneKey, openKey,
- omBucketInfo, deleteKeyMap, isHSync);
+ omBucketInfo, deleteKeyMap, isHSync);
}
}
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index a60134d4f6..e510d9841b 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
@@ -733,13 +734,20 @@ public class BasicOzoneClientAdapterImpl implements
OzoneClientAdapter {
}
@Override
- public boolean recoverLease(final String pathStr) throws IOException {
- incrementCounter(Statistic.INVOCATION_RECOVER_LEASE, 1);
+ public List<OmKeyInfo> recoverFilePrepare(final String pathStr) throws
IOException {
+ incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);
return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
volume.getName(), bucket.getName(), pathStr);
}
+ @Override
+ public void recoverFile(OmKeyArgs keyArgs) throws IOException {
+ incrementCounter(Statistic.INVOCATION_RECOVER_FILE, 1);
+
+ ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
+ }
+
@Override
public void setTimes(String key, long mtime, long atime) throws IOException {
incrementCounter(Statistic.INVOCATION_SET_TIMES, 1);
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 085a2300fd..567004bc81 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
@@ -1403,8 +1404,8 @@ public class BasicRootedOzoneClientAdapterImpl
}
@Override
- public boolean recoverLease(final String pathStr) throws IOException {
- incrementCounter(Statistic.INVOCATION_RECOVER_LEASE, 1);
+ public List<OmKeyInfo> recoverFilePrepare(final String pathStr) throws
IOException {
+ incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);
OFSPath ofsPath = new OFSPath(pathStr, config);
OzoneVolume volume = objectStore.getVolume(ofsPath.getVolumeName());
@@ -1413,6 +1414,13 @@ public class BasicRootedOzoneClientAdapterImpl
volume.getName(), bucket.getName(), ofsPath.getKeyName());
}
+ @Override
+ public void recoverFile(OmKeyArgs keyArgs) throws IOException {
+ incrementCounter(Statistic.INVOCATION_RECOVER_FILE, 1);
+
+ ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
+ }
+
@Override
public void setTimes(String key, long mtime, long atime) throws IOException {
incrementCounter(Statistic.INVOCATION_SET_TIMES, 1);
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
index c48f1a6366..dbce02dd56 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.security.token.Token;
@@ -95,7 +97,9 @@ public interface OzoneClientAdapter {
String fromSnapshot, String toSnapshot)
throws IOException, InterruptedException;
- boolean recoverLease(String pathStr) throws IOException;
+ List<OmKeyInfo> recoverFilePrepare(String pathStr) throws IOException;
+
+ void recoverFile(OmKeyArgs keyArgs) throws IOException;
void setTimes(String key, long mtime, long atime) throws IOException;
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
index aae71e9c4c..10abc57091 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
@@ -77,8 +77,9 @@ public enum Statistic {
"Calls of setTimes()"),
INVOCATION_IS_FILE_CLOSED("op_is_file_closed",
"Calls of isFileClosed()"),
- INVOCATION_RECOVER_LEASE("op_recover_lease",
- "Calls of recoverLease()"),
+ INVOCATION_RECOVER_FILE_PREPARE("op_recover_file_prepare",
+ "Calls of recoverFilePrepare()"),
+ INVOCATION_RECOVER_FILE("op_recover_file", "Calls of recoverFile()"),
INVOCATION_SET_SAFE_MODE("op_set_safe_mode",
"Calls of setSafeMode()");
diff --git
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index 203e7ed373..8f977167a2 100644
---
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.ozone;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.util.List;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
@@ -33,6 +34,8 @@ import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
/**
@@ -129,7 +132,16 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
LOG.trace("recoverLease() path:{}", f);
Path qualifiedPath = makeQualified(f);
String key = pathToKey(qualifiedPath);
- return getAdapter().recoverLease(key);
+ List<OmKeyInfo> infoList = getAdapter().recoverFilePrepare(key);
+ // TODO: query DN to get the final block length
+ OmKeyInfo keyInfo = infoList.get(0);
+ OmKeyArgs keyArgs = new
OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
+
.setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
+
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
+
.setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+ .build();
+ getAdapter().recoverFile(keyArgs);
+ return true;
}
@Override
diff --git
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index 9b1596c05b..0530b606b2 100644
---
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -29,11 +29,14 @@ import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.util.List;
/**
* The Rooted Ozone Filesystem (OFS) implementation.
@@ -131,17 +134,24 @@ public class RootedOzoneFileSystem extends
BasicRootedOzoneFileSystem
*/
@Override
public boolean recoverLease(final Path f) throws IOException {
- incrementCounter(Statistic.INVOCATION_RECOVER_LEASE, 1);
statistics.incrementWriteOps(1);
LOG.trace("recoverLease() path:{}", f);
Path qualifiedPath = makeQualified(f);
String key = pathToKey(qualifiedPath);
- return getAdapter().recoverLease(key);
+ List<OmKeyInfo> infoList = getAdapter().recoverFilePrepare(key);
+ // TODO: query DN to get the final block length
+ OmKeyInfo keyInfo = infoList.get(0);
+ OmKeyArgs keyArgs = new
OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
+
.setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
+
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
+
.setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+ .build();
+ getAdapter().recoverFile(keyArgs);
+ return true;
}
@Override
public boolean isFileClosed(Path f) throws IOException {
- incrementCounter(Statistic.INVOCATION_IS_FILE_CLOSED, 1);
statistics.incrementReadOps(1);
LOG.trace("isFileClosed() path:{}", f);
Path qualifiedPath = makeQualified(f);
diff --git
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index 71f01e4414..a31a07cf67 100644
---
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.ozone;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.util.List;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
@@ -33,6 +34,8 @@ import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
/**
@@ -129,7 +132,16 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
LOG.trace("isFileClosed() path:{}", f);
Path qualifiedPath = makeQualified(f);
String key = pathToKey(qualifiedPath);
- return getAdapter().recoverLease(key);
+ List<OmKeyInfo> infoList = getAdapter().recoverFilePrepare(key);
+ // TODO: query DN to get the final block length
+ OmKeyInfo keyInfo = infoList.get(0);
+ OmKeyArgs keyArgs = new
OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
+
.setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
+
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
+
.setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+ .build();
+ getAdapter().recoverFile(keyArgs);
+ return true;
}
@Override
diff --git
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index 7561e20a87..ed62f608b4 100644
---
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -29,11 +29,14 @@ import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
+import java.util.List;
/**
* The Rooted Ozone Filesystem (OFS) implementation.
@@ -128,7 +131,16 @@ public class RootedOzoneFileSystem extends
BasicRootedOzoneFileSystem
LOG.trace("recoverLease() path:{}", f);
Path qualifiedPath = makeQualified(f);
String key = pathToKey(qualifiedPath);
- return getAdapter().recoverLease(key);
+ List<OmKeyInfo> infoList = getAdapter().recoverFilePrepare(key);
+ // TODO: query DN to get the final block length
+ OmKeyInfo keyInfo = infoList.get(0);
+ OmKeyArgs keyArgs = new
OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
+
.setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
+
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
+
.setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+ .build();
+ getAdapter().recoverFile(keyArgs);
+ return true;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]