This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 06f1dd0220 IGNITE-22878 Add logging for storage compaction events (#4224)
06f1dd0220 is described below

commit 06f1dd02200d120d1edd982f010d487849608c29
Author: Ivan Bessonov <[email protected]>
AuthorDate: Mon Aug 26 13:01:38 2024 +0300

    IGNITE-22878 Add logging for storage compaction events (#4224)
---
 .../MetaStorageConfigurationSchema.java            |  2 +-
 .../persistence/compaction/Compactor.java          | 16 ++++++
 .../rocksdb/LoggingRocksDbFlushListener.java       | 63 ++++++++++++++++++----
 3 files changed, 70 insertions(+), 11 deletions(-)

diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java
index 9cec02766d..dcea1a058c 100644
--- a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java
+++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/configuration/MetaStorageConfigurationSchema.java
@@ -35,5 +35,5 @@ public class MetaStorageConfigurationSchema {
      */
     @Value(hasDefault = true)
     @Range(min = 1)
-    public long idleSyncTimeInterval = 500;
+    public long idleSyncTimeInterval = 250;
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
index 928e1a92de..b6482aa111 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.failure.FailureContext;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.pagememory.io.PageIo;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.apache.ignite.internal.pagememory.persistence.PartitionProcessingCounterMap;
@@ -65,6 +66,9 @@ import org.jetbrains.annotations.Nullable;
  * </ul>
  */
 public class Compactor extends IgniteWorker {
+    /** Logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(Compactor.class);
+
     private final Object mux = new Object();
 
     private final @Nullable ThreadPoolExecutor threadPoolExecutor;
@@ -216,6 +220,13 @@ public class Compactor extends IgniteWorker {
                 break;
             }
 
+            // TODO Expand the comment. https://issues.apache.org/jira/browse/IGNITE-23056
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Starting new compaction round [files={}]", queue.size());
+            }
+
+            long start = System.nanoTime();
+
             updateHeartbeat();
 
             int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize();
@@ -268,6 +279,11 @@ public class Compactor extends IgniteWorker {
 
             // Wait and check for errors.
             CompletableFuture.allOf(futures).join();
+
+            // TODO Expand the comment. https://issues.apache.org/jira/browse/IGNITE-23056, handle an exception too.
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Compaction round finished [duration={}ms]", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+            }
         }
     }
 
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/LoggingRocksDbFlushListener.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/LoggingRocksDbFlushListener.java
index 5ae0bd6d66..118a3e5f2c 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/LoggingRocksDbFlushListener.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/LoggingRocksDbFlushListener.java
@@ -17,14 +17,19 @@
 
 package org.apache.ignite.internal.rocksdb;
 
+import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_COMPACTION_BEGIN;
+import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_COMPACTION_COMPLETED;
 import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_BEGIN;
 import static org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_COMPLETED;
 
+import java.nio.file.Paths;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.rocksdb.AbstractEventListener;
+import org.rocksdb.CompactionJobInfo;
 import org.rocksdb.FlushJobInfo;
 import org.rocksdb.RocksDB;
 
@@ -39,31 +44,39 @@ public class LoggingRocksDbFlushListener extends AbstractEventListener {
     private final String name;
 
     /**
-     * Type of last processed event. Real amount of events doesn't matter in atomic flush mode. All "completed" events go after all "begin"
-     * events, and vice versa.
+     * Type of last processed flush event. Real amount of events doesn't matter in atomic flush mode. All "completed" events go after all
+     * "begin" events, and vice versa.
      */
-    private final AtomicReference<EnabledEventCallback> lastEventType = new AtomicReference<>(ON_FLUSH_COMPLETED);
+    private final AtomicReference<EnabledEventCallback> lastFlushEventType = new AtomicReference<>(ON_FLUSH_COMPLETED);
+
+    /** Type of last processed compaction event. */
+    private final AtomicReference<EnabledEventCallback> lastCompactionEventType = new AtomicReference<>(ON_COMPACTION_COMPLETED);
 
     /** This field is used for determining flush duration. */
     private volatile long lastFlushStartTimeNanos;
 
+    /** This field is used for determining compaction duration. */
+    private volatile long lastCompactionStartTimeNanos;
+
     /**
      * Constructor.
      *
      * @param name Listener name, for logs.
      */
     public LoggingRocksDbFlushListener(String name) {
-        super(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED);
+        super(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED, ON_COMPACTION_BEGIN, ON_COMPACTION_COMPLETED);
 
         this.name = name;
     }
 
     @Override
     public void onFlushBegin(RocksDB db, FlushJobInfo flushJobInfo) {
-        if (lastEventType.compareAndSet(ON_FLUSH_COMPLETED, ON_FLUSH_BEGIN)) {
-            LOG.info("Starting rocksdb flush process [name='{}', reason={}]", name, flushJobInfo.getFlushReason());
+        if (lastFlushEventType.compareAndSet(ON_FLUSH_COMPLETED, ON_FLUSH_BEGIN)) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Starting rocksdb flush process [name='{}', reason={}]", name, flushJobInfo.getFlushReason());
 
-            lastFlushStartTimeNanos = System.nanoTime();
+                lastFlushStartTimeNanos = System.nanoTime();
+            }
 
             onFlushBeginCallback(db, flushJobInfo);
         }
@@ -71,10 +84,12 @@ public class LoggingRocksDbFlushListener extends AbstractEventListener {
 
     @Override
     public void onFlushCompleted(RocksDB db, FlushJobInfo flushJobInfo) {
-        if (lastEventType.compareAndSet(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED)) {
-            long duration = System.nanoTime() - lastFlushStartTimeNanos;
+        if (lastFlushEventType.compareAndSet(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED)) {
+            if (LOG.isInfoEnabled()) {
+                long duration = System.nanoTime() - lastFlushStartTimeNanos;
 
-            LOG.info("Finishing rocksdb flush process [name='{}', duration={}ms]", name, TimeUnit.NANOSECONDS.toMillis(duration));
+                LOG.info("Finishing rocksdb flush process [name='{}', duration={}ms]", name, TimeUnit.NANOSECONDS.toMillis(duration));
+            }
 
             onFlushCompletedCallback(db, flushJobInfo);
         }
@@ -87,4 +102,32 @@ public class LoggingRocksDbFlushListener extends AbstractEventListener {
     protected void onFlushCompletedCallback(RocksDB db, FlushJobInfo flushJobInfo) {
         // No-op.
     }
+
+    @Override
+    public void onCompactionBegin(RocksDB db, CompactionJobInfo compactionJobInfo) {
+        if (lastCompactionEventType.compareAndSet(ON_COMPACTION_COMPLETED, ON_COMPACTION_BEGIN)) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Starting rocksdb compaction process [name='{}', reason={}, input={}, output={}]",
+                        name,
+                        compactionJobInfo.compactionReason(),
+                        // Extract file names from full paths.
+                        compactionJobInfo.inputFiles().stream().map(path -> Paths.get(path).getFileName()).collect(Collectors.toList()),
+                        compactionJobInfo.outputFiles().stream().map(path -> Paths.get(path).getFileName()).collect(Collectors.toList())
+                );
+
+                lastCompactionStartTimeNanos = System.nanoTime();
+            }
+        }
+    }
+
+    @Override
+    public void onCompactionCompleted(RocksDB db, CompactionJobInfo compactionJobInfo) {
+        if (lastCompactionEventType.compareAndSet(ON_COMPACTION_BEGIN, ON_COMPACTION_COMPLETED)) {
+            if (LOG.isInfoEnabled()) {
+                long duration = System.nanoTime() - lastCompactionStartTimeNanos;
+
+                LOG.info("Finishing rocksdb compaction process [name='{}', duration={}ms]", name, TimeUnit.NANOSECONDS.toMillis(duration));
+            }
+        }
+    }
 }

Reply via email to