This is an automated email from the ASF dual-hosted git repository.

hemant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b11b80707d HDDS-9039. Removed the pause and wait in RocksDB compaction 
when tarball creation is in progress (#6552)
b11b80707d is described below

commit b11b80707d4e6721c06966936ad87063ec107da6
Author: Hemant Kumar <[email protected]>
AuthorDate: Mon May 13 15:00:15 2024 -0700

    HDDS-9039. Removed the pause and wait in RocksDB compaction when tarball 
creation is in progress (#6552)
---
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   | 39 --------------------
 .../hadoop/ozone/om/TestOMDbCheckpointServlet.java | 28 +--------------
 .../hadoop/ozone/om/OMDBCheckpointServlet.java     | 41 ++++++----------------
 3 files changed, 12 insertions(+), 96 deletions(-)

diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index fef6f05ae0..08a013fc7c 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -30,7 +30,6 @@ import java.util.Objects;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.collections.CollectionUtils;
@@ -170,7 +169,6 @@ public class RocksDBCheckpointDiffer implements 
AutoCloseable,
       = new BootstrapStateHandler.Lock();
 
   private ColumnFamilyHandle snapshotInfoTableCFHandle;
-  private final AtomicInteger tarballRequestCount;
   private static final String DAG_PRUNING_SERVICE_NAME = 
"CompactionDagPruningService";
   private AtomicBoolean suspended;
 
@@ -247,7 +245,6 @@ public class RocksDBCheckpointDiffer implements 
AutoCloseable,
     } else {
       this.scheduler = null;
     }
-    this.tarballRequestCount = new AtomicInteger(0);
   }
 
   private String createCompactionLogDir(String metadataDirName,
@@ -517,8 +514,6 @@ public class RocksDBCheckpointDiffer implements 
AutoCloseable,
             return;
           }
 
-          waitForTarballCreation();
-
           // Add the compaction log entry to Compaction log table.
           addToCompactionLogTable(compactionLogEntry);
 
@@ -559,22 +554,6 @@ public class RocksDBCheckpointDiffer implements 
AutoCloseable,
     }
   }
 
-  /**
-   * Check if there is any in_progress tarball creation request and wait till
-   * all tarball creation finish, and it gets notified.
-   */
-  private void waitForTarballCreation() {
-    while (tarballRequestCount.get() != 0) {
-      try {
-        wait(Integer.MAX_VALUE);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOG.error("Compaction log thread {} is interrupted.",
-            Thread.currentThread().getName());
-      }
-    }
-  }
-
   /**
    * Creates a hard link between provided link and source.
    * It doesn't throw any exception if {@link Files#createLink} throws
@@ -1424,28 +1403,10 @@ public class RocksDBCheckpointDiffer implements 
AutoCloseable,
     }
   }
 
-  public void incrementTarballRequestCount() {
-    tarballRequestCount.incrementAndGet();
-  }
-
-  public void decrementTarballRequestCountAndNotify() {
-    // Synchronized block is used to ensure that lock is on the same instance 
notifyAll is being called.
-    synchronized (this) {
-      tarballRequestCount.decrementAndGet();
-      // Notify compaction threads to continue.
-      notifyAll();
-    }
-  }
-
   public boolean shouldRun() {
     return !suspended.get();
   }
 
-  @VisibleForTesting
-  public int getTarballRequestCount() {
-    return tarballRequestCount.get();
-  }
-
   @VisibleForTesting
   public boolean debugEnabled(Integer level) {
     return DEBUG_LEVEL.contains(level);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
index 68ed3536a6..f0f4744e8c 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
@@ -414,13 +414,8 @@ public class TestOMDbCheckpointServlet {
     Path expectedLog = Paths.get(compactionLogDir, "expected" +
         COMPACTION_LOG_FILE_NAME_SUFFIX);
     String expectedLogStr = truncateFileName(metaDirLength, expectedLog);
-    Path unExpectedLog = Paths.get(compactionLogDir, "unexpected" +
-        COMPACTION_LOG_FILE_NAME_SUFFIX);
-    String unExpectedLogStr = truncateFileName(metaDirLength, unExpectedLog);
     Path expectedSst = Paths.get(sstBackupDir, "expected.sst");
     String expectedSstStr = truncateFileName(metaDirLength, expectedSst);
-    Path unExpectedSst = Paths.get(sstBackupDir, "unexpected.sst");
-    String unExpectedSstStr = truncateFileName(metaDirLength, unExpectedSst);
 
     // put "expected" fabricated files onto the fs before the files get
     //  copied to the temp dir.
@@ -436,15 +431,6 @@ public class TestOMDbCheckpointServlet {
       // with the snapshot data.
       doNothing().when(checkpoint).cleanupCheckpoint();
       realCheckpoint.set(checkpoint);
-
-      // put "unexpected" fabricated files onto the fs after the files
-      // get copied to the temp dir.  Since these appear in the "real"
-      // dir after the copy, they shouldn't exist in the final file
-      // set.  That will show that the copy only happened from the temp dir.
-      Files.write(unExpectedLog,
-          "fabricatedData".getBytes(StandardCharsets.UTF_8));
-      Files.write(unExpectedSst,
-          "fabricatedData".getBytes(StandardCharsets.UTF_8));
       return checkpoint;
     });
 
@@ -460,10 +446,6 @@ public class TestOMDbCheckpointServlet {
     long tmpHardLinkFileCount = tmpHardLinkFileCount();
     omDbCheckpointServletMock.doGet(requestMock, responseMock);
     assertEquals(tmpHardLinkFileCount, tmpHardLinkFileCount());
-
-    // Verify that tarball request count reaches to zero once doGet completes.
-    assertEquals(0,
-        dbStore.getRocksDBCheckpointDiffer().getTarballRequestCount());
     dbCheckpoint = realCheckpoint.get();
 
     // Untar the file into a temp folder to be examined.
@@ -528,15 +510,7 @@ public class TestOMDbCheckpointServlet {
         getFiles(Paths.get(metaDir.toString(), OM_SNAPSHOT_DIR), 
metaDirLength);
     assertThat(finalFullSet).contains(expectedLogStr);
     assertThat(finalFullSet).contains(expectedSstStr);
-    assertThat(initialFullSet).contains(unExpectedLogStr);
-    assertThat(initialFullSet).contains(unExpectedSstStr);
-
-    // Remove the dummy files that should not have been copied over
-    // from the expected data.
-    initialFullSet.remove(unExpectedLogStr);
-    initialFullSet.remove(unExpectedSstStr);
-    assertEquals(initialFullSet, finalFullSet,
-        "expected snapshot files not found");
+    assertEquals(initialFullSet, finalFullSet, "expected snapshot files not 
found");
   }
 
   private static long tmpHardLinkFileCount() throws IOException {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index 6d2832c913..c8237b7967 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -222,43 +222,24 @@ public class OMDBCheckpointServlet extends 
DBCheckpointServlet {
   }
 
   /**
-   * Pauses rocksdb compaction threads while creating copies of
-   * compaction logs and hard links of sst backups.
+   * Copies compaction logs and hard links of sst backups to tmpDir.
    * @param  tmpdir - Place to create copies/links
    * @param  flush -  Whether to flush the db or not.
    * @return Checkpoint containing snapshot entries expected.
    */
   @Override
