Repository: ignite Updated Branches: refs/heads/ignite-3212 6c06bd822 -> fc5de1d46
ignite-3212 Fixed issue with message send failure and late discovery event. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc5de1d4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc5de1d4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc5de1d4 Branch: refs/heads/ignite-3212 Commit: fc5de1d461be7a562f75a25513abca03c105f66a Parents: 6c06bd8 Author: sboikov <[email protected]> Authored: Thu Jun 2 14:41:25 2016 +0300 Committer: sboikov <[email protected]> Committed: Thu Jun 2 14:41:25 2016 +0300 ---------------------------------------------------------------------- .../distributed/GridCacheTxRecoveryFuture.java | 12 +- .../cache/transactions/IgniteTxManager.java | 163 ++++++++++--------- 2 files changed, 96 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fc5de1d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java index b1f1e19..e1d06b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java @@ -44,6 +44,8 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.transactions.TransactionState.PREPARED; + /** * Future verifying that all remote transactions related to transaction were prepared or committed. */ @@ -531,11 +533,13 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea log.debug("Transaction node left grid (will ignore) [fut=" + this + ']'); if (nearTxCheck) { - Set<UUID> failedNodeIds0 = new HashSet<>(failedNodeIds); - failedNodeIds0.add(nodeId); + if (tx.state() == PREPARED) { + Set<UUID> failedNodeIds0 = new HashSet<>(failedNodeIds); + failedNodeIds0.add(nodeId); - // Near and originating nodes left, need initiate tx check. - cctx.tm().commitIfPrepared(tx, failedNodeIds0); + // Near and originating nodes left, need initiate tx check. + cctx.tm().commitIfPrepared(tx, failedNodeIds0); + } onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore).")); } http://git-wip-us.apache.org/repos/asf/ignite/blob/fc5de1d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 4ec280f..4e7078e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1719,73 +1719,56 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @Nullable GridFutureAdapter<Boolean> fut, @Nullable Collection<GridCacheVersion> processedVers) { - for (final IgniteInternalTx tx : txs()) { - if (nearVer.equals(tx.nearXidVersion())) { - IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture(); - - if (prepFut != null && !prepFut.isDone()) { - if (log.isDebugEnabled()) - log.debug("Transaction is preparing (will wait): " + tx); + long start = U.currentTimeMillis(); - final GridFutureAdapter<Boolean> fut0 = fut != null ? fut : new GridFutureAdapter<Boolean>(); + try { + for (final IgniteInternalTx tx : txs()) { + if (nearVer.equals(tx.nearXidVersion())) { + IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture(); - final int txNum0 = txNum; - - final Collection<GridCacheVersion> processedVers0 = processedVers; + if (prepFut != null && !prepFut.isDone()) { + if (log.isDebugEnabled()) + log.debug("Transaction is preparing (will wait): " + tx); - prepFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> prepFut) { - if (log.isDebugEnabled()) - log.debug("Transaction prepare future finished: " + tx); + final GridFutureAdapter<Boolean> fut0 = fut != null ? fut : new GridFutureAdapter<Boolean>(); - IgniteInternalFuture<Boolean> fut = txsPreparedOrCommitted(nearVer, - txNum0, - fut0, - processedVers0); + final int txNum0 = txNum; - assert fut == fut0; - } - }); + final Collection<GridCacheVersion> processedVers0 = processedVers; - return fut0; - } + prepFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> prepFut) { + if (log.isDebugEnabled()) + log.debug("Transaction prepare future finished: " + tx); - TransactionState state = tx.state(); + IgniteInternalFuture<Boolean> fut = txsPreparedOrCommitted(nearVer, + txNum0, + fut0, + processedVers0); - if (state == PREPARED || state == COMMITTING || state == COMMITTED) { - if (--txNum == 0) { - if (fut != null) - fut.onDone(true); + assert fut == fut0; + } + }); - return fut; + return fut0; } - } - else { - if (tx.state(MARKED_ROLLBACK) || tx.state() == UNKNOWN) { - tx.rollbackAsync(); - if (log.isDebugEnabled()) - log.debug("Transaction was not prepared (rolled back): " + tx); - - if (fut == null) - fut = new GridFutureAdapter<>(); + TransactionState state = tx.state(); - fut.onDone(false); + if (state == PREPARED || state == COMMITTING || state == COMMITTED) { + if (--txNum == 0) { + if (fut != null) + fut.onDone(true); - return fut; + return fut; + } } else { - if (tx.state() == COMMITTED) { - if (--txNum == 0) { - if (fut != null) - fut.onDone(true); + if (tx.state(MARKED_ROLLBACK) || tx.state() == UNKNOWN) { + tx.rollbackAsync(); - return fut; - } - } - else { if (log.isDebugEnabled()) - log.debug("Transaction is not prepared: " + tx); + log.debug("Transaction was not prepared (rolled back): " + tx); if (fut == null) fut = new GridFutureAdapter<>(); @@ -1794,47 +1777,77 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return fut; } + else { + if (tx.state() == COMMITTED) { + if (--txNum == 0) { + if (fut != null) + fut.onDone(true); + + return fut; + } + } + else { + if (log.isDebugEnabled()) + log.debug("Transaction is not prepared: " + tx); + + if (fut == null) + fut = new GridFutureAdapter<>(); + + fut.onDone(false); + + return fut; + } + } } - } - if (processedVers == null) - processedVers = new HashSet<>(txNum, 1.0f); + if (processedVers == null) + processedVers = new HashSet<>(txNum, 1.0f); - processedVers.add(tx.xidVersion()); + processedVers.add(tx.xidVersion()); + } } - } - // Not all transactions were found. Need to scan committed versions to check - // if transaction was already committed. - for (Map.Entry<GridCacheVersion, Boolean> e : completedVersHashMap.entrySet()) { - if (!e.getValue()) - continue; + // Not all transactions were found. Need to scan committed versions to check + // if transaction was already committed. + for (Map.Entry<GridCacheVersion, Boolean> e : completedVersHashMap.entrySet()) { + if (!e.getValue()) + continue; - GridCacheVersion ver = e.getKey(); + GridCacheVersion ver = e.getKey(); - if (processedVers != null && processedVers.contains(ver)) - continue; + if (processedVers != null && processedVers.contains(ver)) + continue; - if (ver instanceof CommittedVersion) { - CommittedVersion commitVer = (CommittedVersion)ver; + if (ver instanceof CommittedVersion) { + CommittedVersion commitVer = (CommittedVersion)ver; - if (commitVer.nearVer.equals(nearVer)) { - if (--txNum == 0) { - if (fut != null) - fut.onDone(true); + if (commitVer.nearVer.equals(nearVer)) { + if (--txNum == 0) { + if (fut != null) + fut.onDone(true); - return fut; + return fut; + } } } } - } - if (fut == null) - fut = new GridFutureAdapter<>(); + if (fut == null) + fut = new GridFutureAdapter<>(); - fut.onDone(false); + fut.onDone(false); - return fut; + return fut; + } + finally { + long time = U.currentTimeMillis() - start; + + if (time > 200) { + U.warn(log, "Slow txsPreparedOrCommitted [time=" + time + + ", nearVer=" + nearVer + + ", processedVers=" + processedVers + ']'); + } + } } /**
