This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f6ba904  IGNITE-13244 Throw CacheInvalidStateException if all owners 
for a partition have failed on commit.
f6ba904 is described below

commit f6ba904a3fcb6cbaacc19001eaa2a7b71836fbce
Author: Alexey Scherbakov <alexey.scherbak...@gmail.com>
AuthorDate: Mon Nov 9 20:10:24 2020 +0300

    IGNITE-13244 Throw CacheInvalidStateException if all owners for a partition 
have failed on commit.
    
    Signed-off-by: Alexey Scherbakov <alexey.scherbak...@gmail.com>
---
 .../distributed/near/GridNearTxFinishFuture.java   |  23 +++
 .../processors/cache/IgniteCacheGroupsTest.java    |   9 +-
 .../near/IgniteTxExceptionNodeFailTest.java        | 197 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite3.java   |   2 +
 4 files changed, 229 insertions(+), 2 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index fc239da..8a25f86 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -31,6 +31,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import 
org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -58,11 +59,16 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionRollbackException;
 
+import static java.util.Collections.emptySet;
+import static java.util.stream.Stream.concat;
+import static java.util.stream.Stream.of;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static 
org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
 import static org.apache.ignite.internal.processors.tracing.MTC.support;
 import static 
org.apache.ignite.internal.processors.tracing.SpanType.TX_NEAR_FINISH;
+import static org.apache.ignite.transactions.TransactionState.COMMITTED;
+import static org.apache.ignite.transactions.TransactionState.COMMITTING;
 import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
 
 /**
@@ -73,6 +79,10 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** All owners left grid message. */
+    public static final String ALL_PARTITION_OWNERS_LEFT_GRID_MSG =
+        "Failed to commit a transaction (all partition owners have left the 
grid, partition data has been lost)";
+
     /** Tracing span. */
     private Span span;
 
