This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 06f1dd0220 IGNITE-22878 Add logging for storage compaction events
(#4224)
06f1dd0220 is described below
commit 06f1dd02200d120d1edd982f010d487849608c29
Author: Ivan Bessonov <[email protected]>
AuthorDate: Mon Aug 26 13:01:38 2024 +0300
IGNITE-22878 Add logging for storage compaction events (#4224)
---
.../MetaStorageConfigurationSchema.java | 2 +-
.../persistence/compaction/Compactor.java | 16 ++++++
.../rocksdb/LoggingRocksDbFlushListener.java | 63 ++++++++++++++++++----
3 files changed, 70 insertions(+), 11 deletions(-)
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java
index 9cec02766d..dcea1a058c 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java
@@ -35,5 +35,5 @@ public class MetaStorageConfigurationSchema {
*/
@Value(hasDefault = true)
@Range(min = 1)
- public long idleSyncTimeInterval = 500;
+ public long idleSyncTimeInterval = 250;
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
index 928e1a92de..b6482aa111 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.io.PageIo;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import
org.apache.ignite.internal.pagememory.persistence.PartitionProcessingCounterMap;
@@ -65,6 +66,9 @@ import org.jetbrains.annotations.Nullable;
* </ul>
*/
public class Compactor extends IgniteWorker {
+ /** Logger. */
+ private static final IgniteLogger LOG = Loggers.forClass(Compactor.class);
+
private final Object mux = new Object();
private final @Nullable ThreadPoolExecutor threadPoolExecutor;
@@ -216,6 +220,13 @@ public class Compactor extends IgniteWorker {
break;
}
+ // TODO Expand the comment.
https://issues.apache.org/jira/browse/IGNITE-23056
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Starting new compaction round [files={}]",
queue.size());
+ }
+
+ long start = System.nanoTime();
+
updateHeartbeat();
int threads = threadPoolExecutor == null ? 1 :
threadPoolExecutor.getMaximumPoolSize();
@@ -268,6 +279,11 @@ public class Compactor extends IgniteWorker {
// Wait and check for errors.
CompletableFuture.allOf(futures).join();
+
+ // TODO Expand the comment.
https://issues.apache.org/jira/browse/IGNITE-23056, handle an exception too.
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Compaction round finished [duration={}ms]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+ }
}
}
diff --git
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/LoggingRocksDbFlushListener.java
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/LoggingRocksDbFlushListener.java
index 5ae0bd6d66..118a3e5f2c 100644
---
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/LoggingRocksDbFlushListener.java
+++
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/LoggingRocksDbFlushListener.java
@@ -17,14 +17,19 @@
package org.apache.ignite.internal.rocksdb;
+import static
org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_COMPACTION_BEGIN;
+import static
org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_COMPACTION_COMPLETED;
import static
org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_BEGIN;
import static
org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_COMPLETED;
+import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.rocksdb.AbstractEventListener;
+import org.rocksdb.CompactionJobInfo;
import org.rocksdb.FlushJobInfo;
import org.rocksdb.RocksDB;
@@ -39,31 +44,39 @@ public class LoggingRocksDbFlushListener extends
AbstractEventListener {
private final String name;
/**
- * Type of last processed event. Real amount of events doesn't matter in
atomic flush mode. All "completed" events go after all "begin"
- * events, and vice versa.
+ * Type of last processed flush event. Real amount of events doesn't
matter in atomic flush mode. All "completed" events go after all
+ * "begin" events, and vice versa.
*/
- private final AtomicReference<EnabledEventCallback> lastEventType = new
AtomicReference<>(ON_FLUSH_COMPLETED);
+ private final AtomicReference<EnabledEventCallback> lastFlushEventType =
new AtomicReference<>(ON_FLUSH_COMPLETED);
+
+ /** Type of last processed compaction event. */
+ private final AtomicReference<EnabledEventCallback>
lastCompactionEventType = new AtomicReference<>(ON_COMPACTION_COMPLETED);
/** This field is used for determining flush duration. */
private volatile long lastFlushStartTimeNanos;
+ /** This field is used for determining compaction duration. */
+ private volatile long lastCompactionStartTimeNanos;
+
/**
* Constructor.
*
* @param name Listener name, for logs.
*/
public LoggingRocksDbFlushListener(String name) {
- super(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED);
+ super(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED, ON_COMPACTION_BEGIN,
ON_COMPACTION_COMPLETED);
this.name = name;
}
@Override
public void onFlushBegin(RocksDB db, FlushJobInfo flushJobInfo) {
- if (lastEventType.compareAndSet(ON_FLUSH_COMPLETED, ON_FLUSH_BEGIN)) {
- LOG.info("Starting rocksdb flush process [name='{}', reason={}]",
name, flushJobInfo.getFlushReason());
+ if (lastFlushEventType.compareAndSet(ON_FLUSH_COMPLETED,
ON_FLUSH_BEGIN)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Starting rocksdb flush process [name='{}',
reason={}]", name, flushJobInfo.getFlushReason());
- lastFlushStartTimeNanos = System.nanoTime();
+ lastFlushStartTimeNanos = System.nanoTime();
+ }
onFlushBeginCallback(db, flushJobInfo);
}
@@ -71,10 +84,12 @@ public class LoggingRocksDbFlushListener extends
AbstractEventListener {
@Override
public void onFlushCompleted(RocksDB db, FlushJobInfo flushJobInfo) {
- if (lastEventType.compareAndSet(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED)) {
- long duration = System.nanoTime() - lastFlushStartTimeNanos;
+ if (lastFlushEventType.compareAndSet(ON_FLUSH_BEGIN,
ON_FLUSH_COMPLETED)) {
+ if (LOG.isInfoEnabled()) {
+ long duration = System.nanoTime() - lastFlushStartTimeNanos;
- LOG.info("Finishing rocksdb flush process [name='{}',
duration={}ms]", name, TimeUnit.NANOSECONDS.toMillis(duration));
+ LOG.info("Finishing rocksdb flush process [name='{}',
duration={}ms]", name, TimeUnit.NANOSECONDS.toMillis(duration));
+ }
onFlushCompletedCallback(db, flushJobInfo);
}
@@ -87,4 +102,32 @@ public class LoggingRocksDbFlushListener extends
AbstractEventListener {
protected void onFlushCompletedCallback(RocksDB db, FlushJobInfo
flushJobInfo) {
// No-op.
}
+
+ @Override
+ public void onCompactionBegin(RocksDB db, CompactionJobInfo
compactionJobInfo) {
+ if (lastCompactionEventType.compareAndSet(ON_COMPACTION_COMPLETED,
ON_COMPACTION_BEGIN)) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Starting rocksdb compaction process [name='{}',
reason={}, input={}, output={}]",
+ name,
+ compactionJobInfo.compactionReason(),
+ // Extract file names from full paths.
+ compactionJobInfo.inputFiles().stream().map(path ->
Paths.get(path).getFileName()).collect(Collectors.toList()),
+ compactionJobInfo.outputFiles().stream().map(path ->
Paths.get(path).getFileName()).collect(Collectors.toList())
+ );
+
+ lastCompactionStartTimeNanos = System.nanoTime();
+ }
+ }
+ }
+
+ @Override
+ public void onCompactionCompleted(RocksDB db, CompactionJobInfo
compactionJobInfo) {
+ if (lastCompactionEventType.compareAndSet(ON_COMPACTION_BEGIN,
ON_COMPACTION_COMPLETED)) {
+ if (LOG.isInfoEnabled()) {
+ long duration = System.nanoTime() -
lastCompactionStartTimeNanos;
+
+ LOG.info("Finishing rocksdb compaction process [name='{}',
duration={}ms]", name, TimeUnit.NANOSECONDS.toMillis(duration));
+ }
+ }
+ }
}