smengcl commented on code in PR #4486:
URL: https://github.com/apache/ozone/pull/4486#discussion_r1151182240


##########
hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java:
##########
@@ -544,4 +544,10 @@ String getMultipartKey(long volumeId, long bucketId,
    */
   long getBucketId(String volume, String bucket) throws IOException;
 
+  /**
+   * Returns the list of {@link BlockGroup} for a key in the deletedTable.
+   * @param deletedKey - key to be purged from the deletedTable
+   * @return list of {@link BlockGroup}
+   */
+  List<BlockGroup> getBlocksForKeyDelete(String deletedKey) throws IOException;

Review Comment:
   Do we need another param to limit the return list size?
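
   For illustration only, a size-limited variant might look like the sketch below. The `limit` parameter, the stub `BlockGroup`, and the in-memory table are hypothetical stand-ins, not the actual Ozone types; the real implementation would iterate the RocksDB deletedTable instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stub standing in for org.apache.hadoop.ozone.common.BlockGroup.
class BlockGroup {
  final String keyName;
  BlockGroup(String keyName) { this.keyName = keyName; }
}

// Hypothetical size-limited variant of getBlocksForKeyDelete.
interface DeletedTableReader {
  /** Returns at most {@code limit} BlockGroups matching {@code prefix}. */
  List<BlockGroup> getBlocksForKeyDelete(String prefix, int limit);
}

class InMemoryDeletedTable implements DeletedTableReader {
  // Sorted map mimicking the RocksDB deletedTable key ordering.
  private final TreeMap<String, BlockGroup> deletedTable = new TreeMap<>();

  void put(String key) { deletedTable.put(key, new BlockGroup(key)); }

  @Override
  public List<BlockGroup> getBlocksForKeyDelete(String prefix, int limit) {
    List<BlockGroup> result = new ArrayList<>();
    // tailMap starts the scan at the prefix; stop once keys no longer
    // match it or the caller-supplied limit is reached.
    for (Map.Entry<String, BlockGroup> e
        : deletedTable.tailMap(prefix).entrySet()) {
      if (!e.getKey().startsWith(prefix) || result.size() >= limit) {
        break;
      }
      result.add(e.getValue());
    }
    return result;
  }
}

public class LimitSketch {
  public static void main(String[] args) {
    InMemoryDeletedTable table = new InMemoryDeletedTable();
    for (int i = 0; i < 10; i++) {
      table.put("/vol/bucket/key" + i);
    }
    System.out.println(table.getBlocksForKeyDelete("/vol/bucket/", 3).size());
    // prints 3
  }
}
```

   A bounded result would keep a single purge Ratis request from growing without limit when a key has many deleted versions.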



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java:
##########
@@ -64,10 +74,27 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
       }
     }
 
-    omClientResponse = new OMKeyPurgeResponse(omResponse.build(),
-          keysToBePurgedList);
-    addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
-        omDoubleBufferHelper);
+    try {
+      OmSnapshot omFromSnapshot = null;
+      if (fromSnapshot != null) {
+        SnapshotInfo snapshotInfo =
+            ozoneManager.getMetadataManager().getSnapshotInfoTable()
+                .get(fromSnapshot);
+        omFromSnapshot = (OmSnapshot) omSnapshotManager
+            .checkForSnapshot(snapshotInfo.getVolumeName(),
+                snapshotInfo.getBucketName(),
+                getSnapshotPrefix(snapshotInfo.getName()));
+      }
+

Review Comment:
   Shall we move this "check and get" to `OMKeyPurgeResponse` instead?
   
   i.e. What could go wrong if we just passed `fromSnapshot` to the `OMKeyPurgeResponse` constructor (rather than `omFromSnapshot`)?
   
   Here in `validateAndUpdateCache` we could just sanity-check the snapshot existence.
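
   One way to read this suggestion, sketched below with purely illustrative stand-in types (`SnapshotResolver`, `SnapshotStore`, `PurgeResponse` are not the actual Ozone classes): the response holds only the raw snapshot table key and resolves the snapshot DB at batch-apply time.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Stand-in for the snapshot's metadata store (OmSnapshot in the PR).
class SnapshotStore {
  final String name;
  SnapshotStore(String name) { this.name = name; }
}

// Stand-in for the snapshot lookup done via OmSnapshotManager.
class SnapshotResolver {
  private final Map<String, SnapshotStore> cache = new HashMap<>();
  void register(String tableKey) {
    cache.put(tableKey, new SnapshotStore(tableKey));
  }
  boolean exists(String tableKey) { return cache.containsKey(tableKey); }
  SnapshotStore resolve(String tableKey) throws IOException {
    SnapshotStore s = cache.get(tableKey);
    if (s == null) {
      throw new IOException("Snapshot not found: " + tableKey);
    }
    return s;
  }
}

// Stand-in for OMKeyPurgeResponse holding the raw table key.
class PurgeResponse {
  private final String fromSnapshotTableKey; // null means active DB

  PurgeResponse(String fromSnapshotTableKey) {
    this.fromSnapshotTableKey = fromSnapshotTableKey;
  }

  // Stand-in for addToDBBatch: resolution happens here, at
  // double-buffer flush time, not in validateAndUpdateCache.
  String addToDBBatch(SnapshotResolver resolver) throws IOException {
    if (fromSnapshotTableKey != null) {
      SnapshotStore store = resolver.resolve(fromSnapshotTableKey);
      return "purged in snapshot DB " + store.name;
    }
    return "purged in active DB";
  }
}
```

   Under this shape, `validateAndUpdateCache` would only call something like `resolver.exists(key)` as a sanity check; the trade-off is that a resolution failure would surface at flush time rather than at request-validation time.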



##########
hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto:
##########
@@ -1200,6 +1200,7 @@ message DeletedKeys {
 
 message PurgeKeysRequest {
     repeated DeletedKeys deletedKeys = 1;
+    optional string fromSnapshot = 2;

Review Comment:
   `fromSnapshot` sounds too ambiguous to me. At first I thought it was related to SnapDiff.
   
   ```suggestion
       // if not null, will purge keys in a snapshot DB instead of active
       optional string snapshotTableKey = 2;
   ```
   
   And we definitely need comments on the behavior when `snapshotTableKey` is null vs. non-null.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshot.java:
##########
@@ -82,6 +83,7 @@ public OmSnapshot(KeyManager keyManager,
     this.bucketName = bucketName;
     this.volumeName = volumeName;
     this.omMetadataManager = keyManager.getMetadataManager();
+    this.keyManager = keyManager;

Review Comment:
   nit: order
   ```suggestion
       this.keyManager = keyManager;
       this.omMetadataManager = keyManager.getMetadataManager();
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java:
##########
@@ -64,10 +74,27 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
       }
     }
 
-    omClientResponse = new OMKeyPurgeResponse(omResponse.build(),
-          keysToBePurgedList);
-    addResponseToDoubleBuffer(trxnLogIndex, omClientResponse,
-        omDoubleBufferHelper);
+    try {
+      OmSnapshot omFromSnapshot = null;
+      if (fromSnapshot != null) {
+        SnapshotInfo snapshotInfo =
+            ozoneManager.getMetadataManager().getSnapshotInfoTable()
+                .get(fromSnapshot);
+        omFromSnapshot = (OmSnapshot) omSnapshotManager
+            .checkForSnapshot(snapshotInfo.getVolumeName(),
+                snapshotInfo.getBucketName(),
+                getSnapshotPrefix(snapshotInfo.getName()));
+      }
+
+      omClientResponse = new OMKeyPurgeResponse(omResponse.build(),
+          keysToBePurgedList, omFromSnapshot);
+    }  catch (IOException ex) {

Review Comment:
   ```suggestion
       } catch (IOException ex) {
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java:
##########
@@ -36,19 +38,44 @@
 @CleanupTableInfo(cleanupTables = {DELETED_TABLE})
 public class OMKeyPurgeResponse extends OmKeyResponse {
   private List<String> purgeKeyList;
+  private OmSnapshot fromSnapshot;
 
   public OMKeyPurgeResponse(@Nonnull OMResponse omResponse,
-      @Nonnull List<String> keyList) {
+      @Nonnull List<String> keyList, OmSnapshot fromSnapshot) {
     super(omResponse);
     this.purgeKeyList = keyList;
+    this.fromSnapshot = fromSnapshot;
+  }
+
+  /**
+   * For when the request is not successful.
+   * For a successful request, the other constructor should be used.
+   */
+  public OMKeyPurgeResponse(@Nonnull OMResponse omResponse) {
+    super(omResponse);
+    checkStatusNotOK();
   }
 
   @Override
   public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
 
+    if (fromSnapshot != null) {
+      DBStore fromSnapshotStore = fromSnapshot.getMetadataManager().getStore();
+      // Init Batch Operation for snapshot db.
+      try (BatchOperation writeBatch = fromSnapshotStore.initBatchOperation()) {
+        processKeys(writeBatch, fromSnapshot.getMetadataManager());
+        fromSnapshotStore.commitBatchOperation(writeBatch);
+      }
+    } else {
+      processKeys(batchOperation, omMetadataManager);
+    }
+  }
+
+  public void processKeys(BatchOperation batchOp,

Review Comment:
   ```suggestion
     private void processKeys(BatchOperation batchOp,
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java:
##########
@@ -344,7 +344,7 @@ protected OmMetadataManagerImpl() {
      File checkpoint = Paths.get(metaDir.toPath().toString(), dbName).toFile();
       RDBCheckpointManager.waitForCheckpointDirectoryExist(checkpoint);
     }
-    setStore(loadDB(conf, metaDir, dbName, true,
+    setStore(loadDB(conf, metaDir, dbName, false,
             java.util.Optional.of(Boolean.TRUE)));

Review Comment:
   This should be fine, since we have already disabled auto compaction on the CFs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

