bharatviswa504 commented on a change in pull request #1169:
URL: https://github.com/apache/hadoop-ozone/pull/1169#discussion_r452561712



##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
##########
@@ -116,89 +111,112 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager,
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+
+    // As right now, only client exposed API is for a single volume and
+    // bucket. So, all entries will have same volume name and bucket name.
+    // So, we can validate once.
+    if (deleteKeyArgsList.size() > 0) {
+      volumeName = deleteKeyArgsList.get(0).getVolumeName();
+      bucketName = deleteKeyArgsList.get(0).getBucketName();
+    }
+
+    boolean acquiredLock =
+        omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
+
+    int indexFailed = 0;
     try {
-      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
+
+      // Validate bucket and volume exists or not.
+      if (deleteKeyArgsList.size() > 0) {
+        validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+      }
+
+
+      // Check if any of the key in the batch cannot be deleted. If exists the
+      // batch delete will be failed.
+
+      for (indexFailed = 0; indexFailed < deleteKeyArgsList.size();
+           indexFailed++) {
+        KeyArgs deleteKeyArgs = deleteKeyArgsList.get(0);
+        auditMap = buildKeyArgsAuditMap(deleteKeyArgs);
         volumeName = deleteKeyArgs.getVolumeName();
         bucketName = deleteKeyArgs.getBucketName();
         keyName = deleteKeyArgs.getKeyName();
         String objectKey = omMetadataManager.getOzoneKey(volumeName, 
bucketName,
             keyName);
         OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
-        omKeyInfoList.add(omKeyInfo);
-        unDeletedKeys.add(omKeyInfo);
-      }
 
-      // Check if any of the key in the batch cannot be deleted. If exists the
-      // batch will delete failed.
-      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
-        volumeName = deleteKeyArgs.getVolumeName();
-        bucketName = deleteKeyArgs.getBucketName();
-        keyName = deleteKeyArgs.getKeyName();
-        auditMap = buildKeyArgsAuditMap(deleteKeyArgs);
+
+        // Do we need to fail the batch if one of the key does not exist?
+        // For now following the previous code behavior. If this code changes
+        // behavior, this will be incompatible change across upgrades, and we
+        // need to version the Requests and do logic accordingly.
+
+        if (omKeyInfo == null) {
+          LOG.error("Key does not exist {}", objectKey);
+          throw new OMException("Key Not Found " + objectKey, KEY_NOT_FOUND);
+        }
+
         // check Acl
         checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
             IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
 
-        String objectKey = omMetadataManager.getOzoneKey(
-            volumeName, bucketName, keyName);
-
-        // Validate bucket and volume exists or not.
-        validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
-
-        OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
+        omKeyInfoList.add(omKeyInfo);
+      }
 
-        if (omKeyInfo == null) {
-          throw new OMException("Key not found: " + keyName, KEY_NOT_FOUND);
-        }
 
-        // Check if this transaction is a replay of ratis logs.
-        if (isReplay(ozoneManager, omKeyInfo, trxnLogIndex)) {
-          // Replay implies the response has already been returned to
-          // the client. So take no further action and return a dummy
-          // OMClientResponse.
-          throw new OMReplayException();
-        }
+      // Mark all keys in cache as deleted.
+      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
+        volumeName = deleteKeyArgs.getVolumeName();
+        bucketName = deleteKeyArgs.getBucketName();
+        keyName = deleteKeyArgs.getKeyName();
+        omMetadataManager.getKeyTable().addCacheEntry(
+            new CacheKey<>(omMetadataManager.getOzoneKey(volumeName, 
bucketName,
+                keyName)),
+            new CacheValue<>(Optional.absent(), trxnLogIndex));
       }
 
+
       omClientResponse = new OMKeysDeleteResponse(omResponse
-          .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()).build(),
-          omKeyInfoList, trxnLogIndex, ozoneManager.isRatisEnabled());
+          .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()
+              .setStatus(true)).build(), omKeyInfoList, trxnLogIndex,
+          ozoneManager.isRatisEnabled());
       result = Result.SUCCESS;
+
     } catch (IOException ex) {
-      if (ex instanceof OMReplayException) {
-        result = Result.REPLAY;
-        omClientResponse = new OMKeyDeleteResponse(createReplayOMResponse(
-            omResponse));
-      } else {
-        result = Result.FAILURE;
-        exception = ex;
-
-        omClientResponse = new OMKeyDeleteResponse(
-            createOperationKeysErrorOMResponse(omResponse, exception,
-                unDeletedKeys));
-      }
+      result = Result.FAILURE;
+      exception = ex;
+      createErrorOMResponse(omResponse, ex);
+      omResponse.setDeleteKeysResponse(DeleteKeysResponse.newBuilder()
+          .setStatus(false).build()).build();
+      omClientResponse = new OMKeysDeleteResponse(omResponse.build());
 
     } finally {
+      if (acquiredLock) {
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
       addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
           omDoubleBufferHelper);
     }
 
-    // Performing audit logging outside of the lock.
-    if (result != Result.REPLAY) {
-      auditLog(auditLogger, buildAuditMessage(
-          OMAction.DELETE_KEY, auditMap, exception, userInfo));
+    // When we get any error during iteration build the remaining audit map
+    // from deleteKeyArgsList.
+    for (int i = indexFailed; i < deleteKeyArgsList.size(); i++) {
+      buildKeyArgsAuditMap(deleteKeyArgsList.get(i));
     }
 
+    auditLog(auditLogger, buildAuditMessage(
+        OMAction.DELETE_KEYS, auditMap, exception, userInfo));

Review comment:
       Good catch. Thanks, fixed it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: ozone-issues-h...@hadoop.apache.org

Reply via email to