This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 31506f0  IGNITE-12392 Faster transaction rolled back when one of backup node failed - Fixes #7072.
31506f0 is described below

commit 31506f0600c433e8873d3f44590e0c34f91a9122
Author: Kirill Tkalenko <tkalkir...@yandex.ru>
AuthorDate: Thu Nov 28 21:15:08 2019 +0300

    IGNITE-12392 Faster transaction rolled back when one of backup node failed - Fixes #7072.
    
    Signed-off-by: Ivan Rakov <ira...@apache.org>
---
 .../distributed/dht/GridDhtTxFinishFuture.java     |  27 +-
 .../GridCacheFastNodeLeftForTransactionTest.java   | 394 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite2.java   |   2 +
 3 files changed, 414 insertions(+), 9 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index dc29ebe..00d1588 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 
+import static java.util.Objects.isNull;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static org.apache.ignite.transactions.TransactionState.COMMITTING;
@@ -467,18 +468,26 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
             req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
 
             try {
-                cctx.io().send(n, req, tx.ioPolicy());
+                if (isNull(cctx.discovery().getAlive(n.id()))) {
+                    log.error("Unable to send message (node left topology): " + n);
 
-                if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("DHT finish fut, sent request dht [txId=" + tx.nearXidVersion() +
-                        ", dhtTxId=" + tx.xidVersion() +
-                        ", node=" + n.id() + ']');
+                    fut.onNodeLeft(new ClusterTopologyCheckedException("Node left grid while sending message to: "
+                        + n.id()));
                 }
+                else {
+                    cctx.io().send(n, req, tx.ioPolicy());
 
-                if (sync)
-                    res = true;
-                else
-                    fut.onDone();
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("DHT finish fut, sent request dht [txId=" + tx.nearXidVersion() +
+                            ", dhtTxId=" + tx.xidVersion() +
+                            ", node=" + n.id() + ']');
+                    }
+
+                    if (sync)
+                        res = true;
+                    else
+                        fut.onDone();
+                }
             }
             catch (IgniteCheckedException e) {
                 // Fail the whole thing.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheFastNodeLeftForTransactionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheFastNodeLeftForTransactionTest.java
new file mode 100644
index 0000000..6bccbd2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/local/GridCacheFastNodeLeftForTransactionTest.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.local;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Formatter;
+import java.util.logging.Handler;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+import java.util.logging.StreamHandler;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.commandline.CommandHandler;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.GridAbstractTest;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static java.util.Arrays.stream;
+import static java.util.Objects.nonNull;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.IntStream.range;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.commandline.CommandHandler.initLogger;
+import static org.apache.ignite.testframework.GridTestUtils.assertContains;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.apache.ignite.testframework.LogListener.matches;
+
+/**
+ * Class for testing fast node left during transaction for cache.
+ */
+public class GridCacheFastNodeLeftForTransactionTest extends GridCommonAbstractTest {
+    /** Number of nodes. */
+    private static final int NODES = 4;
+
+    /** Number of transactions. */
+    private static final int TX_COUNT = 20;
+
+    /** Logger for listen log messages. */
+    private static ListeningTestLogger listeningLog;
+
+    /** Creating a client node. */
+    private boolean clientNode;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        listeningLog = new ListeningTestLogger(false, GridAbstractTest.log);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        /*To listen the logs of future in current tests, since the log in the
+        futures is static and is not reset when tests are launched.*/
+        setFieldValue(GridDhtTxFinishFuture.class, "log", null);
+        ((AtomicReference<IgniteLogger>)getFieldValue(GridDhtTxFinishFuture.class, "logRef")).set(null);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        listeningLog.clearListeners();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setClientMode(clientNode)
+            .setCacheConfiguration(createCacheConfigs())
+            .setGridLogger(listeningLog)
+            .setConnectorConfiguration(new ConnectorConfiguration());
+    }
+
+    /**
+     * Test transaction rollback when one of the nodes drops out.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRollbackTransactions() throws Exception {
+        int txCnt = TX_COUNT;
+
+        int nodes = NODES;
+
+        IgniteEx crd = createCluster(nodes);
+
+        for (CacheConfiguration cacheConfig : createCacheConfigs()) {
+            String cacheName = cacheConfig.getName();
+
+            IgniteCache<Object, Object> cache = crd.cache(cacheName);
+
+            List<Integer> keys = primaryKeys(cache, txCnt);
+
+            Map<Integer, Integer> cacheValues = range(0, txCnt / 2).boxed().collect(toMap(keys::get, identity()));
+
+            cache.putAll(cacheValues);
+
+            Collection<Transaction> txs = createTxs(
+                grid(nodes),
+                cacheName,
+                range(txCnt / 2, txCnt).mapToObj(keys::get).collect(toList())
+            );
+
+            int stoppedNodeId = 2;
+
+            stopGrid(stoppedNodeId);
+
+            LogListener logLsnr = newLogListener();
+
+            listeningLog.registerListener(logLsnr);
+
+            for (Transaction tx : txs)
+                tx.rollback();
+
+            awaitPartitionMapExchange();
+
+            check(cacheValues, cacheName, logLsnr, stoppedNodeId);
+        }
+    }
+
+    /**
+     * Test for rollback transactions when one of the nodes drops out,
+     * with operations performed on keys outside the transaction.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRollbackTransactionsWithKeyOperationOutsideThem() throws Exception {
+        int txCnt = TX_COUNT;
+
+        int nodes = NODES;
+
+        IgniteEx crd = createCluster(nodes);
+
+        for (CacheConfiguration cacheConfig : createCacheConfigs()) {
+            String cacheName = cacheConfig.getName();
+
+            IgniteCache<Object, Object> cache = crd.cache(cacheName);
+
+            List<Integer> keys = primaryKeys(cache, txCnt);
+
+            Map<Integer, Integer> cacheValues = range(0, txCnt / 2).boxed().collect(toMap(keys::get, identity()));
+
+            cache.putAll(cacheValues);
+
+            List<Integer> txKeys = range(txCnt / 2, txCnt).mapToObj(keys::get).collect(toList());
+
+            IgniteEx clientNode = grid(nodes);
+
+            Collection<Transaction> txs = createTxs(clientNode, cacheName, txKeys);
+
+            int stoppedNodeId = 2;
+
+            stopGrid(stoppedNodeId);
+
+            CountDownLatch latch = new CountDownLatch(1);
+
+            GridTestUtils.runAsync(() -> {
+                latch.countDown();
+
+                IgniteCache<Object, Object> clientCache = clientNode.cache(DEFAULT_CACHE_NAME);
+
+                txKeys.forEach(clientCache::get);
+            });
+
+            LogListener logLsnr = newLogListener();
+
+            listeningLog.registerListener(logLsnr);
+
+            latch.await();
+
+            for (Transaction tx : txs)
+                tx.rollback();
+
+            awaitPartitionMapExchange();
+
+            check(cacheValues, cacheName, logLsnr, stoppedNodeId);
+        }
+    }
+
+    /**
+     * Checking the contents of the cache after rollback transactions,
+     * with restarting the stopped node with using "idle_verify".
+     *
+     * @param cacheValues Expected cache contents.
+     * @param cacheName Cache name.
+     * @param logLsnr LogListener.
+     * @param stoppedNodeId ID of the stopped node.
+     * @throws Exception If failed.
+     */
+    private void check(
+        Map<Integer, Integer> cacheValues,
+        String cacheName,
+        LogListener logLsnr,
+        int stoppedNodeId
+    ) throws Exception {
+        assert nonNull(cacheValues);
+        assert nonNull(cacheName);
+        assert nonNull(logLsnr);
+
+        checkCacheData(cacheValues, cacheName);
+
+        assertTrue(logLsnr.check());
+
+        clientNode = false;
+
+        startGrid(stoppedNodeId);
+
+        awaitPartitionMapExchange();
+
+        checkCacheData(cacheValues, cacheName);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        PrintStream sysOut = System.out;
+
+        try(PrintStream out = new PrintStream(baos)) {
+            System.setOut(out);
+
+            Logger cmdLog = createTestLogger(baos);
+            CommandHandler cmdHnd = new CommandHandler(cmdLog);
+
+            cmdHnd.execute(asList("--cache", "idle_verify"));
+
+            stream(cmdLog.getHandlers()).forEach(Handler::flush);
+
+            assertContains(listeningLog, baos.toString(), "no conflicts have been found");
+        }
+        finally {
+            System.setOut(sysOut);
+        }
+    }
+
+    /**
+     * Creating a logger for a CommandHandler.
+     *
+     * @param outputStream Stream for recording the result of a command.
+     * @return Logger.
+     */
+    private Logger createTestLogger(OutputStream outputStream) {
+        assert nonNull(outputStream);
+
+        Logger log = initLogger(null);
+
+        log.addHandler(new StreamHandler(outputStream, new Formatter() {
+            @Override public String format(LogRecord record) {
+                return record.getMessage() + "\n";
+            }
+        }));
+
+        return log;
+    }
+
+    /**
+     * Creating a cluster.
+     *
+     * @param nodes Number of server nodes, plus one client.
+     * @throws Exception If failed.
+     */
+    private IgniteEx createCluster(int nodes) throws Exception {
+        clientNode = false;
+
+        IgniteEx crd = startGrids(nodes);
+
+        clientNode = true;
+
+        startGrid(nodes);
+
+        awaitPartitionMapExchange();
+
+        return crd;
+    }
+
+    /**
+     * Transaction creation.
+     *
+     * @param node Node.
+     * @param cacheName Cache name.
+     * @param keys Keys.
+     * @return Transactions.
+     * @throws Exception If failed.
+     */
+    private Collection<Transaction> createTxs(
+        IgniteEx node,
+        String cacheName,
+        Collection<Integer> keys
+    ) throws Exception {
+        assert nonNull(node);
+        assert nonNull(cacheName);
+        assert nonNull(keys);
+
+        IgniteCache<Object, Object> cache = node.cache(cacheName);
+
+        Collection<Transaction> txs = new ArrayList<>();
+
+        for (Integer key : keys) {
+            Transaction tx = node.transactions().txStart();
+
+            cache.put(key, key + 10);
+
+            ((TransactionProxyImpl)tx).tx().prepare(true);
+
+            txs.add(tx);
+        }
+
+        return txs;
+    }
+
+    /**
+     * Creating an instance of LogListener to find an exception
+     * "Unable to send message (node left topology):".
+     *
+     * @return LogListener.
+     */
+    private LogListener newLogListener() {
+        return matches("Unable to send message (node left topology):").build();
+    }
+
+    /**
+     * Creating a cache configurations.
+     *
+     * @return Cache configurations.
+     */
+    private CacheConfiguration[] createCacheConfigs() {
+        return new CacheConfiguration[] {
+            createCacheConfig(DEFAULT_CACHE_NAME + "_0", FULL_SYNC),
+            createCacheConfig(DEFAULT_CACHE_NAME + "_1", PRIMARY_SYNC)
+        };
+    }
+
+    /**
+     * Creating a cache configuration.
+     *
+     * @param cacheName Cache name.
+     * @param syncMode Sync mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration createCacheConfig(String cacheName, CacheWriteSynchronizationMode syncMode) {
+        assert nonNull(cacheName);
+        assert nonNull(syncMode);
+
+        return new CacheConfiguration(cacheName)
+            .setAtomicityMode(TRANSACTIONAL)
+            .setBackups(2)
+            .setAffinity(new RendezvousAffinityFunction(false, 10))
+            .setWriteSynchronizationMode(syncMode);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 799ff80..ee42927 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -162,6 +162,7 @@ import org.apache.ignite.internal.processors.cache.local.GridCacheLocalIsolatedN
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalLoadAllSelfTest;
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalLockSelfTest;
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalMultithreadedSelfTest;
+import org.apache.ignite.internal.processors.cache.local.GridCacheFastNodeLeftForTransactionTest;
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxMultiThreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxReadTest;
 import org.apache.ignite.internal.processors.cache.local.GridCacheLocalTxSingleThreadedSelfTest;
@@ -209,6 +210,7 @@ public class IgniteCacheTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, GridCacheLocalEvictionEventSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheLocalTxMultiThreadedSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheLocalIsolatedNodesSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, GridCacheFastNodeLeftForTransactionTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, GridCacheTransformEventSelfTest.class, ignoredTests);
 

Reply via email to