This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5180c53f7d IGNITE-21301 Sync raft log before flush in all storage
engines (#3535)
5180c53f7d is described below
commit 5180c53f7dad7923efa98604b5690dc7b3b5b3a5
Author: Phillippko <[email protected]>
AuthorDate: Thu Apr 4 18:03:58 2024 +0400
IGNITE-21301 Sync raft log before flush in all storage engines (#3535)
---
.../ignite/internal/components/LogSyncer.java} | 29 +++++-----------------
.../persistence/checkpoint/CheckpointManager.java | 5 +++-
.../persistence/checkpoint/Checkpointer.java | 16 +++++++++++-
.../PersistentPageMemoryNoLoadTest.java | 2 ++
.../checkpoint/CheckpointManagerTest.java | 3 +++
.../persistence/checkpoint/CheckpointerTest.java | 28 +++++++++++++++------
.../apache/ignite/internal/raft/RaftManager.java | 4 +++
.../raft/ItTruncateSuffixAndRestartTest.java | 4 +++
.../java/org/apache/ignite/internal/raft/Loza.java | 6 +++++
.../internal/raft/server/impl/JraftServerImpl.java | 6 +++++
.../internal/raft/storage/LogStorageFactory.java | 3 ++-
.../storage/impl/DefaultLogStorageFactory.java | 6 +++++
.../raft/storage/impl/LocalLogStorageFactory.java | 5 ++++
.../storage/impl/VolatileLogStorageFactory.java | 5 ++++
.../raft/storage/logit/LogitLogStorageFactory.java | 6 +++++
.../rocksdb/flush/RocksDbFlushListener.java | 19 +++++++++++++-
.../internal/rocksdb/flush/RocksDbFlusher.java | 9 ++++---
.../runner/app/ItIgniteNodeRestartTest.java | 3 ++-
.../org/apache/ignite/internal/app/IgniteImpl.java | 6 ++++-
.../ignite/internal/storage/DataStorageModule.java | 5 +++-
.../internal/storage/DataStorageModules.java | 14 ++++++++---
.../internal/storage/DataStorageManagerTest.java | 9 +++++--
.../internal/storage/DataStorageModulesTest.java | 6 +++--
.../storage/engine/AbstractStorageEngineTest.java | 21 ++++++++++++++++
.../storage/impl/TestDataStorageModule.java | 4 ++-
.../PersistentPageMemoryDataStorageModule.java | 7 ++++--
.../PersistentPageMemoryStorageEngine.java | 9 ++++++-
.../VolatilePageMemoryDataStorageModule.java | 4 ++-
.../PersistentPageMemoryMvTableStorageTest.java | 11 +++++++-
.../PersistentPageMemoryStorageEngineTest.java | 3 ++-
.../PersistentPageMemoryHashIndexStorageTest.java | 11 +++++++-
...PersistentPageMemorySortedIndexStorageTest.java | 5 +++-
...ageMemoryMvPartitionStorageConcurrencyTest.java | 11 +++++++-
...rsistentPageMemoryMvPartitionStorageGcTest.java | 11 +++++++-
...PersistentPageMemoryMvPartitionStorageTest.java | 11 +++++++-
.../storage/rocksdb/RocksDbDataStorageModule.java | 6 +++--
.../storage/rocksdb/RocksDbStorageEngine.java | 11 +++++++-
.../instance/SharedRocksDbInstanceCreator.java | 1 +
.../RocksDbMvPartitionStorageConcurrencyTest.java | 3 ++-
.../rocksdb/RocksDbMvPartitionStorageGcTest.java | 3 ++-
.../rocksdb/RocksDbMvPartitionStorageTest.java | 3 ++-
.../storage/rocksdb/RocksDbMvTableStorageTest.java | 4 ++-
.../storage/rocksdb/RocksDbStorageEngineTest.java | 3 ++-
.../rocksdb/engine/RocksDbStorageEngineTest.java | 3 ++-
.../rocksdb/index/RocksDbHashIndexStorageTest.java | 3 ++-
.../index/RocksDbSortedIndexStorageTest.java | 3 ++-
.../instance/SharedRocksDbInstanceTest.java | 4 ++-
.../rebalance/ItRebalanceDistributedTest.java | 3 ++-
.../internal/table/distributed/TableManager.java | 1 +
.../distributed/TableManagerRecoveryTest.java | 22 +++++++++++++---
.../table/distributed/TableManagerTest.java | 10 +++++++-
.../PersistentPageMemoryGcUpdateHandlerTest.java | 4 ++-
.../distributed/gc/RocksDbGcUpdateHandlerTest.java | 3 ++-
.../state/rocksdb/TxStateRocksDbSharedStorage.java | 8 ++++++
.../state/rocksdb/RocksDbTxStateStorageTest.java | 4 ++-
.../storage/state/AbstractTxStateStorageTest.java | 3 ++-
56 files changed, 331 insertions(+), 81 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
similarity index 55%
copy from
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageFactory.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
index 75f1bcff02..612e6244b2 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/components/LogSyncer.java
@@ -15,31 +15,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.raft.storage;
-
-import org.apache.ignite.internal.close.ManuallyCloseable;
-import org.apache.ignite.raft.jraft.option.RaftOptions;
-import org.apache.ignite.raft.jraft.storage.LogStorage;
-
-/** Log storage factory interface. */
-public interface LogStorageFactory extends ManuallyCloseable {
- /**
- * Starts the log storage factory.
- */
- void start();
+package org.apache.ignite.internal.components;
+/** Interface to synchronize write-ahead log. Operates only for persistent log storages. */
+public interface LogSyncer {
/**
- * Creates a log storage.
+ * Synchronizes write-ahead log.
*
- * @param uri Log storage URI.
- * @param raftOptions Raft options.
- * @return Log storage.
- */
- LogStorage createLogStorage(String uri, RaftOptions raftOptions);
-
- /**
- * Closes the factory.
+ * @throws Exception if an error occurs whilst syncing.
*/
- @Override
- void close();
+ void sync() throws Exception;
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index d45439b88f..54d89ac5d3 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.pagememory.persistence.CheckpointUrgenc
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
@@ -107,6 +108,7 @@ public class CheckpointManager {
PartitionMetaManager partitionMetaManager,
Collection<? extends DataRegion<PersistentPageMemory>> dataRegions,
PageIoRegistry ioRegistry,
+ LogSyncer logSyncer,
// TODO: IGNITE-17017 Move to common config
int pageSize
) throws IgniteInternalCheckedException {
@@ -155,7 +157,8 @@ public class CheckpointManager {
checkpointPagesWriterFactory,
filePageStoreManager,
compactor,
- checkpointConfig
+ checkpointConfig,
+ logSyncer
);
checkpointTimeoutLock = new CheckpointTimeoutLock(
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index 9df4c01675..bf896227ee 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -149,6 +150,8 @@ public class Checkpointer extends IgniteWorker {
/** Failure processor. */
private final FailureProcessor failureProcessor;
+ private final LogSyncer logSyncer;
+
/**
* Constructor.
*
@@ -161,6 +164,7 @@ public class Checkpointer extends IgniteWorker {
* @param filePageStoreManager File page store manager.
* @param compactor Delta file compactor.
* @param checkpointConfig Checkpoint configuration.
+ * @param logSyncer Write-ahead log synchronizer.
*/
Checkpointer(
String igniteInstanceName,
@@ -171,7 +175,8 @@ public class Checkpointer extends IgniteWorker {
CheckpointPagesWriterFactory factory,
FilePageStoreManager filePageStoreManager,
Compactor compactor,
- PageMemoryCheckpointConfiguration checkpointConfig
+ PageMemoryCheckpointConfiguration checkpointConfig,
+ LogSyncer logSyncer
) {
super(LOG, igniteInstanceName, "checkpoint-thread", workerListener);
@@ -182,6 +187,7 @@ public class Checkpointer extends IgniteWorker {
this.filePageStoreManager = filePageStoreManager;
this.compactor = compactor;
this.failureProcessor = failureProcessor;
+ this.logSyncer = logSyncer;
scheduledCheckpointProgress = new
CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval()));
@@ -340,6 +346,14 @@ public class Checkpointer extends IgniteWorker {
}
}
+ try {
+ logSyncer.sync();
+ } catch (Exception e) {
+ log.error("Failed to sync write-ahead log during checkpoint", e);
+
+ throw new IgniteInternalCheckedException(e);
+ }
+
if (!writePages(tracker, chp.dirtyPages, chp.progress, this,
this::isShutdownNow)) {
return;
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemoryNoLoadTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemoryNoLoadTest.java
index 4f00b49f20..af3fbc0ec4 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemoryNoLoadTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemoryNoLoadTest.java
@@ -51,6 +51,7 @@ import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.LongStream;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -497,6 +498,7 @@ public class PersistentPageMemoryNoLoadTest extends
AbstractPageMemoryNoLoadSelf
partitionMetaManager,
dataRegions,
ioRegistry,
+ mock(LogSyncer.class),
PAGE_SIZE
);
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java
index 74aac10747..c0559d2981 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManagerTest.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntFunction;
import java.util.function.Supplier;
import java.util.stream.IntStream;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -87,6 +88,7 @@ public class CheckpointManagerTest extends
BaseIgniteAbstractTest {
mock(PartitionMetaManager.class),
List.of(dataRegion),
mock(PageIoRegistry.class),
+ mock(LogSyncer.class),
1024
);
@@ -199,6 +201,7 @@ public class CheckpointManagerTest extends
BaseIgniteAbstractTest {
mock(PartitionMetaManager.class),
List.of(),
mock(PageIoRegistry.class),
+ mock(LogSyncer.class),
1024
));
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index f655f93700..f6b09cfed5 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -61,6 +61,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -118,7 +119,8 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
createCheckpointPagesWriterFactory(mock(PartitionMetaManager.class)),
mock(FilePageStoreManager.class),
mock(Compactor.class),
- checkpointConfig
+ checkpointConfig,
+ mock(LogSyncer.class)
);
assertNull(checkpointer.runner());
@@ -151,7 +153,8 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
mock(CheckpointPagesWriterFactory.class),
mock(FilePageStoreManager.class),
mock(Compactor.class),
- checkpointConfig
+ checkpointConfig,
+ mock(LogSyncer.class)
));
assertNull(checkpointer.lastCheckpointProgress());
@@ -255,7 +258,8 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
mock(CheckpointPagesWriterFactory.class),
mock(FilePageStoreManager.class),
mock(Compactor.class),
- checkpointConfig
+ checkpointConfig,
+ mock(LogSyncer.class)
);
CompletableFuture<?> waitCheckpointEventFuture =
runAsync(checkpointer::waitCheckpointEvent);
@@ -284,7 +288,8 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
createCheckpointPagesWriterFactory(mock(PartitionMetaManager.class)),
mock(FilePageStoreManager.class),
mock(Compactor.class),
- checkpointConfig
+ checkpointConfig,
+ mock(LogSyncer.class)
));
((CheckpointProgressImpl) checkpointer.scheduledProgress())
@@ -363,6 +368,8 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
Compactor compactor = mock(Compactor.class);
+ LogSyncer mockLogSyncer = mock(LogSyncer.class);
+
Checkpointer checkpointer = spy(new Checkpointer(
"test",
null,
@@ -372,7 +379,8 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
createCheckpointPagesWriterFactory(partitionMetaManager),
createFilePageStoreManager(Map.of(new GroupPartitionId(0, 0),
filePageStore)),
compactor,
- checkpointConfig
+ checkpointConfig,
+ mockLogSyncer
));
assertDoesNotThrow(checkpointer::doCheckpoint);
@@ -380,6 +388,7 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
verify(dirtyPages, times(1)).toDirtyPageIdQueue();
verify(checkpointer, times(1)).startCheckpointProgress();
verify(compactor, times(1)).triggerCompaction();
+ verify(mockLogSyncer, times(1)).sync();
assertEquals(checkpointer.lastCheckpointProgress().currentCheckpointPagesCount(),
3);
@@ -401,7 +410,8 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
createCheckpointPagesWriterFactory(new
PartitionMetaManager(ioRegistry, PAGE_SIZE)),
createFilePageStoreManager(Map.of()),
compactor,
- checkpointConfig
+ checkpointConfig,
+ mock(LogSyncer.class)
));
assertDoesNotThrow(checkpointer::doCheckpoint);
@@ -426,7 +436,8 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
mock(CheckpointPagesWriterFactory.class),
mock(FilePageStoreManager.class),
mock(Compactor.class),
- checkpointConfig
+ checkpointConfig,
+ mock(LogSyncer.class)
);
// Checks case 0 deviation.
@@ -467,7 +478,8 @@ public class CheckpointerTest extends
BaseIgniteAbstractTest {
mock(CheckpointPagesWriterFactory.class),
mock(FilePageStoreManager.class),
mock(Compactor.class),
- checkpointConfig
+ checkpointConfig,
+ mock(LogSyncer.class)
);
GroupPartitionId groupPartitionId = new GroupPartitionId(0, 0);
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
index a9cfff5c35..a4d1ec336c 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.raft;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
@@ -172,4 +173,7 @@ public interface RaftManager extends IgniteComponent {
RaftServiceFactory<T> factory,
@Nullable Marshaller commandsMarshaller
) throws NodeStoppingException;
+
+ /** Returns write-ahead log syncer. */
+ LogSyncer getLogSyncer();
}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
index 3d29bdaeb3..c6fcd505ab 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
@@ -384,5 +384,9 @@ public class ItTruncateSuffixAndRestartTest extends
BaseIgniteAbstractTest {
@Override
public void close() {
}
+
+ @Override
+ public void sync(){
+ }
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 408e0dcd7a..78b88cb789 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
@@ -537,6 +538,11 @@ public class Loza implements RaftManager {
return raftServer;
}
+ @Override
+ public LogSyncer getLogSyncer() {
+ return raftServer.getLogSyncer();
+ }
+
/**
* Returns a cluster service.
*
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index acee064370..d3a97a46fb 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.stream.IntStream;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
@@ -221,6 +222,11 @@ public class JraftServerImpl implements RaftServer {
return new StoreOptions();
}
+ /** Returns write-ahead log synchronizer. */
+ public LogSyncer getLogSyncer() {
+ return logStorageFactory;
+ }
+
/**
* Sets {@link AppendEntriesRequestInterceptor} to use. Should only be
called from the same thread that is used
* to {@link #start()} the component.
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageFactory.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageFactory.java
index 75f1bcff02..c88fad5b9d 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageFactory.java
@@ -18,11 +18,12 @@
package org.apache.ignite.internal.raft.storage;
import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.LogStorage;
/** Log storage factory interface. */
-public interface LogStorageFactory extends ManuallyCloseable {
+public interface LogStorageFactory extends ManuallyCloseable, LogSyncer {
/**
* Starts the log storage factory.
*/
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
index 2346404d18..7b04ca1069 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
@@ -46,6 +46,7 @@ import org.rocksdb.DBOptions;
import org.rocksdb.Env;
import org.rocksdb.Priority;
import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.util.SizeUnit;
@@ -159,6 +160,11 @@ public class DefaultLogStorageFactory implements
LogStorageFactory {
return new RocksDbSharedLogStorage(this, db, confHandle, dataHandle,
groupId, raftOptions, executorService);
}
+ @Override
+ public void sync() throws RocksDBException {
+ db.syncWal();
+ }
+
/**
* Returns a thread-local {@link WriteBatch} instance, attached to current
factory, append data from multiple storages at the same time.
*/
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/LocalLogStorageFactory.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/LocalLogStorageFactory.java
index 4680ff29f1..d5a6fddcc2 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/LocalLogStorageFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/LocalLogStorageFactory.java
@@ -40,4 +40,9 @@ public class LocalLogStorageFactory implements
LogStorageFactory {
public void close() {
// no-op
}
+
+ @Override
+ public void sync() {
+ // no-op
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactory.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactory.java
index 9ad607640f..e61a48b5ff 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactory.java
@@ -129,4 +129,9 @@ public class VolatileLogStorageFactory implements
LogStorageFactory {
public void close() {
// No-op.
}
+
+ @Override
+ public void sync() {
+ // No-op.
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
index 3537a55f2e..1220e2f98a 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
@@ -92,6 +92,12 @@ public class LogitLogStorageFactory implements
LogStorageFactory {
ExecutorServiceHelper.shutdownAndAwaitTermination(checkpointExecutor);
}
+ @Override
+ public void sync() {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-21955
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
private Path resolveLogStoragePath(String groupId) {
return baseLogStoragesPath.resolve(LOG_DIR_PREFIX + groupId);
}
diff --git
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
index ff38f3f1f9..c49c6f71c0 100644
---
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
+++
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlushListener.java
@@ -23,6 +23,9 @@ import static
org.rocksdb.AbstractEventListener.EnabledEventCallback.ON_FLUSH_CO
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.components.LogSyncer;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.rocksdb.AbstractEventListener;
import org.rocksdb.FlushJobInfo;
import org.rocksdb.RocksDB;
@@ -31,6 +34,9 @@ import org.rocksdb.RocksDB;
* Represents a listener of RocksDB flush events.
*/
class RocksDbFlushListener extends AbstractEventListener {
+ /** Logger. */
+ private static final IgniteLogger LOG = Loggers.forClass(RocksDbFlushListener.class);
+
/** Flusher instance. */
private final RocksDbFlusher flusher;
@@ -40,6 +46,9 @@ class RocksDbFlushListener extends AbstractEventListener {
*/
private final AtomicReference<EnabledEventCallback> lastEventType = new
AtomicReference<>(ON_FLUSH_COMPLETED);
+ /** Write-ahead log synchronizer. */
+ private final LogSyncer logSyncer;
+
/**
* Future that guarantees that last flush was fully processed and the new
flush can safely begin.
*/
@@ -49,16 +58,24 @@ class RocksDbFlushListener extends AbstractEventListener {
* Constructor.
*
* @param flusher Flusher instance to delegate events processing to.
+ * @param logSyncer Write-ahead log synchronizer.
*/
- RocksDbFlushListener(RocksDbFlusher flusher) {
+ RocksDbFlushListener(RocksDbFlusher flusher, LogSyncer logSyncer) {
super(ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED);
this.flusher = flusher;
+ this.logSyncer = logSyncer;
}
/** {@inheritDoc} */
@Override
public void onFlushBegin(RocksDB db, FlushJobInfo flushJobInfo) {
+ try {
+ logSyncer.sync();
+ } catch (Exception e) {
+ LOG.error("Couldn't sync RocksDB WAL on flush begin", e);
+ }
+
if (lastEventType.compareAndSet(ON_FLUSH_COMPLETED, ON_FLUSH_BEGIN)) {
lastFlushProcessed.join();
}
diff --git
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
index 70ebe8fdc4..216abb1ee2 100644
---
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
+++
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -86,7 +87,7 @@ public class RocksDbFlusher {
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock;
- private final RocksDbFlushListener flushListener = new
RocksDbFlushListener(this);
+ private final RocksDbFlushListener flushListener;
/**
* Instance of the latest scheduled flush closure.
@@ -115,6 +116,7 @@ public class RocksDbFlusher {
ScheduledExecutorService scheduledPool,
ExecutorService threadPool,
IntSupplier delaySupplier,
+ LogSyncer logSyncer,
Runnable onFlushCompleted
) {
this.busyLock = busyLock;
@@ -122,6 +124,7 @@ public class RocksDbFlusher {
this.threadPool = threadPool;
this.delaySupplier = delaySupplier;
this.onFlushCompleted = onFlushCompleted;
+ this.flushListener = new RocksDbFlushListener(this, logSyncer);
}
/**
@@ -177,8 +180,8 @@ public class RocksDbFlusher {
* enabled.
*
* @param schedule {@code true} if {@link RocksDB#flush(FlushOptions)}
should be explicitly triggerred in the near future. Please refer
- * to {@link RocksDbFlusher#RocksDbFlusher(IgniteSpinBusyLock,
ScheduledExecutorService, ExecutorService, IntSupplier, Runnable)}
- * parameters description to see what's really happening in this case.
+ * to {@link RocksDbFlusher#RocksDbFlusher(IgniteSpinBusyLock, ScheduledExecutorService, ExecutorService, IntSupplier, LogSyncer,
+ * Runnable)} parameters description to see what's really happening in this case.
*
* @see #scheduleFlush()
*/
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index b7d893de6a..1132f1d6e2 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -519,7 +519,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
nodeCfgMgr.configurationRegistry(),
storagePath,
null,
- failureProcessor
+ failureProcessor,
+ raftMgr.getLogSyncer()
)
);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index cbb8a66b03..ffea0ba99c 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -80,6 +80,7 @@ import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.component.RestAddressReporter;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.compute.AntiHijackIgniteCompute;
import org.apache.ignite.internal.compute.ComputeComponent;
@@ -631,12 +632,15 @@ public class IgniteImpl implements Ignite {
GcConfiguration gcConfig =
clusterConfigRegistry.getConfiguration(GcConfiguration.KEY);
+ LogSyncer logSyncer = raftMgr.getLogSyncer();
+
Map<String, StorageEngine> storageEngines =
dataStorageModules.createStorageEngines(
name,
nodeConfigRegistry,
storagePath,
longJvmPauseDetector,
- failureProcessor
+ failureProcessor,
+ logSyncer
);
dataStorageMgr = new
DataStorageManager(applyThreadAssertionsIfNeeded(storageEngines));
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataStorageModule.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataStorageModule.java
index 302235e345..8b0e95f11f 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataStorageModule.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataStorageModule.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.storage;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -44,6 +45,7 @@ public interface DataStorageModule {
* @param storagePath Storage path.
* @param longJvmPauseDetector Long JVM pause detector.
* @param failureProcessor Failure processor that is used to handle
critical errors.
+ * @param logSyncer Write-ahead log synchronizer.
* @throws StorageException If there is an error when creating the storage
engine.
*/
StorageEngine createEngine(
@@ -51,6 +53,7 @@ public interface DataStorageModule {
ConfigurationRegistry configRegistry,
Path storagePath,
@Nullable LongJvmPauseDetector longJvmPauseDetector,
- FailureProcessor failureProcessor
+ FailureProcessor failureProcessor,
+ LogSyncer logSyncer
) throws StorageException;
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataStorageModules.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataStorageModules.java
index 57fe00b6d9..d76f65501c 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataStorageModules.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataStorageModules.java
@@ -31,6 +31,7 @@ import java.util.Map.Entry;
import java.util.stream.Stream;
import org.apache.ignite.configuration.annotation.PolymorphicConfigInstance;
import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -90,12 +91,19 @@ public class DataStorageModules {
ConfigurationRegistry configRegistry,
Path storagePath,
@Nullable LongJvmPauseDetector longJvmPauseDetector,
- FailureProcessor failureProcessor
-
+ FailureProcessor failureProcessor,
+ LogSyncer logSyncer
) {
return modules.entrySet().stream().collect(toUnmodifiableMap(
Entry::getKey,
- e -> e.getValue().createEngine(igniteInstanceName, configRegistry, storagePath, longJvmPauseDetector, failureProcessor)
+ e -> e.getValue().createEngine(
+ igniteInstanceName,
+ configRegistry,
+ storagePath,
+ longJvmPauseDetector,
+ failureProcessor,
+ logSyncer
+ )
));
}
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/DataStorageManagerTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/DataStorageManagerTest.java
index dc9e302b8e..2f99df82bf 100644
---
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/DataStorageManagerTest.java
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/DataStorageManagerTest.java
@@ -34,6 +34,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import
org.apache.ignite.configuration.ConfigurationWrongPolymorphicTypeIdException;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -78,7 +79,9 @@ public class DataStorageManagerTest extends
BaseIgniteAbstractTest {
mock(ConfigurationRegistry.class),
workDir,
null,
- mock(FailureProcessor.class))
+ mock(FailureProcessor.class),
+ mock(LogSyncer.class)
+ )
);
// Check random polymorphicTypeId.
@@ -125,7 +128,9 @@ public class DataStorageManagerTest extends
BaseIgniteAbstractTest {
mock(ConfigurationRegistry.class),
workDir,
null,
- mock(FailureProcessor.class))
+ mock(FailureProcessor.class),
+ mock(LogSyncer.class)
+ )
);
DataStorageView dataStorageView = dataStorageConfig.value();
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/DataStorageModulesTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/DataStorageModulesTest.java
index 7619829a23..d6a68eaa85 100644
---
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/DataStorageModulesTest.java
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/DataStorageModulesTest.java
@@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import org.apache.ignite.configuration.annotation.PolymorphicConfigInstance;
import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.failure.FailureProcessor;
import
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfigurationSchema;
@@ -108,7 +109,8 @@ public class DataStorageModulesTest extends
BaseIgniteAbstractTest {
mock(ConfigurationRegistry.class),
workDir,
null,
- mock(FailureProcessor.class)
+ mock(FailureProcessor.class),
+ mock(LogSyncer.class)
);
assertThat(engines, aMapWithSize(2));
@@ -190,7 +192,7 @@ public class DataStorageModulesTest extends
BaseIgniteAbstractTest {
when(mock.name()).thenReturn(name);
- when(mock.createEngine(any(), any(), any(), any(), any())).thenReturn(mock(StorageEngine.class));
+ when(mock.createEngine(any(), any(), any(), any(), any(), any())).thenReturn(mock(StorageEngine.class));
return mock;
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
index f8eab538bb..b7b8798b0b 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
@@ -28,7 +28,9 @@ import static
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
@@ -37,6 +39,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
@@ -71,6 +74,8 @@ public abstract class AbstractStorageEngineTest extends
BaseMvStoragesTest {
/** Engine instance. */
private StorageEngine storageEngine;
+ protected LogSyncer logSyncer = mock(LogSyncer.class);
+
@BeforeEach
void createEngineBeforeTest() {
storageEngine = createEngine();
@@ -111,6 +116,22 @@ public abstract class AbstractStorageEngineTest extends
BaseMvStoragesTest {
checkMvTableStorageWithPartitionAfterRestart(tableId,
lastAppliedIndex, lastAppliedTerm);
}
+ /**
+ * Tests that write-ahead log is synced before flush.
+ */
+ @Test
+ void testSyncWalBeforeFlush() throws Exception {
+ assumeFalse(storageEngine.isVolatile());
+
+ int tableId = 1;
+ int lastAppliedIndex = 10;
+ int lastAppliedTerm = 20;
+
+ createMvTableWithPartitionAndFill(tableId, lastAppliedIndex, lastAppliedTerm);
+
+ verify(logSyncer, atLeastOnce()).sync();
+ }
+
@Test
void testDropMvTableOnRecovery() throws Exception {
assumeFalse(storageEngine.isVolatile());
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestDataStorageModule.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestDataStorageModule.java
index ffd926f9b0..540deeeeae 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestDataStorageModule.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestDataStorageModule.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.internal.storage.impl.TestStorageEngine.ENGINE_N
import com.google.auto.service.AutoService;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -47,7 +48,8 @@ public class TestDataStorageModule implements
DataStorageModule {
ConfigurationRegistry configRegistry,
Path storagePath,
@Nullable LongJvmPauseDetector longJvmPauseDetector,
- FailureProcessor failureProcessor
+ FailureProcessor failureProcessor,
+ LogSyncer logSyncer
) throws StorageException {
return new TestStorageEngine();
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataStorageModule.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataStorageModule.java
index c7fb53bf4a..3d3755879f 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataStorageModule.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryDataStorageModule.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.internal.storage.pagememory.PersistentPageMemory
import com.google.auto.service.AutoService;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -49,7 +50,8 @@ public class PersistentPageMemoryDataStorageModule implements
DataStorageModule
ConfigurationRegistry configRegistry,
Path storagePath,
@Nullable LongJvmPauseDetector longJvmPauseDetector,
- FailureProcessor failureProcessor
+ FailureProcessor failureProcessor,
+ LogSyncer logSyncer
) throws StorageException {
PersistentPageMemoryStorageEngineConfiguration engineConfig =
configRegistry.getConfiguration(
PersistentPageMemoryStorageEngineConfiguration.KEY
@@ -67,6 +69,7 @@ public class PersistentPageMemoryDataStorageModule implements
DataStorageModule
ioRegistry,
storagePath,
longJvmPauseDetector,
- failureProcessor);
+ failureProcessor,
+ logSyncer);
}
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index 6a494cca73..6d8b3b6d9b 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.fileio.AsyncFileIoFactory;
@@ -100,6 +101,8 @@ public class PersistentPageMemoryStorageEngine implements
StorageEngine {
private final FailureProcessor failureProcessor;
+ private final LogSyncer logSyncer;
+
/**
* Constructor.
*
@@ -109,6 +112,7 @@ public class PersistentPageMemoryStorageEngine implements
StorageEngine {
* @param storagePath Storage path.
* @param failureProcessor Failure processor that is used to handle
critical errors.
* @param longJvmPauseDetector Long JVM pause detector.
+ * @param logSyncer Write-ahead log synchronizer.
*/
public PersistentPageMemoryStorageEngine(
String igniteInstanceName,
@@ -116,7 +120,8 @@ public class PersistentPageMemoryStorageEngine implements
StorageEngine {
PageIoRegistry ioRegistry,
Path storagePath,
@Nullable LongJvmPauseDetector longJvmPauseDetector,
- FailureProcessor failureProcessor
+ FailureProcessor failureProcessor,
+ LogSyncer logSyncer
) {
this.igniteInstanceName = igniteInstanceName;
this.engineConfig = engineConfig;
@@ -124,6 +129,7 @@ public class PersistentPageMemoryStorageEngine implements
StorageEngine {
this.storagePath = storagePath;
this.longJvmPauseDetector = longJvmPauseDetector;
this.failureProcessor = failureProcessor;
+ this.logSyncer = logSyncer;
}
/**
@@ -173,6 +179,7 @@ public class PersistentPageMemoryStorageEngine implements
StorageEngine {
partitionMetaManager,
regions.values(),
ioRegistry,
+ logSyncer,
pageSize
);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataStorageModule.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataStorageModule.java
index d6d63f456d..553f8a7b23 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataStorageModule.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataStorageModule.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.internal.storage.pagememory.VolatilePageMemorySt
import com.google.auto.service.AutoService;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -50,7 +51,8 @@ public class VolatilePageMemoryDataStorageModule implements
DataStorageModule {
ConfigurationRegistry configRegistry,
Path storagePath,
@Nullable LongJvmPauseDetector longJvmPauseDetector,
- FailureProcessor failureProcessor
+ FailureProcessor failureProcessor,
+ LogSyncer logSyncer
) throws StorageException {
VolatilePageMemoryStorageEngineConfiguration engineConfig =
configRegistry.getConfiguration(
VolatilePageMemoryStorageEngineConfiguration.KEY
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
index 183fac8c47..eadcadff7f 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryMvTableStorageTest.java
@@ -26,6 +26,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
@@ -62,7 +63,15 @@ public class PersistentPageMemoryMvTableStorageTest extends
AbstractMvTableStora
ioRegistry.loadFromServiceLoader();
- engine = new PersistentPageMemoryStorageEngine("test", engineConfig, ioRegistry, workDir, null, mock(FailureProcessor.class));
+ engine = new PersistentPageMemoryStorageEngine(
+ "test",
+ engineConfig,
+ ioRegistry,
+ workDir,
+ null,
+ mock(FailureProcessor.class),
+ mock(LogSyncer.class)
+ );
engine.start();
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java
index 0db0173d66..bf6bb9131e 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java
@@ -54,7 +54,8 @@ public class PersistentPageMemoryStorageEngineTest extends
AbstractStorageEngine
ioRegistry,
workDir,
null,
- mock(FailureProcessor.class)
+ mock(FailureProcessor.class),
+ logSyncer
);
}
}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemoryHashIndexStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemoryHashIndexStorageTest.java
index 6a0c0cd025..799c62c7c2 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemoryHashIndexStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemoryHashIndexStorageTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.storage.pagememory.configuration.schema
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -55,7 +56,15 @@ class PersistentPageMemoryHashIndexStorageTest extends
AbstractPageMemoryHashInd
ioRegistry.loadFromServiceLoader();
- engine = new PersistentPageMemoryStorageEngine("test", engineConfig, ioRegistry, workDir, null, mock(FailureProcessor.class));
+ engine = new PersistentPageMemoryStorageEngine(
+ "test",
+ engineConfig,
+ ioRegistry,
+ workDir,
+ null,
+ mock(FailureProcessor.class),
+ mock(LogSyncer.class)
+ );
engine.start();
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemorySortedIndexStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemorySortedIndexStorageTest.java
index 0dadd5bdbf..6ef756600a 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemorySortedIndexStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/index/PersistentPageMemorySortedIndexStorageTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.storage.pagememory.configuration.schema
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -60,7 +61,9 @@ class PersistentPageMemorySortedIndexStorageTest extends
AbstractPageMemorySorte
ioRegistry,
workDir,
null,
- mock(FailureProcessor.class));
+ mock(FailureProcessor.class),
+ mock(LogSyncer.class)
+ );
engine.start();
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
index 75b96cc3cc..b421e1db8e 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.storage.pagememory.configuration.schema
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
@@ -54,7 +55,15 @@ class PersistentPageMemoryMvPartitionStorageConcurrencyTest
extends AbstractMvPa
ioRegistry.loadFromServiceLoader();
- engine = new PersistentPageMemoryStorageEngine("test", engineConfig, ioRegistry, workDir, null, mock(FailureProcessor.class));
+ engine = new PersistentPageMemoryStorageEngine(
+ "test",
+ engineConfig,
+ ioRegistry,
+ workDir,
+ null,
+ mock(FailureProcessor.class),
+ mock(LogSyncer.class)
+ );
engine.start();
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java
index 074343fc63..f174270ef8 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.storage.pagememory.configuration.schema
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
@@ -54,7 +55,15 @@ class PersistentPageMemoryMvPartitionStorageGcTest extends
AbstractMvPartitionSt
ioRegistry.loadFromServiceLoader();
- engine = new PersistentPageMemoryStorageEngine("test", engineConfig, ioRegistry, workDir, null, mock(FailureProcessor.class));
+ engine = new PersistentPageMemoryStorageEngine(
+ "test",
+ engineConfig,
+ ioRegistry,
+ workDir,
+ null,
+ mock(FailureProcessor.class),
+ mock(LogSyncer.class)
+ );
engine.start();
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
index 0dd66ecb77..d671b6e402 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -65,7 +66,15 @@ class PersistentPageMemoryMvPartitionStorageTest extends
AbstractPageMemoryMvPar
ioRegistry.loadFromServiceLoader();
- engine = new PersistentPageMemoryStorageEngine("test", engineConfig, ioRegistry, workDir, null, mock(FailureProcessor.class));
+ engine = new PersistentPageMemoryStorageEngine(
+ "test",
+ engineConfig,
+ ioRegistry,
+ workDir,
+ null,
+ mock(FailureProcessor.class),
+ mock(LogSyncer.class)
+ );
engine.start();
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataStorageModule.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataStorageModule.java
index 2d6252be9d..3868504104 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataStorageModule.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataStorageModule.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine.EN
import com.google.auto.service.AutoService;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -46,12 +47,13 @@ public class RocksDbDataStorageModule implements
DataStorageModule {
ConfigurationRegistry configRegistry,
Path storagePath,
@Nullable LongJvmPauseDetector longJvmPauseDetector,
- FailureProcessor failureProcessor
+ FailureProcessor failureProcessor,
+ LogSyncer logSyncer
) throws StorageException {
RocksDbStorageEngineConfiguration engineConfig =
configRegistry.getConfiguration(RocksDbStorageEngineConfiguration.KEY);
assert engineConfig != null;
- return new RocksDbStorageEngine(igniteInstanceName, engineConfig, storagePath);
+ return new RocksDbStorageEngine(igniteInstanceName, engineConfig, storagePath, logSyncer);
}
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index 134262982b..e8929e21a0 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.storage.StorageException;
@@ -88,16 +89,20 @@ public class RocksDbStorageEngine implements StorageEngine {
// TODO IGNITE-19762 Think of proper way to use regions and storages.
private final Map<String, RocksDbStorage> storageByRegionName = new
ConcurrentHashMap<>();
+ private final LogSyncer logSyncer;
+
/**
* Constructor.
*
* @param nodeName Node name.
* @param engineConfig RocksDB storage engine configuration.
* @param storagePath Storage path.
+ * @param logSyncer Write-ahead log synchronizer.
*/
- public RocksDbStorageEngine(String nodeName, RocksDbStorageEngineConfiguration engineConfig, Path storagePath) {
+ public RocksDbStorageEngine(String nodeName, RocksDbStorageEngineConfiguration engineConfig, Path storagePath, LogSyncer logSyncer) {
this.engineConfig = engineConfig;
this.storagePath = storagePath;
+ this.logSyncer = logSyncer;
threadPool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
@@ -130,6 +135,10 @@ public class RocksDbStorageEngine implements StorageEngine
{
return scheduledPool;
}
+ public LogSyncer logSyncer() {
+ return logSyncer;
+ }
+
@Override
public String name() {
return ENGINE_NAME;
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
index fead078d23..69b6c9c6af 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceCreator.java
@@ -76,6 +76,7 @@ public class SharedRocksDbInstanceCreator {
engine.scheduledPool(),
engine.threadPool(),
engine.configuration().flushDelayMillis()::value,
+ engine.logSyncer(),
() -> {} // No-op.
);
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java
index 857afd7b93..d4f7c64927 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.storage.rocksdb.configuration.schema.Ro
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import
org.apache.ignite.internal.storage.AbstractMvPartitionStorageConcurrencyTest;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
@@ -49,7 +50,7 @@ public class RocksDbMvPartitionStorageConcurrencyTest extends
AbstractMvPartitio
@InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion
{size = 16777216, writeBufferSize = 16777216}}")
RocksDbStorageEngineConfiguration engineConfig
) {
- engine = new RocksDbStorageEngine("test", engineConfig, workDir);
+ engine = new RocksDbStorageEngine("test", engineConfig, workDir, mock(LogSyncer.class));
engine.start();
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
index aa26cc7381..e80384c64d 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageGcTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.storage.rocksdb.configuration.schema.Ro
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.storage.AbstractMvPartitionStorageGcTest;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
@@ -49,7 +50,7 @@ public class RocksDbMvPartitionStorageGcTest extends
AbstractMvPartitionStorageG
@InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion
{size = 16777216, writeBufferSize = 16777216}}")
RocksDbStorageEngineConfiguration engineConfig
) {
- engine = new RocksDbStorageEngine("test", engineConfig, workDir);
+ engine = new RocksDbStorageEngine("test", engineConfig, workDir, mock(LogSyncer.class));
engine.start();
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index 0cd1b61c79..77ed5b4d1f 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.storage.rocksdb.configuration.schema.Ro
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
@@ -49,7 +50,7 @@ public class RocksDbMvPartitionStorageTest extends
AbstractMvPartitionStorageTes
@InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion
{size = 16777216, writeBufferSize = 16777216}}")
RocksDbStorageEngineConfiguration engineConfig
) {
- engine = new RocksDbStorageEngine("test", engineConfig, workDir);
+ engine = new RocksDbStorageEngine("test", engineConfig, workDir,
mock(LogSyncer.class));
engine.start();
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
index 994cba7fe0..7c3178cf89 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvTableStorageTest.java
@@ -24,10 +24,12 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
import java.nio.file.Path;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.storage.AbstractMvTableStorageTest;
@@ -57,7 +59,7 @@ public class RocksDbMvTableStorageTest extends
AbstractMvTableStorageTest {
@InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion
{size = 16536, writeBufferSize = 16536}}")
RocksDbStorageEngineConfiguration rocksDbEngineConfig
) {
- engine = new RocksDbStorageEngine("test", rocksDbEngineConfig,
workDir);
+ engine = new RocksDbStorageEngine("test", rocksDbEngineConfig,
workDir, mock(LogSyncer.class));
engine.start();
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngineTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngineTest.java
index 88680beb1b..671b3aebc1 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngineTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngineTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
@@ -55,7 +56,7 @@ public class RocksDbStorageEngineTest extends
BaseIgniteAbstractTest {
@BeforeEach
void setUp(@WorkDirectory Path workDir) {
- engine = new RocksDbStorageEngine("test", engineConfig, workDir);
+ engine = new RocksDbStorageEngine("test", engineConfig, workDir,
mock(LogSyncer.class));
engine.start();
}
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
index f908c5d121..d0e874edbe 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
@@ -43,7 +43,8 @@ public class RocksDbStorageEngineTest extends
AbstractStorageEngineTest {
return new RocksDbStorageEngine(
"test",
engineConfiguration,
- workDir
+ workDir,
+ logSyncer
);
}
}
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
index 40dd9027b0..bfb816ac1c 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorageTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.storage.rocksdb.configuration.schema.Ro
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
@@ -50,7 +51,7 @@ public class RocksDbHashIndexStorageTest extends
AbstractHashIndexStorageTest {
@InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion
{size = 16536, writeBufferSize = 16536}}")
RocksDbStorageEngineConfiguration rocksDbEngineConfig
) {
- engine = new RocksDbStorageEngine("test", rocksDbEngineConfig,
workDir);
+ engine = new RocksDbStorageEngine("test", rocksDbEngineConfig,
workDir, mock(LogSyncer.class));
engine.start();
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
index 270bc8e33b..8e48005c47 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.storage.rocksdb.configuration.schema.Ro
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
@@ -50,7 +51,7 @@ public class RocksDbSortedIndexStorageTest extends
AbstractSortedIndexStorageTes
@InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion
{size = 16536, writeBufferSize = 16536}}")
RocksDbStorageEngineConfiguration rocksDbEngineConfig
) {
- engine = new RocksDbStorageEngine("test", rocksDbEngineConfig,
workDir);
+ engine = new RocksDbStorageEngine("test", rocksDbEngineConfig,
workDir, mock(LogSyncer.class));
engine.start();
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
index a3ee3ba3de..e0d90321ce 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
@@ -33,6 +33,7 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
import java.nio.ByteBuffer;
import java.util.List;
@@ -42,6 +43,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
@@ -72,7 +74,7 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest {
@BeforeEach
void setUp(@InjectConfiguration RocksDbStorageEngineConfiguration
engineConfig) throws Exception {
- engine = new RocksDbStorageEngine("test", engineConfig, workDir);
+ engine = new RocksDbStorageEngine("test", engineConfig, workDir,
mock(LogSyncer.class));
engine.start();
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 2acf534d2b..ab9c9a294f 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1123,7 +1123,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
nodeCfgMgr.configurationRegistry(),
dir.resolve("storage"),
null,
- failureProcessor
+ failureProcessor,
+ raftManager.getLogSyncer()
)
);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index de51ec8fcd..e2930aedd8 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -570,6 +570,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
storagePath.resolve(TX_STATE_DIR),
txStateStorageScheduledPool,
txStateStoragePool,
+ raftMgr.getLogSyncer(),
TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER
);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index a583930e6a..b4e11700f8 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -266,6 +267,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
when(raftGrpSrvcMock.leader()).thenReturn(new Peer("node0"));
when(rm.startRaftGroupService(any(), any(), any(),
any())).thenAnswer(mock -> completedFuture(raftGrpSrvcMock));
+ when(rm.getLogSyncer()).thenReturn(mock(LogSyncer.class));
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class));
when(clusterService.topologyService()).thenReturn(topologyService);
when(topologyService.localMember()).thenReturn(node);
@@ -389,7 +391,14 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
DataStorageModules dataStorageModules = new
DataStorageModules(List.of(dataStorageModule));
DataStorageManager manager = new DataStorageManager(
- dataStorageModules.createStorageEngines(NODE_NAME,
mockedRegistry, storagePath, null, mock(FailureProcessor.class))
+ dataStorageModules.createStorageEngines(
+ NODE_NAME,
+ mockedRegistry,
+ storagePath,
+ null,
+ mock(FailureProcessor.class),
+ mock(LogSyncer.class)
+ )
);
assertThat(manager.start(), willCompleteSuccessfully());
@@ -435,9 +444,16 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
ConfigurationRegistry configRegistry,
Path storagePath,
@Nullable LongJvmPauseDetector longJvmPauseDetector,
- FailureProcessor failureProcessor
+ FailureProcessor failureProcessor,
+ LogSyncer logSyncer
) throws StorageException {
- return spy(super.createEngine(igniteInstanceName,
configRegistry, storagePath, longJvmPauseDetector, failureProcessor));
+ return spy(super.createEngine(igniteInstanceName,
+ configRegistry,
+ storagePath,
+ longJvmPauseDetector,
+ failureProcessor,
+ logSyncer
+ ));
}
};
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 2aa9cf921d..b857b3067f 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
@@ -829,7 +830,14 @@ public class TableManagerTest extends IgniteAbstractTest {
DataStorageModules dataStorageModules = new
DataStorageModules(List.of(new PersistentPageMemoryDataStorageModule()));
DataStorageManager manager = new DataStorageManager(
- dataStorageModules.createStorageEngines(NODE_NAME,
mockedRegistry, storagePath, null, mock(FailureProcessor.class))
+ dataStorageModules.createStorageEngines(
+ NODE_NAME,
+ mockedRegistry,
+ storagePath,
+ null,
+ mock(FailureProcessor.class),
+ mock(LogSyncer.class)
+ )
);
assertThat(manager.start(), willCompleteSuccessfully());
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java
index ec6034ddf0..fa924ff36d 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/PersistentPageMemoryGcUpdateHandlerTest.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeN
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -66,7 +67,8 @@ class PersistentPageMemoryGcUpdateHandlerTest extends
AbstractGcUpdateHandlerTes
ioRegistry,
workDir,
new LongJvmPauseDetector(nodeName),
- mock(FailureProcessor.class)
+ mock(FailureProcessor.class),
+ mock(LogSyncer.class)
);
engine.start();
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java
index 16b820dda5..6995934e1b 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/RocksDbGcUpdateHandlerTest.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeN
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
+import org.apache.ignite.internal.components.LogSyncer;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
@@ -51,7 +52,7 @@ class RocksDbGcUpdateHandlerTest extends
AbstractGcUpdateHandlerTest {
TestInfo testInfo,
@InjectConfiguration RocksDbStorageEngineConfiguration engineConfig
) {
- engine = new RocksDbStorageEngine(testNodeName(testInfo, 0),
engineConfig, workDir);
+ engine = new RocksDbStorageEngine(testNodeName(testInfo, 0),
engineConfig, workDir, mock(LogSyncer.class));
engine.start();
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java
index dbbb9cd5ef..5d3a4a1719 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -89,12 +90,16 @@ public class TxStateRocksDbSharedStorage implements
ManuallyCloseable {
/** Supplier for the value of delay for scheduled database flush. */
private final IntSupplier flushDelaySupplier;
+ /** Write-ahead log synchronizer. */
+ private final LogSyncer logSyncer;
+
/**
* Constructor.
*
* @param dbPath Database path.
* @param scheduledExecutor Scheduled executor for delayed flushes.
* @param threadPool Thread pool for internal operations.
+ * @param logSyncer Write-ahead log synchronizer.
* @param flushDelaySupplier Flush delay supplier.
*
* @see RocksDbFlusher
@@ -103,12 +108,14 @@ public class TxStateRocksDbSharedStorage implements
ManuallyCloseable {
Path dbPath,
ScheduledExecutorService scheduledExecutor,
ExecutorService threadPool,
+ LogSyncer logSyncer,
IntSupplier flushDelaySupplier
) {
this.dbPath = dbPath;
this.scheduledExecutor = scheduledExecutor;
this.threadPool = threadPool;
this.flushDelaySupplier = flushDelaySupplier;
+ this.logSyncer = logSyncer;
}
/**
@@ -139,6 +146,7 @@ public class TxStateRocksDbSharedStorage implements
ManuallyCloseable {
scheduledExecutor,
threadPool,
flushDelaySupplier,
+ logSyncer,
() -> {} // No-op.
);
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
index 423a646adc..5bd183ddb1 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.tx.storage.state.rocksdb;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.tx.storage.state.TxStateStorage.REBALANCE_IN_PROGRESS;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
import java.nio.file.Path;
import java.util.List;
@@ -28,6 +29,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -66,7 +68,7 @@ public class RocksDbTxStateStorageTest extends
AbstractTxStateStorageTest {
@Override
@BeforeEach
protected void beforeTest() {
- sharedStorage = new TxStateRocksDbSharedStorage(workDir,
scheduledExecutor, executor, () -> 0);
+ sharedStorage = new TxStateRocksDbSharedStorage(workDir,
scheduledExecutor, executor, mock(LogSyncer.class), () -> 0);
sharedStorage.start();
super.beforeTest();
diff --git
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
index e694c72e3d..64dc1eccaf 100644
---
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
+++
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.util.Cursor;
@@ -58,7 +59,7 @@ import org.junit.jupiter.api.function.Executable;
/**
* Abstract tx storage test.
*/
-public abstract class AbstractTxStateStorageTest {
+public abstract class AbstractTxStateStorageTest extends
BaseIgniteAbstractTest {
protected static final int TABLE_ID = 1;
protected TxStateTableStorage tableStorage;