xiaoyuyao commented on a change in pull request #1169:
URL: https://github.com/apache/hadoop-ozone/pull/1169#discussion_r452996069



##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
##########
@@ -116,93 +98,112 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager,
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-    try {
-      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
-        volumeName = deleteKeyArgs.getVolumeName();
-        bucketName = deleteKeyArgs.getBucketName();
-        keyName = deleteKeyArgs.getKeyName();
-        String objectKey = omMetadataManager.getOzoneKey(volumeName, 
bucketName,
-            keyName);
-        OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
-        omKeyInfoList.add(omKeyInfo);
-        unDeletedKeys.add(omKeyInfo);
-      }
 
-      // Check if any of the key in the batch cannot be deleted. If exists the
-      // batch will delete failed.
-      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
-        volumeName = deleteKeyArgs.getVolumeName();
-        bucketName = deleteKeyArgs.getBucketName();
-        keyName = deleteKeyArgs.getKeyName();
-        auditMap = buildKeyArgsAuditMap(deleteKeyArgs);
-        // check Acl
-        checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-            IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
 
-        String objectKey = omMetadataManager.getOzoneKey(
-            volumeName, bucketName, keyName);
+    boolean acquiredLock =
+        omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
 
-        // Validate bucket and volume exists or not.
-        validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+    int indexFailed = 0;
+    int length = deleteKeys.size();
+    OzoneManagerProtocolProtos.DeleteKeyArgs.Builder unDeletedKeys =
+        OzoneManagerProtocolProtos.DeleteKeyArgs.newBuilder()
+            .setVolumeName(volumeName).setBucketName(bucketName);
+
+    boolean deleteStatus = true;
+    try {
 
+      // Validate bucket and volume exists or not.
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+      for (indexFailed = 0; indexFailed < length; indexFailed++) {
+        keyName = deleteKeyArgs.getKeys(indexFailed);
+        String objectKey = omMetadataManager.getOzoneKey(volumeName, 
bucketName,
+            keyName);
         OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
 
         if (omKeyInfo == null) {
-          throw new OMException("Key not found: " + keyName, KEY_NOT_FOUND);
+          deleteStatus = false;
+          LOG.error("Received a request to delete a Key does not exist {}",
+              objectKey);
+          deleteKeys.remove(keyName);
+          unDeletedKeys.addKeys(keyName);
+          continue;
         }
 
-        // Check if this transaction is a replay of ratis logs.
-        if (isReplay(ozoneManager, omKeyInfo, trxnLogIndex)) {
-          // Replay implies the response has already been returned to
-          // the client. So take no further action and return a dummy
-          // OMClientResponse.
-          throw new OMReplayException();
+        try {
+          // check Acl
+          checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
+              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+          omKeyInfoList.add(omKeyInfo);
+        } catch (Exception ex) {
+          deleteStatus = false;
+          LOG.error("Acl check failed for Key: {}", objectKey, ex);
+          deleteKeys.remove(keyName);
+          unDeletedKeys.addKeys(keyName);
         }
       }
 
+      // Mark all keys which can be deleted, in cache as deleted.
+      for (OmKeyInfo omKeyInfo : omKeyInfoList) {
+        omMetadataManager.getKeyTable().addCacheEntry(
+            new CacheKey<>(omMetadataManager.getOzoneKey(volumeName, 
bucketName,
+                omKeyInfo.getKeyName())),
+            new CacheValue<>(Optional.absent(), trxnLogIndex));
+      }
+
       omClientResponse = new OMKeysDeleteResponse(omResponse
-          .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()).build(),
-          omKeyInfoList, trxnLogIndex, ozoneManager.isRatisEnabled());
+          .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()
+              .setStatus(deleteStatus).setUnDeletedKeys(unDeletedKeys))
+          .setStatus(deleteStatus ? OK : PARTIAL_DELETE)
+          .setSuccess(deleteStatus).build(),
+          omKeyInfoList, trxnLogIndex,
+          ozoneManager.isRatisEnabled());
+
       result = Result.SUCCESS;
+
     } catch (IOException ex) {
-      if (ex instanceof OMReplayException) {
-        result = Result.REPLAY;
-        omClientResponse = new OMKeyDeleteResponse(createReplayOMResponse(
-            omResponse));
-      } else {
-        result = Result.FAILURE;
-        exception = ex;
-
-        omClientResponse = new OMKeyDeleteResponse(
-            createOperationKeysErrorOMResponse(omResponse, exception,
-                unDeletedKeys));
+      result = Result.FAILURE;
+      exception = ex;
+      createErrorOMResponse(omResponse, ex);
+
+      // reset deleteKeys as request failed.
+      deleteKeys = new ArrayList<>();
+      // Add all keys which are failed due to any other exception .
+      for (int i = indexFailed; i < length; i++) {
+        unDeletedKeys.addKeys(deleteKeyArgs.getKeys(indexFailed));
       }
 
+      omResponse.setDeleteKeysResponse(DeleteKeysResponse.newBuilder()
+          .setStatus(false).setUnDeletedKeys(unDeletedKeys).build()).build();
+      omClientResponse = new OMKeysDeleteResponse(omResponse.build());
+
     } finally {
+      if (acquiredLock) {
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
       addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
           omDoubleBufferHelper);
     }
 
-    // Performing audit logging outside of the lock.
-    if (result != Result.REPLAY) {
-      auditLog(auditLogger, buildAuditMessage(
-          OMAction.DELETE_KEY, auditMap, exception, userInfo));
-    }
+    auditMap = buildDeleteKeysAuditMap(volumeName, bucketName, deleteKeys,
+        unDeletedKeys.getKeysList());
+
+    auditLog(auditLogger, buildAuditMessage(
+        OMAction.DELETE_KEYS, auditMap, exception, userInfo));
+
 
     switch (result) {
     case SUCCESS:
       omMetrics.decNumKeys();
       LOG.debug("Key deleted. Volume:{}, Bucket:{}, Key:{}", volumeName,
           bucketName, keyName);
       break;
-    case REPLAY:
-      LOG.debug("Replayed Transaction {} ignored. Request: {}",
-          trxnLogIndex, deleteKeyRequest);
-      break;
     case FAILURE:
       omMetrics.incNumKeyDeleteFails();
-      LOG.error("Key delete failed. Volume:{}, Bucket:{}, Key{}." +
-          " Exception:{}", volumeName, bucketName, keyName, exception);
+      LOG.error("Key delete failed. Volume:{}, Bucket:{}, Key:{}",

Review comment:
       Should we reword the error message to differentiate with the single 
delete. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: ozone-issues-h...@hadoop.apache.org

Reply via email to