This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new d0da231 IGNITE-6324 Filtering uncommited tx WAL records to prevent restoring tx partially - Fixes #8987. d0da231 is described below commit d0da231c7e8891d9fd45be3a245612e41b464e60 Author: zstan <stanilov...@gmail.com> AuthorDate: Thu Apr 29 09:56:07 2021 +0300 IGNITE-6324 Filtering uncommited tx WAL records to prevent restoring tx partially - Fixes #8987. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../internal/pagemem/wal/record/DataEntry.java | 1 + .../internal/pagemem/wal/record/WALRecord.java | 3 +- .../GridCacheDatabaseSharedManager.java | 33 +- .../persistence/checkpoint/CheckpointManager.java | 4 +- .../cache/persistence/checkpoint/Checkpointer.java | 13 +- .../cache/transactions/IgniteTxAdapter.java | 17 +- .../cache/transactions/IgniteTxLocalAdapter.java | 11 +- .../cache/transactions/IgniteTxManager.java | 66 +++- .../persistence/db/IgniteLogicalRecoveryTest.java | 2 +- .../db/IgniteLogicalRecoveryWithParamsTest.java | 368 +++++++++++++++++++++ .../IgnitePdsWithIndexingCoreTestSuite.java | 2 + 11 files changed, 487 insertions(+), 33 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java index dd05726..73de55b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java @@ -44,6 +44,7 @@ public class DataEntry { protected GridCacheOperation op; /** Near transaction version. */ + @GridToStringInclude protected GridCacheVersion nearXidVer; /** Write version. 
*/ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index f07b71a..8f49e5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -342,7 +342,8 @@ public abstract class WALRecord { PHYSICAL, /** * Logical records are needed to replay logical updates since last checkpoint. - * {@link GridCacheDatabaseSharedManager#applyLogicalUpdates(CheckpointStatus, org.apache.ignite.lang.IgnitePredicate, org.apache.ignite.lang.IgniteBiPredicate, boolean)} + * {@link GridCacheDatabaseSharedManager#applyLogicalUpdates(CheckpointStatus, org.apache.ignite.lang.IgnitePredicate, + * org.apache.ignite.lang.IgniteBiPredicate, boolean)} */ LOGICAL, /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index e6ca08c..42ff08d 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -93,6 +93,7 @@ import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord; import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; import org.apache.ignite.internal.pagemem.wal.record.ReencryptionStartRecord; import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord; +import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware; import 
org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord; @@ -137,6 +138,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.compress.CompressionProcessor; import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty; import org.apache.ignite.internal.processors.port.GridPortRecord; @@ -183,6 +185,7 @@ import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD_V2; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.TX_RECORD; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal; import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED; @@ -839,7 +842,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan metaStorage = createMetastorage(true); - applyLogicalUpdates(status, onlyMetastorageGroup(), onlyMetastorageAndEncryptionRecords(), false); + applyLogicalUpdates(status, onlyMetastorageGroup(), onlyMetastorageAndEncryptionRecords(), true); fillWalDisabledGroups(); @@ -1945,9 +1948,11 
@@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan status, groupsWithEnabledWal(), logicalRecords(), - true + false ); + cctx.tm().clearUncommitedStates(); + if (recoveryVerboseLogging && log.isInfoEnabled()) { log.info("Partition states information after LOGICAL RECOVERY phase:"); @@ -2598,6 +2603,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * @param status Last registered checkpoint status. + * @param restoreMeta Metastore restore phase if {@code true}. * @throws IgniteCheckedException If failed to apply updates. * @throws StorageException If IO exception occurred while reading write-ahead log. */ @@ -2605,13 +2611,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan CheckpointStatus status, IgnitePredicate<Integer> cacheGroupsPredicate, IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordTypePredicate, - boolean skipFieldLookup + boolean restoreMeta ) throws IgniteCheckedException { if (log.isInfoEnabled()) - log.info("Applying lost cache updates since last checkpoint record [lastMarked=" + log.info("Applying lost " + (restoreMeta ? 
"metastore" : "cache") + " updates since last checkpoint record [lastMarked=" + status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']'); - if (skipFieldLookup) + if (!restoreMeta) cctx.kernalContext().query().skipFieldLookup(true); long start = U.currentTimeMillis(); @@ -2633,6 +2639,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan RestoreLogicalState restoreLogicalState = new RestoreLogicalState(status, it, lastArchivedSegment, cacheGroupsPredicate, partitionRecoveryStates); + final IgniteTxManager txManager = cctx.tm(); + try { while (it.hasNextX()) { WALRecord rec = restoreLogicalState.next(); @@ -2641,6 +2649,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan break; switch (rec.type()) { + case TX_RECORD: + if (restoreMeta) { // Also restore tx states. + TxRecord txRec = (TxRecord)rec; + + txManager.collectTxStates(txRec); + } + + break; case CHECKPOINT_RECORD: // Calculate initial partition states CheckpointRecord cpRec = (CheckpointRecord)rec; @@ -2682,6 +2698,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan DataRecord dataRec = (DataRecord)rec; for (DataEntry dataEntry : dataRec.writeEntries()) { + if (!restoreMeta && txManager.uncommitedTx(dataEntry)) + continue; + int cacheId = dataEntry.cacheId(); DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheId); @@ -2777,7 +2796,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan finally { it.close(); - if (skipFieldLookup) + if (!restoreMeta) cctx.kernalContext().query().skipFieldLookup(false); } @@ -3465,7 +3484,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @return WAL records predicate that passes only Metastorage and encryption data records. 
*/ private IgniteBiPredicate<WALRecord.RecordType, WALPointer> onlyMetastorageAndEncryptionRecords() { - return (type, ptr) -> type == METASTORE_DATA_RECORD || + return (type, ptr) -> type == METASTORE_DATA_RECORD || type == TX_RECORD || type == MASTER_KEY_CHANGE_RECORD || type == MASTER_KEY_CHANGE_RECORD_V2; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java index 73b05d6..3d8c287 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java @@ -327,9 +327,7 @@ public class CheckpointManager { checkpointMarkersStorage.cleanupCheckpointDirectory(); } - /** - * - */ + /** Current checkpointer implementation. */ public Checkpointer getCheckpointer() { return checkpointer; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java index 1424a44..a7ac809 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java @@ -120,7 +120,7 @@ public class Checkpointer extends GridWorker { private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 1000; // 30 Seconds. /** Avoid the start checkpoint if checkpointer was canceled. 
*/ - private final boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false); + private volatile boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false); /** Long JVM pause threshold. */ private final int longJvmPauseThreshold = @@ -244,7 +244,7 @@ public class Checkpointer extends GridWorker { checkpointWritePageThreads, checkpointWritePageThreads, 30_000, - new LinkedBlockingQueue<Runnable>() + new LinkedBlockingQueue<>() ); return null; @@ -1007,4 +1007,13 @@ public class Checkpointer extends GridWorker { private boolean isShutdownNow() { return shutdownNow; } + + /** + * Skip checkpoint on node stop. + * + * @param skip If {@code true} skips checkpoint on node stop. + */ + public void skipCheckpointOnNodeStop(boolean skip) { + skipCheckpointOnNodeStop = skip; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 9daddb5..04db6cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -63,7 +63,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; -import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry; @@ 
-1228,7 +1227,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement seal(); if (state == PREPARED || state == COMMITTED || state == ROLLED_BACK) { - cctx.tm().setMvccState(this, toMvccState(state)); + cctx.tm().setMvccState(this, state); ptr = cctx.tm().logTxRecord(this); } @@ -1260,20 +1259,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** */ - private byte toMvccState(TransactionState state) { - switch (state) { - case PREPARED: - return TxState.PREPARED; - case COMMITTED: - return TxState.COMMITTED; - case ROLLED_BACK: - return TxState.ABORTED; - default: - throw new IllegalStateException("Unexpected state: " + state); - } - } - - /** */ private void recordStateChangedEvent(TransactionState state) { if (!near() || !local()) // Covers only GridNearTxLocal's state changes. return; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 7b6cd7f..50d07d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpda import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; 
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -937,7 +938,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig U.warn(log, "Failed to commit transaction, node is stopping [tx=" + CU.txString(this) + ", err=" + ex + ']'); - return; + boolean persistenceEnabled = CU.isPersistenceEnabled(cctx.kernalContext().config()); + + if (persistenceEnabled) { + GridCacheDatabaseSharedManager dbManager = (GridCacheDatabaseSharedManager)cctx.database(); + + dbManager.getCheckpointer().skipCheckpointOnNodeStop(true); + } + + throw ex; } err = heuristicException(ex); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index c3c07b5..c153caa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -34,6 +35,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; @@ -53,6 +55,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; import 
org.apache.ignite.internal.managers.systemview.walker.TransactionViewWalker; +import org.apache.ignite.internal.pagemem.wal.record.DataEntry; import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord; import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -88,6 +91,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOpti import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage; +import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -198,6 +202,23 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { private static final int SLOW_TX_WARN_TIMEOUT = Integer.getInteger(IGNITE_SLOW_TX_WARN_TIMEOUT, DFLT_SLOW_TX_WARN_TIMEOUT); + /** Returns {@code true} if transaction has completed states. */ + public static final Predicate<TxRecord> COMPLETED_TX_STATES = new Predicate<TxRecord>() { + @Override public boolean test(TxRecord txRec) { + return txRec.state() == COMMITTED || txRec.state() == ROLLED_BACK; + } + }; + + /** Returns {@code true} if transaction has prepared states. */ + public static final Predicate<TxRecord> PREPARED_TX_STATES = new Predicate<TxRecord>() { + @Override public boolean test(TxRecord txRec) { + return txRec.state() == PREPARED || txRec.state() == PREPARING; + } + }; + + /** Uncommited tx states. */ + private Set<GridCacheVersion> uncommitedTx = new HashSet<>(); + /** One phase commit deferred ack request timeout. 
*/ public static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT = Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT, @@ -2842,20 +2863,36 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param tx Transaction. * @param state New state. */ - public void setMvccState(IgniteInternalTx tx, byte state) { + public void setMvccState(IgniteInternalTx tx, TransactionState state) { if (cctx.kernalContext().clientNode() || tx.mvccSnapshot() == null || tx.near() && !tx.local()) return; + byte state0 = toMvccState(state); + cctx.database().checkpointReadLock(); try { - cctx.coordinators().updateState(tx.mvccSnapshot(), state, tx.local()); + cctx.coordinators().updateState(tx.mvccSnapshot(), state0, tx.local()); } finally { cctx.database().checkpointReadUnlock(); } } + /** */ + private byte toMvccState(TransactionState state) { + switch (state) { + case PREPARED: + return TxState.PREPARED; + case COMMITTED: + return TxState.COMMITTED; + case ROLLED_BACK: + return TxState.ABORTED; + default: + throw new IllegalStateException("Unexpected state: " + state); + } + } + /** * Finishes MVCC transaction. * @param tx Transaction. @@ -3764,4 +3801,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return 0; } } + + /** + * Collects tx states {@link TransactionState} for further correct restoring. + * + * @param txRec tx Record. + */ + public void collectTxStates(final TxRecord txRec) { + if (COMPLETED_TX_STATES.test(txRec)) + uncommitedTx.remove(txRec.nearXidVersion()); + else if (PREPARED_TX_STATES.test(txRec)) + uncommitedTx.add(txRec.nearXidVersion()); + } + + /** + * @param dataEntry Processing entry. + * @return {@code true} If entry contains not completed tx version. + */ + public boolean uncommitedTx(final DataEntry dataEntry) { + return uncommitedTx.contains(dataEntry.nearXidVersion()); + } + + /** Clears tx states collections. 
*/ + public void clearUncommitedStates() { + uncommitedTx = Collections.emptySet(); + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java index 8cc7c3f..3f8f853 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryTest.java @@ -480,7 +480,7 @@ public class IgniteLogicalRecoveryTest extends GridCommonAbstractTest { .collect(Collectors.toList()); Assert.assertTrue("There was unexpected rebalance for some groups" + - " [node=" + node.name() + ", groups=" + rebalancedGroups + ']', rebalancedGroups.isEmpty()); + " [node=" + node.name() + ", groups=" + rebalancedGroups + ']', rebalancedGroups.isEmpty()); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryWithParamsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryWithParamsTest.java new file mode 100644 index 0000000..aac00dc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteLogicalRecoveryWithParamsTest.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.file.OpenOption; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.failure.StopNodeFailureHandler; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; +import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.WithSystemProperty; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS; +import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP; +import static org.apache.ignite.testframework.GridTestUtils.DFLT_BUSYWAIT_SLEEP_INTERVAL; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; + +/** + * A set of tests that check correctness of logical recovery performed during node start. 
+ */ +@RunWith(Parameterized.class) +public class IgniteLogicalRecoveryWithParamsTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + cfg.setFailureHandler(new StopNodeFailureHandler()); + + DataStorageConfiguration dsCfg = new DataStorageConfiguration() + .setWalMode(WALMode.LOG_ONLY) + .setCheckpointFrequency(1024 * 1024 * 1024) // Disable automatic checkpoints. + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setName("dflt") + .setInitialSize(256 * 1024 * 1024) + .setMaxSize(256 * 1024 * 1024) + .setPersistenceEnabled(true) + ); + + cfg.setDataStorageConfiguration(dsCfg); + + TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi(); + + cfg.setCommunicationSpi(spi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** Parametrized run param : server nodes. */ + @Parameterized.Parameter(0) + public Integer numSrvNodes; + + /** Parametrized run param : single node tx. */ + @Parameterized.Parameter(1) + public Boolean singleNodeTx; + + /** Parametrized run param : backups count. */ + @Parameterized.Parameter(2) + public Integer backups; + + /** Test run configurations: server nodes count, single node tx flag, backups count. 
*/ + @Parameterized.Parameters(name = "nodesCnt={0}, singleNodeTx={1}, backups={2}") + public static Collection<Object[]> runConfig() { + return Arrays.asList(new Object[][] { + {1, true, 0}, + {1, true, 1}, + {1, false, 0}, + {1, false, 1}, + {2, true, 0}, + {2, true, 1}, + {2, false, 0}, + {2, false, 1}, + }); + } + + /** Tests partially committed transactions with further recovery; {@code IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP} is set to {@code true}. */ + @Test + @WithSystemProperty(key = IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, value = "true") + @WithSystemProperty(key = IGNITE_WAL_LOG_TX_RECORDS, value = "true") + public void testPartiallyCommitedTx_TwoNode_WithCpOnNodeStop_MultiNodeTx_OneBackup() throws Exception { + testPartiallyCommitedTx(); + } + + /** Tests partially committed transactions with further recovery; {@code IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP} is left unset. */ + @Test + @WithSystemProperty(key = IGNITE_WAL_LOG_TX_RECORDS, value = "true") + public void testPartiallyCommitedTx_TwoNode_WithoutCpOnNodeStop_SingleNodeTx() throws Exception { + testPartiallyCommitedTx(); + } + + /** + * Tests concurrent tx with node stop and further recovery. 
+ * + */ + private void testPartiallyCommitedTx() throws Exception { + final String cacheName = "recovery"; + + int itmsCount = 30_000; + + AtomicBoolean failFileIO = new AtomicBoolean(); + + List<Integer> keys; + + CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<Integer, Integer>(cacheName) + .setCacheMode(CacheMode.PARTITIONED) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setBackups(backups) + .setAffinity(new RendezvousAffinityFunction(false, 32)); + + try { + final IgniteEx srv = (IgniteEx)startGridsMultiThreaded(numSrvNodes); + + G.allGrids().forEach(n -> setWalIOFactory(n, failFileIO)); + + IgniteEx clnt = startClientGrid("client"); + + TestRecordingCommunicationSpi nearComm = TestRecordingCommunicationSpi.spi(clnt); + + srv.cluster().state(ClusterState.ACTIVE); + + final IgniteCache cache = clnt.getOrCreateCache(cfg); + + final CountDownLatch commitStart = new CountDownLatch(1); + + forceCheckpoint(); + + nearComm.blockMessages((node, msg) -> msg instanceof GridNearTxPrepareRequest); + + if (singleNodeTx) + keys = primaryKeys(srv.cache(cacheName), itmsCount, 0); + else + keys = IntStream.range(0, itmsCount).boxed().collect(Collectors.toList()); + + Thread t = new Thread(() -> { + try (Transaction tx = clnt.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + keys.forEach(k -> cache.put(k, k)); + + commitStart.countDown(); + + tx.commit(); + } + }); + + t.start(); + + commitStart.await(); + + nearComm.waitForBlocked(); + + nearComm.stopBlock(); + + assertTrue(waitForWalUpdates(G.allGrids().stream().filter(g -> !g.configuration().isClientMode()) + .collect(Collectors.toList()))); + } + finally { + failFileIO.set(true); + + stopAllGrids(true); + + assertTrue(G.allGrids().isEmpty()); + } + + final IgniteEx srv = (IgniteEx)startGridsMultiThreaded(numSrvNodes); + + srv.cluster().state(ClusterState.ACTIVE); + + IgniteCache<Integer, Integer> cache = 
srv.cache(cacheName); + + int cSize = cache.size(); + + boolean pr = cache.get(keys.get(0)) == null; + + for (int i : keys) { + Object res = cache.get(i); + + if (pr != (res == null)) + assertEquals("ethalon=" + pr + ", current=" + res + ", key=" + i, pr, res == null); + } + + assert (cSize == itmsCount || cSize == 0) : "unexpected cache size: " + cSize; + } + + /** */ + private boolean waitForWalUpdates(Collection<Ignite> grids) throws IgniteInterruptedCheckedException { + long start = U.currentTimeMillis(); + + int[] offsets = new int[grids.size()]; + + int gCnt = 0; + + for (Ignite grid : grids) + offsets[gCnt++] = getWalPos(grid); + + while (true) { + gCnt = 0; + + for (Ignite grid : grids) { + if (getWalPos(grid) - offsets[gCnt++] > 100) + return true; + } + + U.sleep(DFLT_BUSYWAIT_SLEEP_INTERVAL / 2); + + if (U.currentTimeMillis() - start > 20_000) + return false; + } + } + + /** + * Sets file IO factory. + * + * @param grid Ignite instance. + * @param canFail If {@code true} throws exception on write. + * + */ + private void setWalIOFactory(Ignite grid, AtomicBoolean canFail) { + IgniteEx grid0 = (IgniteEx)grid; + + FileWriteAheadLogManager walMgr = (FileWriteAheadLogManager)grid0.context().cache().context().wal(); + + walMgr.setFileIOFactory(new FailingFileIOFactory(canFail)); + } + + /** + * @param grid Ignite instance. + * @return Returns current wal position. + */ + private int getWalPos(Ignite grid) { + IgniteEx grid0 = (IgniteEx)grid; + + FileWriteAheadLogManager walMgr = (FileWriteAheadLogManager)grid0.context().cache().context().wal(); + + FileWriteHandle fhAfter = U.field(walMgr, "currHnd"); + + try { + fhAfter.fsync(null); + } + catch (IgniteCheckedException e) { + U.warn(log, e); + } + + return fhAfter.position().fileOffset(); + } + + /** + * Create File I/O which fails after flag is touched. + */ + private static class FailingFileIOFactory implements FileIOFactory { + /** Serial version uid. 
*/ + private static final long serialVersionUID = 0L; + + /** Flag which makes every write fail while it is set. */ + private AtomicBoolean fail; + + /** Delegate factory. */ + private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory(); + + /** */ + FailingFileIOFactory(AtomicBoolean fail) { + this.fail = fail; + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + final FileIO delegate = delegateFactory.create(file, modes); + + return new FileIODecorator(delegate) { + @Override public int write(ByteBuffer srcBuf) throws IOException { + if (fail != null && fail.get()) + throw new IOException("No space left on device"); + + return super.write(srcBuf); + } + + /** {@inheritDoc} */ + @Override public int write(ByteBuffer srcBuf, long position) throws IOException { + if (fail != null && fail.get()) + throw new IOException("No space left on device"); + + return delegate.write(srcBuf, position); + } + + /** {@inheritDoc} */ + @Override public int write(byte[] buf, int off, int len) throws IOException { + if (fail != null && fail.get()) + throw new IOException("No space left on device"); + + return delegate.write(buf, off, len); + } + + /** {@inheritDoc} */ + @Override public MappedByteBuffer map(int sizeBytes) throws IOException { + return delegate.map(sizeBytes); + } + }; + } + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java index 4f6b671..26e1df4 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentS import org.apache.ignite.internal.processors.cache.persistence.PersistenceDirectoryWarningLoggingTest; import 
org.apache.ignite.internal.processors.cache.persistence.db.IgniteCacheGroupsWithRestartsTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgniteLogicalRecoveryTest; +import org.apache.ignite.internal.processors.cache.persistence.db.IgniteLogicalRecoveryWithParamsTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsMultiNodePutGetRestartTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgniteSequentialNodeCrashRecoveryTest; @@ -93,6 +94,7 @@ import org.junit.runners.Suite; IgnitePdsCorruptedIndexTest.class, IgniteLogicalRecoveryTest.class, + IgniteLogicalRecoveryWithParamsTest.class, IgniteSequentialNodeCrashRecoveryTest.class,