hanishakoneru commented on a change in pull request #505: HDDS-2956. Handle Replay of AllocateBlock request
URL: https://github.com/apache/hadoop-ozone/pull/505#discussion_r373606672
 
 

 ##########
 File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
 ##########
 @@ -160,71 +160,98 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager,
     Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);
     auditMap.put(OzoneConsts.CLIENT_ID, String.valueOf(clientID));
 
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    String openKeyName = omMetadataManager.getOpenKey(volumeName, bucketName,
+        keyName, clientID);
+
     OMResponse.Builder omResponse = OMResponse.newBuilder().setCmdType(
         OzoneManagerProtocolProtos.Type.AllocateBlock).setStatus(
         OzoneManagerProtocolProtos.Status.OK).setSuccess(true);
+    OMClientResponse omClientResponse = null;
 
+    OmKeyInfo openKeyInfo = null;
     IOException exception = null;
-    OmKeyInfo omKeyInfo = null;
+
     try {
       // check Acl
       checkKeyAclsInOpenKeyTable(ozoneManager, volumeName, bucketName, keyName,
           IAccessAuthorizer.ACLType.WRITE, allocateBlockRequest.getClientID());
 
-      OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
       validateBucketAndVolume(omMetadataManager, volumeName,
           bucketName);
 
-      String openKey = omMetadataManager.getOpenKey(
-          volumeName, bucketName, keyName, clientID);
-
       // Here we don't acquire bucket/volume lock because for a single client
       // allocateBlock is called in serial fashion.
 
-      omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);
-      if (omKeyInfo == null) {
-        throw new OMException("Open Key not found " + openKey, KEY_NOT_FOUND);
+      openKeyInfo = omMetadataManager.getOpenKeyTable().get(openKeyName);
+      if (openKeyInfo == null) {
+        // Check if this transaction is a replay of ratis logs.
+        // If the Key was already committed and this transaction is being
+        // replayed, we should ignore this transaction.
+        String ozoneKey = omMetadataManager.getOzoneKey(volumeName,
+            bucketName, keyName);
+        OmKeyInfo dbKeyInfo = omMetadataManager.getKeyTable().get(ozoneKey);
+        if (dbKeyInfo != null) {
+          if (isReplay(ozoneManager, dbKeyInfo.getUpdateID(), trxnLogIndex)) {
+            // This transaction is a replay. Send replay response.
+            throw new OMReplayException();
+          }
+        }
+        throw new OMException("Open Key not found " + openKeyName,
+            KEY_NOT_FOUND);
+      }
+
+      // Check if this transaction is a replay of ratis logs.
+      // Check the updateID of the openKey to verify that it is not greater
+      // than the current transactionLogIndex
+      if (isReplay(ozoneManager, openKeyInfo.getUpdateID(), trxnLogIndex)) {
+        // This transaction is a replay. Send replay response.
+        throw new OMReplayException();
       }
 
       // Append new block
-      omKeyInfo.appendNewBlocks(Collections.singletonList(
+      openKeyInfo.appendNewBlocks(Collections.singletonList(
           OmKeyLocationInfo.getFromProtobuf(blockLocation)), false);
 
       // Set modification time.
-      omKeyInfo.setModificationTime(keyArgs.getModificationTime());
+      openKeyInfo.setModificationTime(keyArgs.getModificationTime());
 
       // Set the UpdateID to current transactionLogIndex
-      omKeyInfo.setUpdateID(transactionLogIndex);
+      openKeyInfo.setUpdateID(trxnLogIndex);
 
       // Add to cache.
       omMetadataManager.getOpenKeyTable().addCacheEntry(
-          new CacheKey<>(openKey), new CacheValue<>(Optional.of(omKeyInfo),
-              transactionLogIndex));
+          new CacheKey<>(openKeyName),
+          new CacheValue<>(Optional.of(openKeyInfo), trxnLogIndex));
 
+      omResponse.setAllocateBlockResponse(AllocateBlockResponse.newBuilder()
+          .setKeyLocation(blockLocation).build());
+      omClientResponse = new OMAllocateBlockResponse(omResponse.build(),
+          openKeyInfo, clientID);
+      LOG.debug("Allocated block for Volume:{}, Bucket:{}, OpenKey:{}",
+          volumeName, bucketName, openKeyName);
     } catch (IOException ex) {
-      exception = ex;
+      if (ex instanceof OMReplayException) {
+        omClientResponse = new OMAllocateBlockResponse(createReplayOMResponse(
+            omResponse));
+        LOG.debug("Replayed Transaction {} ignored. Request: {}", trxnLogIndex,
+            allocateBlockRequest);
+      } else {
+        omMetrics.incNumBlockAllocateCallFails();
+        exception = ex;
+        omClientResponse = new OMAllocateBlockResponse(createErrorOMResponse(
+            omResponse, exception));
+        LOG.error("Allocate Block failed. Volume:{}, Bucket:{}, OpenKey:{}. " +
+            "Exception:{}", volumeName, bucketName, openKeyName, exception);
+      }
     }
 
     auditLog(auditLogger, buildAuditMessage(OMAction.ALLOCATE_BLOCK, auditMap,
 
 Review comment:
   done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: ozone-issues-h...@hadoop.apache.org

Reply via email to