@@ -979,6 +989,19 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCacheCompoundIdentit
 
         /** {@inheritDoc} */
         @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) {
+            if (tx.state() == COMMITTING || tx.state() == COMMITTED) {
+                if (concat(of(m.primary().id()), 
tx.transactionNodes().getOrDefault(nodeId, emptySet()).stream())
+                    .noneMatch(uuid -> cctx.discovery().alive(uuid))) {
+                    onDone(new 
CacheInvalidStateException(ALL_PARTITION_OWNERS_LEFT_GRID_MSG +
+                        m.entries().stream().map(e -> " [cacheName=" + 
e.cached().context().name() +
+                            ", partition=" + e.key().partition() +
+                            (S.includeSensitive() ? ", key=" + e.key() : "") +
+                            "]").findFirst().orElse("")));
+
+                    return true;
+                }
+            }
+
             if (nodeId.equals(m.primary().id())) {
                 if (msgLog.isDebugEnabled()) {
                     msgLog.debug("Near finish fut, mini future node left 
[txId=" + tx.nearXidVersion() +
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index cd28022..db64131 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -99,6 +99,7 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.GridTestUtils.SF;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -4106,10 +4107,14 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
                                             cacheOperation(rnd, cache);
                                     }
                                     catch (Exception e) {
-                                        if (X.hasCause(e, 
CacheStoppedException.class)) {
+                                        if (X.hasCause(e, 
CacheStoppedException.class) ||
+                                            (X.hasCause(e, 
CacheInvalidStateException.class) &&
+                                                X.hasCause(e, 
TransactionRollbackException.class))
+                                        ) {
                                             // Cache operation can be blocked 
on
                                             // awaiting new topology version 
and cancelled with CacheStoppedException cause.
-
+                                            // Cache operation can failed
+                                            // if a node was stopped during 
transaction.
                                             continue;
                                         }
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxExceptionNodeFailTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxExceptionNodeFailTest.java
new file mode 100644
index 0000000..f16c6ea
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxExceptionNodeFailTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.ShutdownPolicy;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionHeuristicException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.springframework.util.Assert;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture.ALL_PARTITION_OWNERS_LEFT_GRID_MSG;
+import static 
org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
+
+/**
+ * Tests check a result of commit when a node fail before
+ * send {@link GridNearTxFinishResponse} to transaction coodinator
+ */
+@RunWith(Parameterized.class)
+public class IgniteTxExceptionNodeFailTest extends GridCommonAbstractTest {
+    /** Parameters. */
+    @Parameterized.Parameters(name = "syncMode={0}")
+    public static Iterable<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            { PRIMARY_SYNC },
+            { FULL_SYNC },
+        });
+    }
+
+    /** syncMode */
+    @Parameterized.Parameter()
+    public CacheWriteSynchronizationMode syncMode;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        DataStorageConfiguration dsConfig = new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new 
DataRegionConfiguration().setMaxSize(100L * 1024 * 1024)
+                .setPersistenceEnabled(true));
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        return cfg
+            .setDataStorageConfiguration(dsConfig)
+            .setCacheConfiguration(new CacheConfiguration("cache")
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setWriteSynchronizationMode(syncMode).setBackups(0));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * <ul>
+     * <li>Start 2 nodes with transactional cache, without backups, with 
{@link IgniteTxExceptionNodeFailTest#syncMode}
+     * <li>Start transaction:
+     *  <ul>
+     *  <li>put a key to a partition on transaction coordinator
+     *  <li>put a key to a partition on other node
+     *  <li>try to commit the transaction
+     *  </ul>
+     * <li>Stop other node when it try to send GridNearTxFinishResponse
+     * <li>Check that {@link Transaction#commit()} throw {@link 
TransactionHeuristicException}
+     * </ul>
+     *
+     * @throws Exception If failed
+     */
+    @Test
+    public void testNodeFailBeforeSendGridNearTxFinishResponse() throws 
Exception {
+        startGrids(2);
+
+        grid(0).cluster().active(true);
+
+        IgniteEx grid0 = grid(0);
+        IgniteEx grid1 = grid(1);
+
+        int key0 = 0;
+        int key1 = 0;
+
+        Affinity<Object> aff = grid1.affinity("cache");
+
+        for (int i = 1; i < 1000; i++) {
+            if (grid0.equals(grid(aff.mapKeyToNode(i)))) {
+                key0 = i;
+
+                break;
+            }
+        }
+
+        for (int i = key0; i < 1000; i++) {
+            if (grid1.equals(grid(aff.mapKeyToNode(i))) && 
!aff.mapKeyToNode(key0).equals(aff.mapKeyToNode(i))) {
+                key1 = i;
+
+                break;
+            }
+        }
+
+        assert !aff.mapKeyToNode(key0).equals(aff.mapKeyToNode(key1));
+
+        try (Transaction tx = grid1.transactions().txStart()) {
+            grid1.cache("cache").put(key0, 100);
+            grid1.cache("cache").put(key1, 200);
+
+            spi(grid0).blockMessages((node, msg) -> {
+                    if (msg instanceof GridNearTxFinishResponse) {
+                        new Thread(
+                            new Runnable() {
+                                @Override public void run() {
+                                    log().info("Stopping node: [" + 
grid0.name() + "]");
+
+                                    IgnitionEx.stop(grid0.name(), true, 
ShutdownPolicy.IMMEDIATE, false);
+                                }
+                            },
+                            "node-stopper"
+                        ).start();
+
+                        return true;
+                    }
+
+                    return false;
+                }
+            );
+
+            boolean passed = false;
+
+            try {
+                tx.commit();
+            }
+            catch (Throwable e) {
+                String msg = e.getMessage();
+
+                Assert.isTrue(e.getCause() instanceof 
CacheInvalidStateException);
+
+                
Assert.isTrue(msg.contains(ALL_PARTITION_OWNERS_LEFT_GRID_MSG));
+
+                if (!mvccEnabled(grid1.context())) {
+                    Pattern msgPtrn = Pattern.compile(" \\[cacheName=cache, 
partition=\\d+, " + "key=KeyCacheObjectImpl \\[part=\\d+, val=" + key0 +
+                        ", hasValBytes=true\\]\\]");
+
+                    Matcher matcher = msgPtrn.matcher(msg);
+
+                    Assert.isTrue(matcher.find());
+                }
+
+                passed = true;
+            }
+
+            Assert.isTrue(passed);
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index ebe0b5a..1e4f59c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -52,6 +52,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridCa
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearPartitionedP2PDisabledByteArrayValuesSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearPartitionedP2PEnabledByteArrayValuesSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridCachePutArrayValueSelfTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxExceptionNodeFailTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheDaemonNodeReplicatedSelfTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedAtomicGetAndTransformStoreSelfTest;
@@ -152,6 +153,7 @@ public class IgniteCacheTestSuite3 {
 
         GridTestUtils.addTestIfNeeded(suite, 
IgniteTxReentryNearSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
IgniteTxReentryColocatedSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
IgniteTxExceptionNodeFailTest.class, ignoredTests);
 
         // Test for byte array value special case.
         GridTestUtils.addTestIfNeeded(suite, 
GridCacheLocalByteArrayValuesSelfTest.class, ignoredTests);

Reply via email to