This is an automated email from the ASF dual-hosted git repository. ascherbakov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new f6ba904 IGNITE-13244 Throw CacheInvalidStateException if all owners for a partition have failed on commit. f6ba904 is described below commit f6ba904a3fcb6cbaacc19001eaa2a7b71836fbce Author: Alexey Scherbakov <alexey.scherbak...@gmail.com> AuthorDate: Mon Nov 9 20:10:24 2020 +0300 IGNITE-13244 Throw CacheInvalidStateException if all owners for a partition have failed on commit. Signed-off-by: Alexey Scherbakov <alexey.scherbak...@gmail.com> --- .../distributed/near/GridNearTxFinishFuture.java | 23 +++ .../processors/cache/IgniteCacheGroupsTest.java | 9 +- .../near/IgniteTxExceptionNodeFailTest.java | 197 +++++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite3.java | 2 + 4 files changed, 229 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index fc239da..8a25f86 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -31,6 +31,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture; import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheReturn; @@ -58,11 +59,16 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; 
import org.apache.ignite.transactions.TransactionRollbackException; +import static java.util.Collections.emptySet; +import static java.util.stream.Stream.concat; +import static java.util.stream.Stream.of; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; import static org.apache.ignite.internal.processors.tracing.MTC.support; import static org.apache.ignite.internal.processors.tracing.SpanType.TX_NEAR_FINISH; +import static org.apache.ignite.transactions.TransactionState.COMMITTED; +import static org.apache.ignite.transactions.TransactionState.COMMITTING; import static org.apache.ignite.transactions.TransactionState.UNKNOWN; /** @@ -73,6 +79,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit /** */ private static final long serialVersionUID = 0L; + /** All owners left grid message. */ + public static final String ALL_PARTITION_OWNERS_LEFT_GRID_MSG = + "Failed to commit a transaction (all partition owners have left the grid, partition data has been lost)"; + /** Tracing span. */ private Span span; @@ -979,6 +989,19 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit /** {@inheritDoc} */ @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) { + if (tx.state() == COMMITTING || tx.state() == COMMITTED) { + if (concat(of(m.primary().id()), tx.transactionNodes().getOrDefault(nodeId, emptySet()).stream()) + .noneMatch(uuid -> cctx.discovery().alive(uuid))) { + onDone(new CacheInvalidStateException(ALL_PARTITION_OWNERS_LEFT_GRID_MSG + + m.entries().stream().map(e -> " [cacheName=" + e.cached().context().name() + + ", partition=" + e.key().partition() + + (S.includeSensitive() ? 
", key=" + e.key() : "") + + "]").findFirst().orElse(""))); + + return true; + } + } + if (nodeId.equals(m.primary().id())) { if (msgLog.isDebugEnabled()) { msgLog.debug("Near finish fut, mini future node left [txId=" + tx.nearXidVersion() + diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index cd28022..db64131 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -99,6 +99,7 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.GridTestUtils.SF; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionRollbackException; import org.jetbrains.annotations.Nullable; import org.junit.Ignore; import org.junit.Test; @@ -4106,10 +4107,14 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { cacheOperation(rnd, cache); } catch (Exception e) { - if (X.hasCause(e, CacheStoppedException.class)) { + if (X.hasCause(e, CacheStoppedException.class) || + (X.hasCause(e, CacheInvalidStateException.class) && + X.hasCause(e, TransactionRollbackException.class)) + ) { // Cache operation can be blocked on // awaiting new topology version and cancelled with CacheStoppedException cause. - + // Cache operation can failed + // if a node was stopped during transaction. 
continue; } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxExceptionNodeFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxExceptionNodeFailTest.java new file mode 100644 index 0000000..f16c6ea --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxExceptionNodeFailTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.util.Arrays; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.ignite.ShutdownPolicy; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionHeuristicException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.springframework.util.Assert; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; +import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi; +import static org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture.ALL_PARTITION_OWNERS_LEFT_GRID_MSG; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled; + +/** + * Checks the result of a commit when a node fails before + * sending {@link GridNearTxFinishResponse} to the transaction coordinator. + */ +@RunWith(Parameterized.class) +public class IgniteTxExceptionNodeFailTest extends GridCommonAbstractTest { + /** Parameters. 
*/ + @Parameterized.Parameters(name = "syncMode={0}") + public static Iterable<Object[]> data() { + return Arrays.asList(new Object[][] { + { PRIMARY_SYNC }, + { FULL_SYNC }, + }); + } + + /** syncMode */ + @Parameterized.Parameter() + public CacheWriteSynchronizationMode syncMode; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + DataStorageConfiguration dsConfig = new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(100L * 1024 * 1024) + .setPersistenceEnabled(true)); + + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + return cfg + .setDataStorageConfiguration(dsConfig) + .setCacheConfiguration(new CacheConfiguration("cache") + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setWriteSynchronizationMode(syncMode).setBackups(0)); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * <ul> + * <li>Start 2 nodes with transactional cache, without backups, with {@link IgniteTxExceptionNodeFailTest#syncMode} + * <li>Start transaction: + * <ul> + * <li>put a key to a partition on transaction coordinator + * <li>put a key to a partition on other node + * <li>try to commit the transaction + * </ul> + * <li>Stop other node when it try to send GridNearTxFinishResponse + * <li>Check that {@link Transaction#commit()} throw {@link TransactionHeuristicException} + * </ul> + * + * @throws Exception If failed + */ + @Test + public void testNodeFailBeforeSendGridNearTxFinishResponse() throws Exception { + startGrids(2); + + grid(0).cluster().active(true); + + IgniteEx grid0 = grid(0); + IgniteEx grid1 = grid(1); + + int key0 = 
0; + int key1 = 0; + + Affinity<Object> aff = grid1.affinity("cache"); + + for (int i = 1; i < 1000; i++) { + if (grid0.equals(grid(aff.mapKeyToNode(i)))) { + key0 = i; + + break; + } + } + + for (int i = key0; i < 1000; i++) { + if (grid1.equals(grid(aff.mapKeyToNode(i))) && !aff.mapKeyToNode(key0).equals(aff.mapKeyToNode(i))) { + key1 = i; + + break; + } + } + + assert !aff.mapKeyToNode(key0).equals(aff.mapKeyToNode(key1)); + + try (Transaction tx = grid1.transactions().txStart()) { + grid1.cache("cache").put(key0, 100); + grid1.cache("cache").put(key1, 200); + + spi(grid0).blockMessages((node, msg) -> { + if (msg instanceof GridNearTxFinishResponse) { + new Thread( + new Runnable() { + @Override public void run() { + log().info("Stopping node: [" + grid0.name() + "]"); + + IgnitionEx.stop(grid0.name(), true, ShutdownPolicy.IMMEDIATE, false); + } + }, + "node-stopper" + ).start(); + + return true; + } + + return false; + } + ); + + boolean passed = false; + + try { + tx.commit(); + } + catch (Throwable e) { + String msg = e.getMessage(); + + Assert.isTrue(e.getCause() instanceof CacheInvalidStateException); + + Assert.isTrue(msg.contains(ALL_PARTITION_OWNERS_LEFT_GRID_MSG)); + + if (!mvccEnabled(grid1.context())) { + Pattern msgPtrn = Pattern.compile(" \\[cacheName=cache, partition=\\d+, " + "key=KeyCacheObjectImpl \\[part=\\d+, val=" + key0 + + ", hasValBytes=true\\]\\]"); + + Matcher matcher = msgPtrn.matcher(msg); + + Assert.isTrue(matcher.find()); + } + + passed = true; + } + + Assert.isTrue(passed); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java index ebe0b5a..1e4f59c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java @@ -52,6 +52,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCa import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearPartitionedP2PDisabledByteArrayValuesSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearPartitionedP2PEnabledByteArrayValuesSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePutArrayValueSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxExceptionNodeFailTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheDaemonNodeReplicatedSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedAtomicGetAndTransformStoreSelfTest; @@ -152,6 +153,7 @@ public class IgniteCacheTestSuite3 { GridTestUtils.addTestIfNeeded(suite, IgniteTxReentryNearSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteTxReentryColocatedSelfTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteTxExceptionNodeFailTest.class, ignoredTests); // Test for byte array value special case. GridTestUtils.addTestIfNeeded(suite, GridCacheLocalByteArrayValuesSelfTest.class, ignoredTests);