IGNITE-8820 Add ability to accept changing txTimeoutOnPartitionMapExchange 
while waiting for pending transactions. - Fixes #4217.

Signed-off-by: Ivan Rakov <ira...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/09ce06cc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/09ce06cc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/09ce06cc

Branch: refs/heads/ignite-8783
Commit: 09ce06cc02ff93dd3c255a130253637b34aa48f1
Parents: ddc41e2
Author: Ivan Daschinskiy <ivanda...@gmail.com>
Authored: Mon Jul 16 17:18:21 2018 +0300
Committer: Ivan Rakov <ira...@apache.org>
Committed: Mon Jul 16 18:02:01 2018 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  14 +-
 .../GridDhtPartitionsExchangeFuture.java        |  37 +++--
 .../optimized/OptimizedMarshallerTest.java      |   6 -
 .../SetTxTimeoutOnPartitionMapExchangeTest.java | 146 +++++++++++++++++++
 4 files changed, 171 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/09ce06cc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d3fddab..644c26e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -46,7 +46,6 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -2493,17 +2492,13 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                             int dumpCnt = 0;
 
-                            IgniteConfiguration cfg = cctx.gridConfig();
-
-                            long rollbackTimeout = 
cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
-
                             final long dumpTimeout = 2 * 
cctx.gridConfig().getNetworkTimeout();
 
                             long nextDumpTime = 0;
 
                             while (true) {
                                 try {
-                                    resVer = exchFut.get(rollbackTimeout > 0 ? 
rollbackTimeout : dumpTimeout);
+                                    resVer = exchFut.get(dumpTimeout);
 
                                     break;
                                 }
@@ -2512,7 +2507,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                                         U.warn(diagnosticLog, "Failed to wait 
for partition map exchange [" +
                                             "topVer=" + 
exchFut.initialVersion() +
                                             ", node=" + cctx.localNodeId() + 
"]. " +
-                                            (rollbackTimeout == 0 ? "Consider 
changing TransactionConfiguration.txTimeoutOnPartitionMapSynchronization to non 
default value to avoid this message. " : "") +
                                             "Dumping pending objects that 
might be the cause: ");
 
                                         try {
@@ -2524,12 +2518,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                                         nextDumpTime = U.currentTimeMillis() + 
nextDumpTimeout(dumpCnt++, dumpTimeout);
                                     }
-
-                                    if (rollbackTimeout > 0) {
-                                        rollbackTimeout = 0; // Try automatic 
rollback only once.
-
-                                        
cctx.tm().rollbackOnTopologyChange(exchFut.initialVersion());
-                                    }
                                 }
                                 catch (Exception e) {
                                     if (exchFut.reconnectOnError(e))

http://git-wip-us.apache.org/repos/asf/ignite/blob/09ce06cc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index d44856f..75cd491 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1205,10 +1205,11 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
             distributed = false;
 
         // On first phase we wait for finishing all local tx updates, atomic 
updates and lock releases on all nodes.
-        waitPartitionRelease(distributed);
+        waitPartitionRelease(distributed, true);
 
         // Second phase is needed to wait for finishing all tx updates from 
primary to backup nodes remaining after first phase.
-        waitPartitionRelease(false);
+        if (distributed)
+            waitPartitionRelease(false, false);
 
         boolean topChanged = firstDiscoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT 
|| affChangeMsg != null;
 
@@ -1319,10 +1320,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
      * {@link 
GridCacheSharedContext#partitionReleaseFuture(AffinityTopologyVersion)} javadoc.
      *
      * @param distributed If {@code true} then node should wait for partition 
release completion on all other nodes.
+     * @param doRollback If {@code true} tries to rollback transactions which 
lock partitions. Avoids unnecessary calls
+     *      of {@link 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager#rollbackOnTopologyChange}
      *
      * @throws IgniteCheckedException If failed.
      */
-    private void waitPartitionRelease(boolean distributed) throws 
IgniteCheckedException {
+    private void waitPartitionRelease(boolean distributed, boolean doRollback) 
throws IgniteCheckedException {
         Latch releaseLatch = null;
 
         // Wait for other nodes only on first phase.
@@ -1342,34 +1345,37 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
         int dumpCnt = 0;
 
-        long waitStart = U.currentTimeMillis();
-
         long nextDumpTime = 0;
 
         IgniteConfiguration cfg = cctx.gridConfig();
 
-        boolean rollbackEnabled = 
cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange() > 0;
+        long waitStart = U.currentTimeMillis();
 
         long waitTimeout = 2 * cfg.getNetworkTimeout();
 
+        boolean txRolledBack = !doRollback;
+
         while (true) {
+            // Re-read txTimeoutOnPME from configuration on every iteration to pick up runtime changes.
+            long curTimeout = 
cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
+
             try {
-                partReleaseFut.get(rollbackEnabled ?
-                    
cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange() :
-                    waitTimeout, TimeUnit.MILLISECONDS);
+                // This avoids unnecessary waiting for rollback.
+                partReleaseFut.get(curTimeout > 0 && !txRolledBack ?
+                        Math.min(curTimeout, waitTimeout) : waitTimeout, 
TimeUnit.MILLISECONDS);
 
                 break;
             }
             catch (IgniteFutureTimeoutCheckedException ignored) {
                 // Print pending transactions and locks that might have led to 
hang.
                 if (nextDumpTime <= U.currentTimeMillis()) {
-                    dumpPendingObjects(partReleaseFut);
+                    dumpPendingObjects(partReleaseFut, curTimeout <= 0 && 
!txRolledBack);
 
                     nextDumpTime = U.currentTimeMillis() + 
nextDumpTimeout(dumpCnt++, waitTimeout);
                 }
 
-                if (rollbackEnabled) {
-                    rollbackEnabled = false;
+                if (!txRolledBack && curTimeout > 0 && U.currentTimeMillis() - 
waitStart >= curTimeout) {
+                    txRolledBack = true;
 
                     cctx.tm().rollbackOnTopologyChange(initialVersion());
                 }
@@ -1478,12 +1484,17 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
     /**
      * @param partReleaseFut Partition release future.
+     * @param txTimeoutNotifyFlag If {@code true}, also print a hint to set a non-default 
transaction rollback timeout on PME.
      */
-    private void dumpPendingObjects(IgniteInternalFuture<?> partReleaseFut) {
+    private void dumpPendingObjects(IgniteInternalFuture<?> partReleaseFut, 
boolean txTimeoutNotifyFlag) {
         U.warn(cctx.kernalContext().cluster().diagnosticLog(),
             "Failed to wait for partition release future [topVer=" + 
initialVersion() +
             ", node=" + cctx.localNodeId() + "]");
 
+        if (txTimeoutNotifyFlag)
+            U.warn(cctx.kernalContext().cluster().diagnosticLog(), "Consider 
changing TransactionConfiguration." +
+                    "txTimeoutOnPartitionMapExchange to non default value to 
avoid this message.");
+
         U.warn(log, "Partition release future: " + partReleaseFut);
 
         U.warn(cctx.kernalContext().cluster().diagnosticLog(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/09ce06cc/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerTest.java
index 79496ae..a7e29c4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerTest.java
@@ -414,12 +414,6 @@ public class OptimizedMarshallerTest extends 
GridCommonAbstractTest {
         });
 
         allocationOverflowCheck(() -> {
-            marshaller().marshal(new int[1<<30]);
-            marshaller().marshal(new int[1<<30]);
-            return null;
-        });
-
-        allocationOverflowCheck(() -> {
             marshaller().marshal(new float[1<<29]);
             marshaller().marshal(new float[1<<29]);
             return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/09ce06cc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SetTxTimeoutOnPartitionMapExchangeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SetTxTimeoutOnPartitionMapExchangeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SetTxTimeoutOnPartitionMapExchangeTest.java
index 3152349..7033529 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SetTxTimeoutOnPartitionMapExchangeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/SetTxTimeoutOnPartitionMapExchangeTest.java
@@ -18,15 +18,25 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.lang.management.ManagementFactory;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import javax.management.MBeanServer;
 import javax.management.MBeanServerInvocationHandler;
 import javax.management.ObjectName;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.TransactionsMXBeanImpl;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -36,6 +46,13 @@ import 
org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.internal.util.typedef.X.hasCause;
+import static 
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
  *
@@ -120,6 +137,135 @@ public class SetTxTimeoutOnPartitionMapExchangeTest 
extends GridCommonAbstractTe
     }
 
     /**
+     * Tests applying new txTimeoutOnPartitionMapExchange while an exchange 
future runs.
+     *
+     * @throws Exception If fails.
+     */
+    public void testSetTxTimeoutDuringPartitionMapExchange() throws Exception {
+        IgniteEx ig = (IgniteEx)startGrids(2);
+
+        final long longTimeout = 600_000L;
+        final long shortTimeout = 5_000L;
+
+        TransactionsMXBean mxBean = txMXBean(0);
+
+        // Case 1: set very long txTimeoutOnPME and shorten it after PME starts, transaction should be rolled 
back.
+        mxBean.setTxTimeoutOnPartitionMapExchange(longTimeout);
+        assertTxTimeoutOnPartitionMapExchange(longTimeout);
+
+        AtomicReference<Exception> txEx = new AtomicReference<>();
+
+        IgniteInternalFuture<Long> fut = startDeadlock(ig, txEx, 0);
+
+        startGridAsync(2);
+
+        waitForExchangeStarted(ig);
+
+        mxBean.setTxTimeoutOnPartitionMapExchange(shortTimeout);
+
+        awaitPartitionMapExchange();
+
+        fut.get();
+
+        assertTrue("Transaction should be rolled back", hasCause(txEx.get(), 
TransactionRollbackException.class));
+
+        // Case 2: txTimeoutOnPME will be set to 0 after starting of PME, 
transaction should be cancelled on timeout.
+        mxBean.setTxTimeoutOnPartitionMapExchange(longTimeout);
+        assertTxTimeoutOnPartitionMapExchange(longTimeout);
+
+        fut = startDeadlock(ig, txEx, 10000L);
+
+        startGridAsync(3);
+
+        waitForExchangeStarted(ig);
+
+        mxBean.setTxTimeoutOnPartitionMapExchange(0);
+
+        fut.get();
+
+        assertTrue("Transaction should be canceled on timeout", 
hasCause(txEx.get(), TransactionTimeoutException.class));
+    }
+
+    /**
+     * Starts two pessimistic transactions that deadlock on each other's keys.
+     *
+     * @param ig Ignite instance.
+     * @param txEx Atomic reference to transaction exception.
+     * @param timeout Transaction timeout.
+     */
+    private IgniteInternalFuture<Long> startDeadlock(Ignite ig, 
AtomicReference<Exception> txEx, long timeout) {
+        IgniteCache<Object, Object> cache = ig.getOrCreateCache(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+
+        AtomicInteger thCnt = new AtomicInteger();
+
+        CyclicBarrier barrier = new CyclicBarrier(2);
+
+        return GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() {
+                int thNum = thCnt.incrementAndGet();
+
+                try (Transaction tx = ig.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ, timeout, 0)) {
+                    cache.put(thNum, 1);
+
+                    barrier.await();
+
+                    cache.put(thNum % 2 + 1, 1);
+
+                    tx.commit();
+                }
+                catch (Exception e) {
+                    txEx.set(e);
+                }
+
+                return null;
+            }
+        }, 2, "tx-thread");
+    }
+
+    /**
+     * Starts grid asynchronously and returns without waiting for the grid to start.
+     * Avoids blocking on PME.
+     *
+     * @param idx Test grid index.
+     * @throws Exception If fails.
+     */
+    private void startGridAsync(int idx) throws Exception {
+        GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    startGrid(idx);
+                }
+                catch (Exception e) {
+                    // no-op.
+                }
+            }
+        });
+    }
+
+    /**
+     * Waits for starting PME on grid.
+     *
+     * @param ig Ignite grid.
+     * @throws IgniteCheckedException If fails.
+     */
+    private void waitForExchangeStarted(IgniteEx ig) throws 
IgniteCheckedException {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                for (GridDhtPartitionsExchangeFuture fut: 
ig.context().cache().context().exchange().exchangeFutures()) {
+                    if (!fut.isDone())
+                        return true;
+                }
+
+                return false;
+            }
+        }, WAIT_CONDITION_TIMEOUT);
+
+        // Additional waiting to ensure that the code has really started waiting for 
partition release.
+        U.sleep(5_000L);
+    }
+
+    /**
      *
      */
     private TransactionsMXBean txMXBean(int igniteInt) throws Exception {

Reply via email to