Repository: ignite
Updated Branches:
  refs/heads/ignite-3212 6c06bd822 -> fc5de1d46


ignite-3212 Fixed issue with message send failure and late discovery event.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc5de1d4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc5de1d4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc5de1d4

Branch: refs/heads/ignite-3212
Commit: fc5de1d461be7a562f75a25513abca03c105f66a
Parents: 6c06bd8
Author: sboikov <[email protected]>
Authored: Thu Jun 2 14:41:25 2016 +0300
Committer: sboikov <[email protected]>
Committed: Thu Jun 2 14:41:25 2016 +0300

----------------------------------------------------------------------
 .../distributed/GridCacheTxRecoveryFuture.java  |  12 +-
 .../cache/transactions/IgniteTxManager.java     | 163 ++++++++++---------
 2 files changed, 96 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fc5de1d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index b1f1e19..e1d06b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -44,6 +44,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.transactions.TransactionState.PREPARED;
+
 /**
  * Future verifying that all remote transactions related to transaction were prepared or committed.
  */
@@ -531,11 +533,13 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
                 log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
 
             if (nearTxCheck) {
-                Set<UUID> failedNodeIds0 = new HashSet<>(failedNodeIds);
-                failedNodeIds0.add(nodeId);
+                if (tx.state() == PREPARED) {
+                    Set<UUID> failedNodeIds0 = new HashSet<>(failedNodeIds);
+                    failedNodeIds0.add(nodeId);
 
-                // Near and originating nodes left, need initiate tx check.
-                cctx.tm().commitIfPrepared(tx, failedNodeIds0);
+                    // Near and originating nodes left, need initiate tx check.
+                    cctx.tm().commitIfPrepared(tx, failedNodeIds0);
+                }
 
                 onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc5de1d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 4ec280f..4e7078e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1719,73 +1719,56 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         @Nullable GridFutureAdapter<Boolean> fut,
         @Nullable Collection<GridCacheVersion> processedVers)
     {
-        for (final IgniteInternalTx tx : txs()) {
-            if (nearVer.equals(tx.nearXidVersion())) {
-                IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
-
-                if (prepFut != null && !prepFut.isDone()) {
-                    if (log.isDebugEnabled())
-                        log.debug("Transaction is preparing (will wait): " + 
tx);
+        long start = U.currentTimeMillis();
 
-                    final GridFutureAdapter<Boolean> fut0 = fut != null ? fut 
: new GridFutureAdapter<Boolean>();
+        try {
+            for (final IgniteInternalTx tx : txs()) {
+                if (nearVer.equals(tx.nearXidVersion())) {
+                    IgniteInternalFuture<?> prepFut = 
tx.currentPrepareFuture();
 
-                    final int txNum0 = txNum;
-
-                    final Collection<GridCacheVersion> processedVers0 = 
processedVers;
+                    if (prepFut != null && !prepFut.isDone()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Transaction is preparing (will wait): " 
+ tx);
 
-                    prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                        @Override public void apply(IgniteInternalFuture<?> 
prepFut) {
-                            if (log.isDebugEnabled())
-                                log.debug("Transaction prepare future 
finished: " + tx);
+                        final GridFutureAdapter<Boolean> fut0 = fut != null ? 
fut : new GridFutureAdapter<Boolean>();
 
-                            IgniteInternalFuture<Boolean> fut = 
txsPreparedOrCommitted(nearVer,
-                                txNum0,
-                                fut0,
-                                processedVers0);
+                        final int txNum0 = txNum;
 
-                            assert fut == fut0;
-                        }
-                    });
+                        final Collection<GridCacheVersion> processedVers0 = 
processedVers;
 
-                    return fut0;
-                }
+                        prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                            @Override public void 
apply(IgniteInternalFuture<?> prepFut) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Transaction prepare future 
finished: " + tx);
 
-                TransactionState state = tx.state();
+                                IgniteInternalFuture<Boolean> fut = 
txsPreparedOrCommitted(nearVer,
+                                    txNum0,
+                                    fut0,
+                                    processedVers0);
 
-                if (state == PREPARED || state == COMMITTING || state == 
COMMITTED) {
-                    if (--txNum == 0) {
-                        if (fut != null)
-                            fut.onDone(true);
+                                assert fut == fut0;
+                            }
+                        });
 
-                        return fut;
+                        return fut0;
                     }
-                }
-                else {
-                    if (tx.state(MARKED_ROLLBACK) || tx.state() == UNKNOWN) {
-                        tx.rollbackAsync();
 
-                        if (log.isDebugEnabled())
-                            log.debug("Transaction was not prepared (rolled 
back): " + tx);
-
-                        if (fut == null)
-                            fut = new GridFutureAdapter<>();
+                    TransactionState state = tx.state();
 
-                        fut.onDone(false);
+                    if (state == PREPARED || state == COMMITTING || state == 
COMMITTED) {
+                        if (--txNum == 0) {
+                            if (fut != null)
+                                fut.onDone(true);
 
-                        return fut;
+                            return fut;
+                        }
                     }
                     else {
-                        if (tx.state() == COMMITTED) {
-                            if (--txNum == 0) {
-                                if (fut != null)
-                                    fut.onDone(true);
+                        if (tx.state(MARKED_ROLLBACK) || tx.state() == 
UNKNOWN) {
+                            tx.rollbackAsync();
 
-                                return fut;
-                            }
-                        }
-                        else {
                             if (log.isDebugEnabled())
-                                log.debug("Transaction is not prepared: " + 
tx);
+                                log.debug("Transaction was not prepared 
(rolled back): " + tx);
 
                             if (fut == null)
                                 fut = new GridFutureAdapter<>();
@@ -1794,47 +1777,77 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
                             return fut;
                         }
+                        else {
+                            if (tx.state() == COMMITTED) {
+                                if (--txNum == 0) {
+                                    if (fut != null)
+                                        fut.onDone(true);
+
+                                    return fut;
+                                }
+                            }
+                            else {
+                                if (log.isDebugEnabled())
+                                    log.debug("Transaction is not prepared: " 
+ tx);
+
+                                if (fut == null)
+                                    fut = new GridFutureAdapter<>();
+
+                                fut.onDone(false);
+
+                                return fut;
+                            }
+                        }
                     }
-                }
 
-                if (processedVers == null)
-                    processedVers = new HashSet<>(txNum, 1.0f);
+                    if (processedVers == null)
+                        processedVers = new HashSet<>(txNum, 1.0f);
 
-                processedVers.add(tx.xidVersion());
+                    processedVers.add(tx.xidVersion());
+                }
             }
-        }
 
-        // Not all transactions were found. Need to scan committed versions to check
-        // if transaction was already committed.
-        for (Map.Entry<GridCacheVersion, Boolean> e : 
completedVersHashMap.entrySet()) {
-            if (!e.getValue())
-                continue;
+            // Not all transactions were found. Need to scan committed versions to check
+            // if transaction was already committed.
+            for (Map.Entry<GridCacheVersion, Boolean> e : 
completedVersHashMap.entrySet()) {
+                if (!e.getValue())
+                    continue;
 
-            GridCacheVersion ver = e.getKey();
+                GridCacheVersion ver = e.getKey();
 
-            if (processedVers != null && processedVers.contains(ver))
-                continue;
+                if (processedVers != null && processedVers.contains(ver))
+                    continue;
 
-            if (ver instanceof CommittedVersion) {
-                CommittedVersion commitVer = (CommittedVersion)ver;
+                if (ver instanceof CommittedVersion) {
+                    CommittedVersion commitVer = (CommittedVersion)ver;
 
-                if (commitVer.nearVer.equals(nearVer)) {
-                    if (--txNum == 0) {
-                        if (fut != null)
-                            fut.onDone(true);
+                    if (commitVer.nearVer.equals(nearVer)) {
+                        if (--txNum == 0) {
+                            if (fut != null)
+                                fut.onDone(true);
 
-                        return fut;
+                            return fut;
+                        }
                     }
                 }
             }
-        }
 
-        if (fut == null)
-            fut = new GridFutureAdapter<>();
+            if (fut == null)
+                fut = new GridFutureAdapter<>();
 
-        fut.onDone(false);
+            fut.onDone(false);
 
-        return fut;
+            return fut;
+        }
+        finally {
+            long time = U.currentTimeMillis() - start;
+
+            if (time > 200) {
+                U.warn(log, "Slow txsPreparedOrCommitted [time=" + time +
+                    ", nearVer=" + nearVer +
+                    ", processedVers=" + processedVers + ']');
+            }
+        }
     }
 
     /**

Reply via email to