adoroszlai commented on a change in pull request #1169:
URL: https://github.com/apache/hadoop-ozone/pull/1169#discussion_r452974904



##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
##########
@@ -2220,16 +2224,13 @@ public void deleteKey(OmKeyArgs args) throws 
IOException {
   /**
    * Deletes an existing key.
    *
-   * @param args - List attributes of the key.
+   * @param deleteKeys - List of keys to be deleted from volume and a bucket.
    * @throws IOException
    */
   @Override
-  public void deleteKeys(List<OmKeyArgs> args) throws IOException {
-    if (args != null) {
-      for (OmKeyArgs keyArgs : args) {
-        deleteKey(keyArgs);
-      }
-    }
+  public void deleteKeys(OmDeleteKeys deleteKeys) throws IOException {
+    throw new NotImplementedException("OzoneManager does not require this to " 
+

Review comment:
       Please consider changing to `UnsupportedOperationException`, as
   > [`NotImplementedException`](https://commons.apache.org/proper/commons-lang/apidocs/org/apache/commons/lang3/NotImplementedException.html) represents the case where the author has yet to implement the logic at this point in the program.

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
##########
@@ -116,93 +98,112 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager,
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-    try {
-      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
-        volumeName = deleteKeyArgs.getVolumeName();
-        bucketName = deleteKeyArgs.getBucketName();
-        keyName = deleteKeyArgs.getKeyName();
-        String objectKey = omMetadataManager.getOzoneKey(volumeName, 
bucketName,
-            keyName);
-        OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
-        omKeyInfoList.add(omKeyInfo);
-        unDeletedKeys.add(omKeyInfo);
-      }
 
-      // Check if any of the key in the batch cannot be deleted. If exists the
-      // batch will delete failed.
-      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
-        volumeName = deleteKeyArgs.getVolumeName();
-        bucketName = deleteKeyArgs.getBucketName();
-        keyName = deleteKeyArgs.getKeyName();
-        auditMap = buildKeyArgsAuditMap(deleteKeyArgs);
-        // check Acl
-        checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-            IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
 
-        String objectKey = omMetadataManager.getOzoneKey(
-            volumeName, bucketName, keyName);
+    boolean acquiredLock =
+        omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
 
-        // Validate bucket and volume exists or not.
-        validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+    int indexFailed = 0;
+    int length = deleteKeys.size();
+    OzoneManagerProtocolProtos.DeleteKeyArgs.Builder unDeletedKeys =
+        OzoneManagerProtocolProtos.DeleteKeyArgs.newBuilder()
+            .setVolumeName(volumeName).setBucketName(bucketName);
+
+    boolean deleteStatus = true;
+    try {
 
+      // Validate bucket and volume exists or not.
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+      for (indexFailed = 0; indexFailed < length; indexFailed++) {
+        keyName = deleteKeyArgs.getKeys(indexFailed);
+        String objectKey = omMetadataManager.getOzoneKey(volumeName, 
bucketName,
+            keyName);
         OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
 
         if (omKeyInfo == null) {
-          throw new OMException("Key not found: " + keyName, KEY_NOT_FOUND);
+          deleteStatus = false;
+          LOG.error("Received a request to delete a Key does not exist {}",
+              objectKey);
+          deleteKeys.remove(keyName);
+          unDeletedKeys.addKeys(keyName);
+          continue;
         }
 
-        // Check if this transaction is a replay of ratis logs.
-        if (isReplay(ozoneManager, omKeyInfo, trxnLogIndex)) {
-          // Replay implies the response has already been returned to
-          // the client. So take no further action and return a dummy
-          // OMClientResponse.
-          throw new OMReplayException();
+        try {
+          // check Acl
+          checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
+              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+          omKeyInfoList.add(omKeyInfo);
+        } catch (Exception ex) {
+          deleteStatus = false;
+          LOG.error("Acl check failed for Key: {}", objectKey, ex);
+          deleteKeys.remove(keyName);
+          unDeletedKeys.addKeys(keyName);
         }
       }
 
+      // Mark all keys which can be deleted, in cache as deleted.
+      for (OmKeyInfo omKeyInfo : omKeyInfoList) {
+        omMetadataManager.getKeyTable().addCacheEntry(
+            new CacheKey<>(omMetadataManager.getOzoneKey(volumeName, 
bucketName,
+                omKeyInfo.getKeyName())),
+            new CacheValue<>(Optional.absent(), trxnLogIndex));
+      }
+
       omClientResponse = new OMKeysDeleteResponse(omResponse
-          .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()).build(),
-          omKeyInfoList, trxnLogIndex, ozoneManager.isRatisEnabled());
+          .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()
+              .setStatus(deleteStatus).setUnDeletedKeys(unDeletedKeys))
+          .setStatus(deleteStatus ? OK : PARTIAL_DELETE)
+          .setSuccess(deleteStatus).build(),
+          omKeyInfoList, trxnLogIndex,
+          ozoneManager.isRatisEnabled());
+
       result = Result.SUCCESS;
+
     } catch (IOException ex) {
-      if (ex instanceof OMReplayException) {
-        result = Result.REPLAY;
-        omClientResponse = new OMKeyDeleteResponse(createReplayOMResponse(
-            omResponse));
-      } else {
-        result = Result.FAILURE;
-        exception = ex;
-
-        omClientResponse = new OMKeyDeleteResponse(
-            createOperationKeysErrorOMResponse(omResponse, exception,
-                unDeletedKeys));
+      result = Result.FAILURE;
+      exception = ex;
+      createErrorOMResponse(omResponse, ex);
+
+      // reset deleteKeys as request failed.
+      deleteKeys = new ArrayList<>();
+      // Add all keys which are failed due to any other exception .
+      for (int i = indexFailed; i < length; i++) {
+        unDeletedKeys.addKeys(deleteKeyArgs.getKeys(indexFailed));
       }
 
+      omResponse.setDeleteKeysResponse(DeleteKeysResponse.newBuilder()
+          .setStatus(false).setUnDeletedKeys(unDeletedKeys).build()).build();
+      omClientResponse = new OMKeysDeleteResponse(omResponse.build());
+
     } finally {
+      if (acquiredLock) {
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
       addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
           omDoubleBufferHelper);
     }
 
-    // Performing audit logging outside of the lock.
-    if (result != Result.REPLAY) {
-      auditLog(auditLogger, buildAuditMessage(
-          OMAction.DELETE_KEY, auditMap, exception, userInfo));
-    }
+    auditMap = buildDeleteKeysAuditMap(volumeName, bucketName, deleteKeys,
+        unDeletedKeys.getKeysList());
+
+    auditLog(auditLogger, buildAuditMessage(
+        OMAction.DELETE_KEYS, auditMap, exception, userInfo));
+
 
     switch (result) {
     case SUCCESS:
       omMetrics.decNumKeys();
       LOG.debug("Key deleted. Volume:{}, Bucket:{}, Key:{}", volumeName,
           bucketName, keyName);

Review comment:
       Might want to log all keys:
   
   ```suggestion
         LOG.debug("Key deleted. Volume:{}, Bucket:{}, Key:{}", volumeName,
             bucketName, deleteKeys);
   ```

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
##########
@@ -116,93 +98,112 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager,
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-    try {
-      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
-        volumeName = deleteKeyArgs.getVolumeName();
-        bucketName = deleteKeyArgs.getBucketName();
-        keyName = deleteKeyArgs.getKeyName();
-        String objectKey = omMetadataManager.getOzoneKey(volumeName, 
bucketName,
-            keyName);
-        OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
-        omKeyInfoList.add(omKeyInfo);
-        unDeletedKeys.add(omKeyInfo);
-      }
 
-      // Check if any of the key in the batch cannot be deleted. If exists the
-      // batch will delete failed.
-      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
-        volumeName = deleteKeyArgs.getVolumeName();
-        bucketName = deleteKeyArgs.getBucketName();
-        keyName = deleteKeyArgs.getKeyName();
-        auditMap = buildKeyArgsAuditMap(deleteKeyArgs);
-        // check Acl
-        checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-            IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
 
-        String objectKey = omMetadataManager.getOzoneKey(
-            volumeName, bucketName, keyName);
+    boolean acquiredLock =
+        omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
 
-        // Validate bucket and volume exists or not.
-        validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+    int indexFailed = 0;
+    int length = deleteKeys.size();
+    OzoneManagerProtocolProtos.DeleteKeyArgs.Builder unDeletedKeys =
+        OzoneManagerProtocolProtos.DeleteKeyArgs.newBuilder()
+            .setVolumeName(volumeName).setBucketName(bucketName);
+
+    boolean deleteStatus = true;
+    try {
 
+      // Validate bucket and volume exists or not.
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+      for (indexFailed = 0; indexFailed < length; indexFailed++) {
+        keyName = deleteKeyArgs.getKeys(indexFailed);
+        String objectKey = omMetadataManager.getOzoneKey(volumeName, 
bucketName,
+            keyName);
         OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
 
         if (omKeyInfo == null) {
-          throw new OMException("Key not found: " + keyName, KEY_NOT_FOUND);
+          deleteStatus = false;
+          LOG.error("Received a request to delete a Key does not exist {}",
+              objectKey);
+          deleteKeys.remove(keyName);
+          unDeletedKeys.addKeys(keyName);
+          continue;
         }
 
-        // Check if this transaction is a replay of ratis logs.
-        if (isReplay(ozoneManager, omKeyInfo, trxnLogIndex)) {
-          // Replay implies the response has already been returned to
-          // the client. So take no further action and return a dummy
-          // OMClientResponse.
-          throw new OMReplayException();
+        try {
+          // check Acl
+          checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
+              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+          omKeyInfoList.add(omKeyInfo);
+        } catch (Exception ex) {
+          deleteStatus = false;
+          LOG.error("Acl check failed for Key: {}", objectKey, ex);
+          deleteKeys.remove(keyName);
+          unDeletedKeys.addKeys(keyName);
         }
       }
 
+      // Mark all keys which can be deleted, in cache as deleted.
+      for (OmKeyInfo omKeyInfo : omKeyInfoList) {
+        omMetadataManager.getKeyTable().addCacheEntry(
+            new CacheKey<>(omMetadataManager.getOzoneKey(volumeName, 
bucketName,
+                omKeyInfo.getKeyName())),
+            new CacheValue<>(Optional.absent(), trxnLogIndex));
+      }
+
       omClientResponse = new OMKeysDeleteResponse(omResponse
-          .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()).build(),
-          omKeyInfoList, trxnLogIndex, ozoneManager.isRatisEnabled());
+          .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()
+              .setStatus(deleteStatus).setUnDeletedKeys(unDeletedKeys))
+          .setStatus(deleteStatus ? OK : PARTIAL_DELETE)
+          .setSuccess(deleteStatus).build(),
+          omKeyInfoList, trxnLogIndex,
+          ozoneManager.isRatisEnabled());
+
       result = Result.SUCCESS;
+
     } catch (IOException ex) {
-      if (ex instanceof OMReplayException) {
-        result = Result.REPLAY;
-        omClientResponse = new OMKeyDeleteResponse(createReplayOMResponse(
-            omResponse));
-      } else {
-        result = Result.FAILURE;
-        exception = ex;
-
-        omClientResponse = new OMKeyDeleteResponse(
-            createOperationKeysErrorOMResponse(omResponse, exception,
-                unDeletedKeys));
+      result = Result.FAILURE;
+      exception = ex;
+      createErrorOMResponse(omResponse, ex);
+
+      // reset deleteKeys as request failed.
+      deleteKeys = new ArrayList<>();
+      // Add all keys which are failed due to any other exception .
+      for (int i = indexFailed; i < length; i++) {
+        unDeletedKeys.addKeys(deleteKeyArgs.getKeys(indexFailed));
       }
 
+      omResponse.setDeleteKeysResponse(DeleteKeysResponse.newBuilder()
+          .setStatus(false).setUnDeletedKeys(unDeletedKeys).build()).build();
+      omClientResponse = new OMKeysDeleteResponse(omResponse.build());
+
     } finally {
+      if (acquiredLock) {
+        omMetadataManager.getLock().releaseWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
       addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
           omDoubleBufferHelper);
     }
 
-    // Performing audit logging outside of the lock.
-    if (result != Result.REPLAY) {
-      auditLog(auditLogger, buildAuditMessage(
-          OMAction.DELETE_KEY, auditMap, exception, userInfo));
-    }
+    auditMap = buildDeleteKeysAuditMap(volumeName, bucketName, deleteKeys,
+        unDeletedKeys.getKeysList());
+
+    auditLog(auditLogger, buildAuditMessage(
+        OMAction.DELETE_KEYS, auditMap, exception, userInfo));
+
 
     switch (result) {
     case SUCCESS:
       omMetrics.decNumKeys();

Review comment:
       Should decrease the number of keys by `deleteKeys.size()` rather than by one, since this request can delete several keys.

##########
File path: 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java
##########
@@ -116,93 +98,112 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager,
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
-    try {
-      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
-        volumeName = deleteKeyArgs.getVolumeName();
-        bucketName = deleteKeyArgs.getBucketName();
-        keyName = deleteKeyArgs.getKeyName();
-        String objectKey = omMetadataManager.getOzoneKey(volumeName, 
bucketName,
-            keyName);
-        OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
-        omKeyInfoList.add(omKeyInfo);
-        unDeletedKeys.add(omKeyInfo);
-      }
 
-      // Check if any of the key in the batch cannot be deleted. If exists the
-      // batch will delete failed.
-      for (KeyArgs deleteKeyArgs : deleteKeyArgsList) {
-        volumeName = deleteKeyArgs.getVolumeName();
-        bucketName = deleteKeyArgs.getBucketName();
-        keyName = deleteKeyArgs.getKeyName();
-        auditMap = buildKeyArgsAuditMap(deleteKeyArgs);
-        // check Acl
-        checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
-            IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
 
-        String objectKey = omMetadataManager.getOzoneKey(
-            volumeName, bucketName, keyName);
+    boolean acquiredLock =
+        omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
+            bucketName);
 
-        // Validate bucket and volume exists or not.
-        validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+    int indexFailed = 0;
+    int length = deleteKeys.size();
+    OzoneManagerProtocolProtos.DeleteKeyArgs.Builder unDeletedKeys =
+        OzoneManagerProtocolProtos.DeleteKeyArgs.newBuilder()
+            .setVolumeName(volumeName).setBucketName(bucketName);
+
+    boolean deleteStatus = true;
+    try {
 
+      // Validate bucket and volume exists or not.
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+      for (indexFailed = 0; indexFailed < length; indexFailed++) {
+        keyName = deleteKeyArgs.getKeys(indexFailed);
+        String objectKey = omMetadataManager.getOzoneKey(volumeName, 
bucketName,
+            keyName);
         OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(objectKey);
 
         if (omKeyInfo == null) {
-          throw new OMException("Key not found: " + keyName, KEY_NOT_FOUND);
+          deleteStatus = false;
+          LOG.error("Received a request to delete a Key does not exist {}",
+              objectKey);
+          deleteKeys.remove(keyName);
+          unDeletedKeys.addKeys(keyName);
+          continue;
         }
 
-        // Check if this transaction is a replay of ratis logs.
-        if (isReplay(ozoneManager, omKeyInfo, trxnLogIndex)) {
-          // Replay implies the response has already been returned to
-          // the client. So take no further action and return a dummy
-          // OMClientResponse.
-          throw new OMReplayException();
+        try {
+          // check Acl
+          checkKeyAcls(ozoneManager, volumeName, bucketName, keyName,
+              IAccessAuthorizer.ACLType.DELETE, OzoneObj.ResourceType.KEY);
+          omKeyInfoList.add(omKeyInfo);
+        } catch (Exception ex) {
+          deleteStatus = false;
+          LOG.error("Acl check failed for Key: {}", objectKey, ex);
+          deleteKeys.remove(keyName);
+          unDeletedKeys.addKeys(keyName);
         }
       }
 
+      // Mark all keys which can be deleted, in cache as deleted.
+      for (OmKeyInfo omKeyInfo : omKeyInfoList) {
+        omMetadataManager.getKeyTable().addCacheEntry(
+            new CacheKey<>(omMetadataManager.getOzoneKey(volumeName, 
bucketName,
+                omKeyInfo.getKeyName())),
+            new CacheValue<>(Optional.absent(), trxnLogIndex));
+      }
+
       omClientResponse = new OMKeysDeleteResponse(omResponse
-          .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()).build(),
-          omKeyInfoList, trxnLogIndex, ozoneManager.isRatisEnabled());
+          .setDeleteKeysResponse(DeleteKeysResponse.newBuilder()
+              .setStatus(deleteStatus).setUnDeletedKeys(unDeletedKeys))
+          .setStatus(deleteStatus ? OK : PARTIAL_DELETE)
+          .setSuccess(deleteStatus).build(),
+          omKeyInfoList, trxnLogIndex,
+          ozoneManager.isRatisEnabled());
+
       result = Result.SUCCESS;
+
     } catch (IOException ex) {
-      if (ex instanceof OMReplayException) {
-        result = Result.REPLAY;
-        omClientResponse = new OMKeyDeleteResponse(createReplayOMResponse(
-            omResponse));
-      } else {
-        result = Result.FAILURE;
-        exception = ex;
-
-        omClientResponse = new OMKeyDeleteResponse(
-            createOperationKeysErrorOMResponse(omResponse, exception,
-                unDeletedKeys));
+      result = Result.FAILURE;
+      exception = ex;
+      createErrorOMResponse(omResponse, ex);
+
+      // reset deleteKeys as request failed.
+      deleteKeys = new ArrayList<>();
+      // Add all keys which are failed due to any other exception .
+      for (int i = indexFailed; i < length; i++) {
+        unDeletedKeys.addKeys(deleteKeyArgs.getKeys(indexFailed));

Review comment:
       Should not repeatedly add the same key (the loop body indexes with `indexFailed` instead of the loop variable `i`):
   
   ```suggestion
         for (int i = indexFailed; i < length; i++) {
           unDeletedKeys.addKeys(deleteKeyArgs.getKeys(i));
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: ozone-issues-h...@hadoop.apache.org

Reply via email to