-  public DBCheckpoint getCheckpoint(Path tmpdir, boolean flush)
-      throws IOException {
-    DBCheckpoint checkpoint;
-
+  public DBCheckpoint getCheckpoint(Path tmpdir, boolean flush) throws 
IOException {
     // make tmp directories to contain the copies
     RocksDBCheckpointDiffer differ = getDbStore().getRocksDBCheckpointDiffer();
-    DirectoryData sstBackupDir = new DirectoryData(tmpdir,
-        differ.getSSTBackupDir());
-    DirectoryData compactionLogDir = new DirectoryData(tmpdir,
-        differ.getCompactionLogDir());
-
-    long startTime = System.currentTimeMillis();
-    long pauseCounter = PAUSE_COUNTER.incrementAndGet();
-
-    try {
-      LOG.info("Compaction pausing {} started.", pauseCounter);
-      // Pause compactions, Copy/link files and get checkpoint.
-      differ.incrementTarballRequestCount();
-      FileUtils.copyDirectory(compactionLogDir.getOriginalDir(),
-          compactionLogDir.getTmpDir());
-      OmSnapshotUtils.linkFiles(sstBackupDir.getOriginalDir(),
-          sstBackupDir.getTmpDir());
-      checkpoint = getDbStore().getCheckpoint(flush);
-    } finally {
-      // Unpause the compaction threads.
-      differ.decrementTarballRequestCountAndNotify();
-      long elapsedTime = System.currentTimeMillis() - startTime;
-      LOG.info("Compaction pausing {} ended. Elapsed ms: {}", pauseCounter, 
elapsedTime);
-    }
-    return checkpoint;
+    DirectoryData sstBackupDir = new DirectoryData(tmpdir, 
differ.getSSTBackupDir());
+    DirectoryData compactionLogDir = new DirectoryData(tmpdir, 
differ.getCompactionLogDir());
+
+    // Create checkpoint and then copy the files so that it has all the 
compaction entries and files.
+    DBCheckpoint dbCheckpoint = getDbStore().getCheckpoint(flush);
+    FileUtils.copyDirectory(compactionLogDir.getOriginalDir(), 
compactionLogDir.getTmpDir());
+    OmSnapshotUtils.linkFiles(sstBackupDir.getOriginalDir(), 
sstBackupDir.getTmpDir());
+
+    return dbCheckpoint;
   }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to