This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 46a4462d45 IGNITE-23549 Flush Metastorage underlying storage to disk 
after taking snapshot (#4679)
46a4462d45 is described below

commit 46a4462d45175f65d299b60f9662c64bb93ad507
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Nov 6 11:36:55 2024 +0300

    IGNITE-23549 Flush Metastorage underlying storage to disk after taking 
snapshot (#4679)
---
 .../ignite/internal/components/LogSyncer.java      |  1 +
 .../{LogSyncer.java => NoOpLogSyncer.java}         | 14 ++--
 .../metastorage/server/KeyValueStorage.java        |  7 ++
 .../server/persistence/RocksDbKeyValueStorage.java | 74 ++++++++++++++++++++--
 .../server/RocksDbKeyValueStorageTest.java         | 15 +++++
 .../server/SimpleInMemoryKeyValueStorage.java      |  5 ++
 .../replicator/ItReplicaLifecycleTest.java         |  7 +-
 .../internal/rocksdb/flush/RocksDbFlusher.java     | 16 ++++-
 .../runner/app/ItIgniteNodeRestartTest.java        |  7 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java | 16 ++---
 .../rebalance/ItRebalanceDistributedTest.java      |  7 +-
 11 files changed, 127 insertions(+), 42 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
index 612e6244b2..98f450e689 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.components;
 
 /** Interface to synchronize write-ahead log. Operates only for persistent log 
storages. */
+@FunctionalInterface
 public interface LogSyncer {
     /**
      * Synchronizes write-ahead log.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/components/NoOpLogSyncer.java
similarity index 75%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
copy to 
modules/core/src/main/java/org/apache/ignite/internal/components/NoOpLogSyncer.java
index 612e6244b2..617450c3aa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/components/NoOpLogSyncer.java
@@ -17,12 +17,10 @@
 
 package org.apache.ignite.internal.components;
 
-/** Interface to synchronize write-ahead log. Operates only for persistent log 
storages. */
-public interface LogSyncer {
-    /**
-     * Synchronizes write-ahead log.
-     *
-     * @throws Exception if an error occurs whilst syncing.
-     */
-    void sync() throws Exception;
+/** Implementation that does nothing. */
+public class NoOpLogSyncer implements LogSyncer {
+    @Override
+    public void sync() {
+        // No-op.
+    }
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index ec580af62d..3ff427de74 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -589,4 +589,11 @@ public interface KeyValueStorage extends ManuallyCloseable 
{
      * @see #getCompactionRevision()
      */
     Revisions revisions();
+
+    /**
+     * Flushes current state of the data or <i>the state from the nearest 
future</i> to the storage.
+     *
+     * @return Future that's completed when flushing of the data is completed.
+     */
+    CompletableFuture<Void> flush();
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index d6a30e2dd8..810e8c5401 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -46,6 +46,8 @@ import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWrit
 import static 
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
 import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static org.apache.ignite.lang.ErrorGroups.MetaStorage.COMPACTION_ERR;
 import static org.apache.ignite.lang.ErrorGroups.MetaStorage.OP_EXECUTION_ERR;
@@ -68,8 +70,11 @@ import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.components.NoOpLogSyncer;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.CommandId;
@@ -99,11 +104,13 @@ import org.apache.ignite.internal.raft.IndexWithTerm;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
 import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -174,12 +181,18 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
     /** Batch size (number of keys) for storage compaction. The value is 
arbitrary. */
     private static final int COMPACT_BATCH_SIZE = 10;
 
+    /** Key value storage flush delay in mills. Value is taken from the 
example of default values of other components. */
+    private static final int KV_STORAGE_FLUSH_DELAY = 100;
+
     static {
         RocksDB.loadLibrary();
     }
 
-    /** Thread-pool for snapshot operations execution. */
-    private final ExecutorService snapshotExecutor;
+    /** Executor for storage operations. */
+    private final ExecutorService executor;
+
+    /** Scheduled executor for storage operations. */
+    private final ScheduledExecutorService scheduledExecutor;
 
     /** Path to the rocksdb database. */
     private final Path dbPath;
@@ -258,6 +271,13 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
      */
     private WriteOptions checksumWriteOptions;
 
+    /** Multi-threaded access is guarded by {@link #rwLock}. */
+    private RocksDbFlusher flusher;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
     /**
      * Constructor.
      *
@@ -280,11 +300,23 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
         this.dbPath = dbPath;
 
-        snapshotExecutor = Executors.newFixedThreadPool(2, 
NamedThreadFactory.create(nodeName, "metastorage-snapshot-executor", log));
+        executor = Executors.newFixedThreadPool(
+                2,
+                NamedThreadFactory.create(nodeName, 
"metastorage-rocksdb-kv-storage-executor", log)
+        );
+
+        // TODO: IGNITE-23615 Use a common pool, e.g. 
ThreadPoolsManager#commonScheduler
+        scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
+                NamedThreadFactory.create(nodeName, 
"metastorage-rocksdb-kv-storage-scheduler", log)
+        );
     }
 
     @Override
     public void start() {
+        inBusyLock(busyLock, this::startBusy);
+    }
+
+    private void startBusy() {
         rwLock.writeLock().lock();
 
         try {
@@ -293,6 +325,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             createDb();
         } catch (IOException | RocksDBException e) {
             closeRocksResources();
+
             throw new MetaStorageException(STARTING_STORAGE_ERR, "Failed to 
start the storage", e);
         } finally {
             rwLock.writeLock().unlock();
@@ -346,6 +379,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         DBOptions options = new DBOptions()
                 .setAtomicFlush(true)
                 .setCreateMissingColumnFamilies(true)
+                .setListeners(List.of(flusher.listener()))
                 .setCreateIfMissing(true);
 
         rocksResources.add(options);
@@ -368,6 +402,17 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
         var handles = new ArrayList<ColumnFamilyHandle>(descriptors.size());
 
+        flusher = new RocksDbFlusher(
+                "rocksdb metastorage kv storage",
+                busyLock,
+                scheduledExecutor,
+                executor,
+                () -> KV_STORAGE_FLUSH_DELAY,
+                // It is expected that the metastorage command raft log works 
with fsync=true.
+                new NoOpLogSyncer(),
+                () -> {}
+        );
+
         options = createDbOptions();
 
         db = RocksDB.open(options, dbPath.toAbsolutePath().toString(), 
descriptors, handles);
@@ -386,9 +431,11 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
         snapshotManager = new RocksSnapshotManager(db,
                 List.of(fullRange(data), fullRange(index), 
fullRange(tsToRevision), fullRange(revisionToTs), 
fullRange(revisionToChecksum)),
-                snapshotExecutor
+                executor
         );
 
+        flusher.init(db, handles);
+
         byte[] revision = data.get(REVISION_KEY);
 
         if (revision != null) {
@@ -438,11 +485,19 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
     @Override
     public void close() throws Exception {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
         stopCompaction();
 
+        busyLock.block();
+
         watchProcessor.close();
+        flusher.stop();
 
-        IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, 
TimeUnit.SECONDS);
+        IgniteUtils.shutdownAndAwaitTermination(executor, 10, 
TimeUnit.SECONDS);
+        IgniteUtils.shutdownAndAwaitTermination(scheduledExecutor, 10, 
TimeUnit.SECONDS);
 
         rwLock.writeLock().lock();
         try {
@@ -463,7 +518,9 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         rwLock.writeLock().lock();
 
         try {
-            return snapshotManager.createSnapshot(snapshotPath);
+            return snapshotManager
+                    .createSnapshot(snapshotPath)
+                    .thenCompose(unused -> flush());
         } finally {
             rwLock.writeLock().unlock();
         }
@@ -1601,4 +1658,9 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
     private void addIndexAndTermToWriteBatch(WriteBatch batch, 
KeyValueUpdateContext context) throws RocksDBException {
         data.put(batch, INDEX_AND_TERM_KEY, longsToBytes(0, context.index, 
context.term));
     }
+
+    @Override
+    public CompletableFuture<Void> flush() {
+        return inBusyLockAsync(busyLock, () -> flusher.awaitFlush(true));
+    }
 }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index b40028ed13..0f0d7b5320 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
 import static 
org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type.NOT_EXISTS;
 import static 
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
 import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -390,4 +391,18 @@ public class RocksDbKeyValueStorageTest extends 
BasicOperationsKeyValueStorageTe
         assertThat(checksumAndRevisions.minChecksummedRevision(), is(2L));
         assertThat(checksumAndRevisions.maxChecksummedRevision(), is(2L));
     }
+
+    @Test
+    void testFlush() throws Exception {
+        byte[] key = key(1);
+        byte[] value = keyValue(1, 1);
+
+        putToMs(key, value);
+
+        assertThat(storage.flush(), willCompleteSuccessfully());
+
+        restartStorage();
+
+        assertArrayEquals(value, storage.get(key).value());
+    }
 }
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 78955c6d81..4d7e306981 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -916,4 +916,9 @@ public class SimpleInMemoryKeyValueStorage extends 
AbstractKeyValueStorage {
             }
         };
     }
+
+    @Override
+    public CompletableFuture<Void> flush() {
+        return nullCompletedFuture();
+    }
 }
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 79e9048505..aedfe8b895 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -96,7 +96,6 @@ import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorag
 import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
 import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.internal.components.LogSyncer;
 import org.apache.ignite.internal.configuration.ClusterConfiguration;
 import org.apache.ignite.internal.configuration.ComponentWorkingDir;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
@@ -1194,8 +1193,6 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
 
             Path storagePath = dir.resolve("storage");
 
-            LogSyncer logSyncer = partitionsLogStorageFactory;
-
             dataStorageMgr = new DataStorageManager(
                     dataStorageModules.createStorageEngines(
                             name,
@@ -1203,7 +1200,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                             dir.resolve("storage"),
                             null,
                             failureManager,
-                            logSyncer,
+                            partitionsLogStorageFactory,
                             hybridClock
                     ),
                     storageConfiguration
@@ -1331,7 +1328,7 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     lowWatermark,
                     transactionInflights,
                     indexMetaStorage,
-                    logSyncer,
+                    partitionsLogStorageFactory,
                     partitionReplicaLifecycleManager,
                     minTimeCollectorService
             ) {
diff --git 
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
index 40cf25ea72..e4a1b8c4d2 100644
--- 
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
+++ 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.rocksdb.flush;
 
+import static java.util.concurrent.CompletableFuture.runAsync;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.ArrayList;
@@ -28,6 +29,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.IntSupplier;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.components.LogSyncer;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -279,6 +281,18 @@ public class RocksDbFlusher {
      * @return Future that completes when the {@code onFlushCompleted} 
callback finishes.
      */
     CompletableFuture<Void> onFlushCompleted() {
-        return CompletableFuture.runAsync(onFlushCompleted, threadPool);
+        return inBusyLockSafeAsync(() -> runAsync(onFlushCompleted, 
threadPool));
+    }
+
+    private CompletableFuture<Void> 
inBusyLockSafeAsync(Supplier<CompletableFuture<Void>> supplier) {
+        if (!busyLock.enterBusy()) {
+            return nullCompletedFuture();
+        }
+
+        try {
+            return supplier.get();
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 2da26a44a1..dd4d023b6b 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -105,7 +105,6 @@ import 
org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateSto
 import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
 import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
-import org.apache.ignite.internal.components.LogSyncer;
 import org.apache.ignite.internal.configuration.ComponentWorkingDir;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.ConfigurationModules;
@@ -639,8 +638,6 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
 
         Path storagePath = getPartitionsStorePath(dir);
 
-        LogSyncer logSyncer = partitionsLogStorageFactory;
-
         DataStorageManager dataStorageManager = new DataStorageManager(
                 dataStorageModules.createStorageEngines(
                         name,
@@ -648,7 +645,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                         storagePath,
                         null,
                         failureProcessor,
-                        logSyncer,
+                        partitionsLogStorageFactory,
                         hybridClock
                 ),
                 storageConfiguration
@@ -729,7 +726,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 lowWatermark,
                 transactionInflights,
                 indexMetaStorage,
-                logSyncer,
+                partitionsLogStorageFactory,
                 new PartitionReplicaLifecycleManager(
                         catalogManager,
                         replicaMgr,
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 4a9624d5f1..9c8564fd28 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -88,7 +88,6 @@ import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.internal.components.LogSyncer;
 import org.apache.ignite.internal.components.LongJvmPauseDetector;
 import org.apache.ignite.internal.compute.AntiHijackIgniteCompute;
 import org.apache.ignite.internal.compute.ComputeComponent;
@@ -584,13 +583,11 @@ public class IgniteImpl implements Ignite {
 
         ComponentWorkingDir partitionsWorkDir = 
partitionsPath(systemConfiguration, workDir);
 
-        boolean raftUseFsync = raftConfiguration.fsync().value();
-
         partitionsLogStorageFactory = SharedLogStorageFactoryUtils.create(
                 "table data log",
                 clusterSvc.nodeName(),
                 partitionsWorkDir.raftLogPath(),
-                raftUseFsync
+                raftConfiguration.fsync().value()
         );
 
         raftMgr = new Loza(
@@ -682,6 +679,7 @@ public class IgniteImpl implements Ignite {
                 "meta-storage log",
                 clusterSvc.nodeName(),
                 metastorageWorkDir.raftLogPath(),
+                // If it changes, then it will be necessary to set LogSyncer 
to RocksDbKeyValueStorage.
                 true
         );
 
@@ -832,19 +830,13 @@ public class IgniteImpl implements Ignite {
 
         GcConfiguration gcConfig = 
clusterConfigRegistry.getConfiguration(GcExtensionConfiguration.KEY).gc();
 
-        LogSyncer logSyncer = () -> {
-            partitionsLogStorageFactory.sync();
-            cmgLogStorageFactory.sync();
-            msLogStorageFactory.sync();
-        };
-
         Map<String, StorageEngine> storageEngines = 
dataStorageModules.createStorageEngines(
                 name,
                 nodeConfigRegistry,
                 storagePath,
                 longJvmPauseDetector,
                 failureManager,
-                logSyncer,
+                partitionsLogStorageFactory,
                 clock
         );
 
@@ -1012,7 +1004,7 @@ public class IgniteImpl implements Ignite {
                 lowWatermark,
                 transactionInflights,
                 indexMetaStorage,
-                logSyncer,
+                partitionsLogStorageFactory,
                 partitionReplicaLifecycleManager,
                 minTimeCollectorService
         );
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 73139386b1..3f6e67d09a 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -110,7 +110,6 @@ import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorag
 import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
 import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.internal.components.LogSyncer;
 import org.apache.ignite.internal.configuration.ClusterConfiguration;
 import org.apache.ignite.internal.configuration.ComponentWorkingDir;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
@@ -1313,8 +1312,6 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
             Path storagePath = dir.resolve("storage");
 
-            LogSyncer logSyncer = logStorageFactory;
-
             dataStorageMgr = new DataStorageManager(
                     dataStorageModules.createStorageEngines(
                             name,
@@ -1322,7 +1319,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                             dir.resolve("storage"),
                             null,
                             failureManager,
-                            logSyncer,
+                            logStorageFactory,
                             hybridClock
                     ),
                     storageConfiguration
@@ -1437,7 +1434,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     lowWatermark,
                     transactionInflights,
                     indexMetaStorage,
-                    logSyncer,
+                    logStorageFactory,
                     new PartitionReplicaLifecycleManager(
                             catalogManager,
                             replicaManager,

Reply via email to