This is an automated email from the ASF dual-hosted git repository.
hemant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b11b80707d HDDS-9039. Removed the pause and wait in RocksDB compaction
when tarball creation is in progress (#6552)
b11b80707d is described below
commit b11b80707d4e6721c06966936ad87063ec107da6
Author: Hemant Kumar <[email protected]>
AuthorDate: Mon May 13 15:00:15 2024 -0700
HDDS-9039. Removed the pause and wait in RocksDB compaction when tarball
creation is in progress (#6552)
---
.../ozone/rocksdiff/RocksDBCheckpointDiffer.java | 39 --------------------
.../hadoop/ozone/om/TestOMDbCheckpointServlet.java | 28 +--------------
.../hadoop/ozone/om/OMDBCheckpointServlet.java | 41 ++++++----------------
3 files changed, 12 insertions(+), 96 deletions(-)
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index fef6f05ae0..08a013fc7c 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -30,7 +30,6 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.collections.CollectionUtils;
@@ -170,7 +169,6 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
= new BootstrapStateHandler.Lock();
private ColumnFamilyHandle snapshotInfoTableCFHandle;
- private final AtomicInteger tarballRequestCount;
private static final String DAG_PRUNING_SERVICE_NAME =
"CompactionDagPruningService";
private AtomicBoolean suspended;
@@ -247,7 +245,6 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
} else {
this.scheduler = null;
}
- this.tarballRequestCount = new AtomicInteger(0);
}
private String createCompactionLogDir(String metadataDirName,
@@ -517,8 +514,6 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
return;
}
- waitForTarballCreation();
-
// Add the compaction log entry to Compaction log table.
addToCompactionLogTable(compactionLogEntry);
@@ -559,22 +554,6 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
}
}
- /**
- * Check if there is any in_progress tarball creation request and wait till
- * all tarball creation finish, and it gets notified.
- */
- private void waitForTarballCreation() {
- while (tarballRequestCount.get() != 0) {
- try {
- wait(Integer.MAX_VALUE);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.error("Compaction log thread {} is interrupted.",
- Thread.currentThread().getName());
- }
- }
- }
-
/**
* Creates a hard link between provided link and source.
* It doesn't throw any exception if {@link Files#createLink} throws
@@ -1424,28 +1403,10 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
}
}
- public void incrementTarballRequestCount() {
- tarballRequestCount.incrementAndGet();
- }
-
- public void decrementTarballRequestCountAndNotify() {
- // Synchronized block is used to ensure that lock is on the same instance notifyAll is being called.
- synchronized (this) {
- tarballRequestCount.decrementAndGet();
- // Notify compaction threads to continue.
- notifyAll();
- }
- }
-
public boolean shouldRun() {
return !suspended.get();
}
- @VisibleForTesting
- public int getTarballRequestCount() {
- return tarballRequestCount.get();
- }
-
@VisibleForTesting
public boolean debugEnabled(Integer level) {
return DEBUG_LEVEL.contains(level);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
index 68ed3536a6..f0f4744e8c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
@@ -414,13 +414,8 @@ public class TestOMDbCheckpointServlet {
Path expectedLog = Paths.get(compactionLogDir, "expected" +
COMPACTION_LOG_FILE_NAME_SUFFIX);
String expectedLogStr = truncateFileName(metaDirLength, expectedLog);
- Path unExpectedLog = Paths.get(compactionLogDir, "unexpected" +
- COMPACTION_LOG_FILE_NAME_SUFFIX);
- String unExpectedLogStr = truncateFileName(metaDirLength, unExpectedLog);
Path expectedSst = Paths.get(sstBackupDir, "expected.sst");
String expectedSstStr = truncateFileName(metaDirLength, expectedSst);
- Path unExpectedSst = Paths.get(sstBackupDir, "unexpected.sst");
- String unExpectedSstStr = truncateFileName(metaDirLength, unExpectedSst);
// put "expected" fabricated files onto the fs before the files get
// copied to the temp dir.
@@ -436,15 +431,6 @@ public class TestOMDbCheckpointServlet {
// with the snapshot data.
doNothing().when(checkpoint).cleanupCheckpoint();
realCheckpoint.set(checkpoint);
-
- // put "unexpected" fabricated files onto the fs after the files
- // get copied to the temp dir. Since these appear in the "real"
- // dir after the copy, they shouldn't exist in the final file
- // set. That will show that the copy only happened from the temp dir.
- Files.write(unExpectedLog,
- "fabricatedData".getBytes(StandardCharsets.UTF_8));
- Files.write(unExpectedSst,
- "fabricatedData".getBytes(StandardCharsets.UTF_8));
return checkpoint;
});
@@ -460,10 +446,6 @@ public class TestOMDbCheckpointServlet {
long tmpHardLinkFileCount = tmpHardLinkFileCount();
omDbCheckpointServletMock.doGet(requestMock, responseMock);
assertEquals(tmpHardLinkFileCount, tmpHardLinkFileCount());
-
- // Verify that tarball request count reaches to zero once doGet completes.
- assertEquals(0,
- dbStore.getRocksDBCheckpointDiffer().getTarballRequestCount());
dbCheckpoint = realCheckpoint.get();
// Untar the file into a temp folder to be examined.
@@ -528,15 +510,7 @@ public class TestOMDbCheckpointServlet {
getFiles(Paths.get(metaDir.toString(), OM_SNAPSHOT_DIR),
metaDirLength);
assertThat(finalFullSet).contains(expectedLogStr);
assertThat(finalFullSet).contains(expectedSstStr);
- assertThat(initialFullSet).contains(unExpectedLogStr);
- assertThat(initialFullSet).contains(unExpectedSstStr);
-
- // Remove the dummy files that should not have been copied over
- // from the expected data.
- initialFullSet.remove(unExpectedLogStr);
- initialFullSet.remove(unExpectedSstStr);
- assertEquals(initialFullSet, finalFullSet,
- "expected snapshot files not found");
+ assertEquals(initialFullSet, finalFullSet, "expected snapshot files not
found");
}
private static long tmpHardLinkFileCount() throws IOException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index 6d2832c913..c8237b7967 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -222,43 +222,24 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
}
/**
- * Pauses rocksdb compaction threads while creating copies of
- * compaction logs and hard links of sst backups.
+ * Copies compaction logs and hard links of sst backups to tmpDir.
* @param tmpdir - Place to create copies/links
* @param flush - Whether to flush the db or not.
* @return Checkpoint containing snapshot entries expected.
*/
@Override
- public DBCheckpoint getCheckpoint(Path tmpdir, boolean flush)
- throws IOException {
- DBCheckpoint checkpoint;
-
+ public DBCheckpoint getCheckpoint(Path tmpdir, boolean flush) throws
IOException {
// make tmp directories to contain the copies
RocksDBCheckpointDiffer differ = getDbStore().getRocksDBCheckpointDiffer();
- DirectoryData sstBackupDir = new DirectoryData(tmpdir,
- differ.getSSTBackupDir());
- DirectoryData compactionLogDir = new DirectoryData(tmpdir,
- differ.getCompactionLogDir());
-
- long startTime = System.currentTimeMillis();
- long pauseCounter = PAUSE_COUNTER.incrementAndGet();
-
- try {
- LOG.info("Compaction pausing {} started.", pauseCounter);
- // Pause compactions, Copy/link files and get checkpoint.
- differ.incrementTarballRequestCount();
- FileUtils.copyDirectory(compactionLogDir.getOriginalDir(),
- compactionLogDir.getTmpDir());
- OmSnapshotUtils.linkFiles(sstBackupDir.getOriginalDir(),
- sstBackupDir.getTmpDir());
- checkpoint = getDbStore().getCheckpoint(flush);
- } finally {
- // Unpause the compaction threads.
- differ.decrementTarballRequestCountAndNotify();
- long elapsedTime = System.currentTimeMillis() - startTime;
- LOG.info("Compaction pausing {} ended. Elapsed ms: {}", pauseCounter,
elapsedTime);
- }
- return checkpoint;
+ DirectoryData sstBackupDir = new DirectoryData(tmpdir,
differ.getSSTBackupDir());
+ DirectoryData compactionLogDir = new DirectoryData(tmpdir,
differ.getCompactionLogDir());
+
+ // Create checkpoint and then copy the files so that it has all the compaction entries and files.
+ DBCheckpoint dbCheckpoint = getDbStore().getCheckpoint(flush);
+ FileUtils.copyDirectory(compactionLogDir.getOriginalDir(),
compactionLogDir.getTmpDir());
+ OmSnapshotUtils.linkFiles(sstBackupDir.getOriginalDir(),
sstBackupDir.getTmpDir());
+
+ return dbCheckpoint;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]