http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index cedfee5..e48e0d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -117,6 +117,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** Logger. */ private static IgniteLogger log; + /** Logger. */ + private static IgniteLogger msgLog; + /** Context. */ private GridCacheSharedContext<?, ?> cctx; @@ -212,8 +215,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter this.nearMiniId = nearMiniId; - if (log == null) + if (log == null) { + msgLog = cctx.txPrepareMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridDhtTxPrepareFuture.class); + } dhtMap = tx.dhtMap(); nearMap = tx.nearMap(); @@ -268,7 +273,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** * @return Transaction. 
*/ - GridDhtTxLocalAdapter tx() { + public GridDhtTxLocalAdapter tx() { return tx; } @@ -463,13 +468,36 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter */ public void onResult(UUID nodeId, GridDhtTxPrepareResponse res) { if (!isDone()) { + boolean found = false; + MiniFuture mini = miniFuture(res.miniId()); if (mini != null) { + found = true; + assert mini.node().id().equals(nodeId); mini.onResult(res); } + + if (!found) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to find mini future [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, response for finished future [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } } } @@ -638,7 +666,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter sendPrepareResponse(res); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send prepare response for transaction: " + tx, e); + U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + tx.nearNodeId() + + ", res=" + res, + ", tx=" + tx, + e); } } }; @@ -676,7 +709,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter sendPrepareResponse(res); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send prepare response for transaction: " + tx, e); + U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + tx.nearNodeId() + + ", res=" + res, + ", tx=" + tx, + e); } } @@ -690,7 +728,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter sendPrepareResponse(res); } catch (IgniteCheckedException e) { - 
U.error(log, "Failed to send prepare response for transaction: " + tx, e); + U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + tx.nearNodeId() + + ", res=" + res, + ", tx=" + tx, + e); } finally { // Will call super.onDone(). @@ -725,10 +768,26 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (!tx.nearNodeId().equals(cctx.localNodeId())) { Throwable err = this.err.get(); - if (err != null && err instanceof IgniteFutureCancelledException) + if (err != null && err instanceof IgniteFutureCancelledException) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, skip send response [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + tx.nearNodeId() + + ", err=" + err + + ", res=" + res + ']'); + } + return; + } cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, sent response [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + tx.nearNodeId() + + ", res=" + res + ']'); + } } } @@ -1178,13 +1237,34 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter try { cctx.io().send(n, req, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + ']'); + } } catch (ClusterTopologyCheckedException e) { fut.onNodeLeft(e); } catch (IgniteCheckedException e) { - if (!cctx.kernalContext().isStopping()) + if (!cctx.kernalContext().isStopping()) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + ']'); + } + fut.onResult(e); + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request dht, ignore 
[txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + + ", err=" + e + ']'); + } + } } } @@ -1236,13 +1316,34 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter try { cctx.io().send(nearMapping.node(), req, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.node().id() + ']'); + } } catch (ClusterTopologyCheckedException e) { fut.onNodeLeft(e); } catch (IgniteCheckedException e) { - if (!cctx.kernalContext().isStopping()) + if (!cctx.kernalContext().isStopping()) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.node().id() + ']'); + } + fut.onResult(e); + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.node().id() + + ", err=" + e + ']'); + } + } } } } @@ -1458,8 +1559,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @param e Node failure. */ void onNodeLeft(ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will ignore): " + this); + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, mini future node left [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + node().id() + ']'); + } if (tx != null) tx.removeMapping(nodeId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index ad87add..6bb6a80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -37,6 +37,7 @@ import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; @@ -151,6 +152,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** */ private GridNearAtomicCache<K, V> near; + /** Logger. */ + private IgniteLogger msgLog; + /** * Empty constructor required by {@link Externalizable}. 
*/ @@ -163,6 +167,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ public GridDhtAtomicCache(GridCacheContext<K, V> ctx) { super(ctx); + + msgLog = ctx.shared().atomicMessageLogger(); } /** @@ -171,6 +177,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ public GridDhtAtomicCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { super(ctx, map); + + msgLog = ctx.shared().atomicMessageLogger(); } /** {@inheritDoc} */ @@ -1428,7 +1436,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ClusterNode node = ctx.discovery().node(nodeId); if (node == null) { - U.warn(log, "Node originated update request left grid: " + nodeId); + U.warn(msgLog, "Skip near update request, node originated update request left [" + + "futId=" + req.futureVersion() + ", node=" + nodeId + ']'); return; } @@ -1443,14 +1452,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (hasNear) res.nearVersion(ver); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Assigned update version [futId=" + req.futureVersion() + + ", writeVer=" + ver + ']'); + } } assert ver != null : "Got null version for update request: " + req; - if (log.isDebugEnabled()) - log.debug("Using cache version for update request on primary node [ver=" + ver + - ", req=" + req + ']'); - boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion()); dhtFut = createDhtFuture(ver, req, res, completionCb, false); @@ -2776,8 +2786,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param req Near atomic update request. 
*/ private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest req) { - if (log.isDebugEnabled()) - log.debug("Processing near atomic update request [nodeId=" + nodeId + ", req=" + req + ']'); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Received near atomic update request [futId=" + req.futureVersion() + + ", writeVer=" + req.updateVersion() + + ", node=" + nodeId + ']'); + } req.nodeId(ctx.localNodeId()); @@ -2790,8 +2803,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @SuppressWarnings("unchecked") private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) { - if (log.isDebugEnabled()) - log.debug("Processing near atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); + if (msgLog.isDebugEnabled()) + msgLog.debug("Received near atomic update response [futId" + res.futureVersion() + ", node=" + nodeId + ']'); res.nodeId(ctx.localNodeId()); @@ -2799,9 +2812,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (fut != null) fut.onResult(nodeId, res); - else - U.warn(log, "Failed to find near update future for update response (will ignore) " + - "[nodeId=" + nodeId + ", res=" + res + ']'); + else { + U.warn(msgLog, "Failed to find near update future for update response (will ignore) " + + "[futId" + res.futureVersion() + ", node=" + nodeId + ", res=" + res + ']'); + } } /** @@ -2809,8 +2823,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param req Dht atomic update request. 
*/ private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicUpdateRequest req) { - if (log.isDebugEnabled()) - log.debug("Processing dht atomic update request [nodeId=" + nodeId + ", req=" + req + ']'); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Received DHT atomic update request [futId=" + req.futureVersion() + + ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); + } GridCacheVersion ver = req.writeVersion(); @@ -2908,20 +2924,31 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res); try { - if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) + if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) { ctx.io().send(nodeId, res, ctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Sent DHT atomic update response [futId=" + req.futureVersion() + + ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); + } + } else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureVersion() + + ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); + } + // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response. sendDeferredUpdateResponse(nodeId, req.futureVersion()); } } catch (ClusterTopologyCheckedException ignored) { - U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " + - req.nodeId()); + U.warn(msgLog, "Failed to send DHT atomic update response, node left [futId=" + req.futureVersion() + + ", node=" + req.nodeId() + ']'); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send DHT atomic update response (did node leave grid?) 
[nodeId=" + nodeId + - ", req=" + req + ']', e); + U.error(msgLog, "Failed to send DHT atomic update response [futId=" + req.futureVersion() + + ", node=" + nodeId + ", res=" + res + ']', e); } } @@ -2960,16 +2987,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @SuppressWarnings("unchecked") private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) { - if (log.isDebugEnabled()) - log.debug("Processing dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); - GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); - if (updateFut != null) + if (updateFut != null) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Received DHT atomic update response [futId=" + res.futureVersion() + + ", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']'); + } + updateFut.onResult(nodeId, res); - else - U.warn(log, "Failed to find DHT update future for update response [nodeId=" + nodeId + - ", res=" + res + ']'); + } + else { + U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureVersion() + + ", node=" + nodeId + ", res=" + res + ']'); + } } /** @@ -2978,17 +3009,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @SuppressWarnings("unchecked") private void processDhtAtomicDeferredUpdateResponse(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) { - if (log.isDebugEnabled()) - log.debug("Processing deferred dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); - for (GridCacheVersion ver : res.futureVersions()) { GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver); - if (updateFut != null) + if (updateFut != null) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Received DHT atomic deferred update response [futId=" + ver + + ", writeVer=" + res + ", node=" + nodeId + ']'); + } + updateFut.onResult(nodeId); 
- else - U.warn(log, "Failed to find DHT update future for deferred update response [nodeId=" + - nodeId + ", ver=" + ver + ", res=" + res + ']'); + } + else { + U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + ver + + ", nodeId=" + nodeId + ", res=" + res + ']'); + } } } @@ -2999,14 +3034,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse res) { try { ctx.io().send(nodeId, res, ctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) + msgLog.debug("Sent near update response [futId=" + res.futureVersion() + ", node=" + nodeId + ']'); } catch (ClusterTopologyCheckedException ignored) { - U.warn(log, "Failed to send near update reply to node because it left grid: " + - nodeId); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Failed to send near update response [futId=" + res.futureVersion() + + ", node=" + nodeId + ']'); + } } catch (IgniteCheckedException e) { - U.error(log, "Failed to send near update reply (did node leave grid?) 
[nodeId=" + nodeId + - ", res=" + res + ']', e); + U.error(msgLog, "Failed to send near update response [futId=" + res.futureVersion() + + ", node=" + nodeId + ", res=" + res + ']', e); } } @@ -3276,24 +3316,31 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { ctx.io().send(nodeId, msg, ctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() + + ", node=" + nodeId + ']'); + } } finally { ctx.kernalContext().gateway().readUnlock(); } } catch (IllegalStateException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send deferred dht update response to remote node (grid is stopping) " + - "[nodeId=" + nodeId + ", msg=" + msg + ']'); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Failed to send deferred DHT update response, node is stopping [" + + "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']'); + } } catch (ClusterTopologyCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to send deferred dht update response to remote node (did node leave grid?) 
" + - "[nodeId=" + nodeId + ", msg=" + msg + ']'); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Failed to send deferred DHT update response, node left [" + + "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']'); + } } catch (IgniteCheckedException e) { - U.error(log, "Failed to send deferred dht update response to remote node [nodeId=" - + nodeId + ", msg=" + msg + ']', e); + U.error(log, "Failed to send deferred DHT update response to remote node [" + + "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e); } pendingResponses.remove(nodeId, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java index 3a7bf1c..923b220 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java @@ -20,9 +20,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.Collection; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -83,6 +85,11 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index df44455..2e44317 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -63,7 +63,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); /** Logger. */ - protected static IgniteLogger log; + private static IgniteLogger log; + + /** Logger. */ + private static IgniteLogger msgLog; /** Cache context. 
*/ private final GridCacheContext cctx; @@ -115,8 +118,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> */ public GridDhtAtomicUpdateFuture( GridCacheContext cctx, - CI2<GridNearAtomicUpdateRequest, - GridNearAtomicUpdateResponse> completionCb, + CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, GridCacheVersion writeVer, GridNearAtomicUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes @@ -129,8 +131,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> this.completionCb = completionCb; this.updateRes = updateRes; - if (log == null) + if (log == null) { + msgLog = cctx.shared().atomicMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class); + } keys = new ArrayList<>(updateReq.keys().size()); mappings = U.newHashMap(updateReq.keys().size()); @@ -140,6 +144,13 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> waitForExchange = !topLocked; } + /** + * @return Write version. 
+ */ + GridCacheVersion writeVersion() { + return writeVer; + } + /** {@inheritDoc} */ @Override public IgniteUuid futureId() { return futVer.asGridUuid(); @@ -152,10 +163,14 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - if (log.isDebugEnabled()) - log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']'); + boolean res = registerResponse(nodeId); + + if (res && msgLog.isDebugEnabled()) { + msgLog.debug("DTH update fut, node left [futId=" + futVer + ", writeVer=" + writeVer + + ", node=" + nodeId + ']'); + } - return registerResponse(nodeId); + return res; } /** @@ -388,20 +403,24 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> if (!mappings.isEmpty()) { for (GridDhtAtomicUpdateRequest req : mappings.values()) { try { - if (log.isDebugEnabled()) - log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("DTH update fut, sent request [futId=" + futVer + + ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']'); + } } catch (ClusterTopologyCheckedException ignored) { - U.warn(log, "Failed to send update request to backup node because it left grid: " + - req.nodeId()); + if (msgLog.isDebugEnabled()) { + msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futVer + + ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']'); + } registerResponse(req.nodeId()); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send update request to backup node (did node leave the grid?): " - + req.nodeId(), e); + U.error(msgLog, "Failed to send request [futId=" + futVer + + ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']'); registerResponse(req.nodeId()); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index ed50f41..3901dbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.UUID; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; @@ -703,6 +704,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index 1c7b5f0..742b39c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -188,6 +189,11 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index c9e1a11..c125514 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -75,7 +75,10 @@ public 
class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); /** Logger. */ - protected static IgniteLogger log; + private static IgniteLogger log; + + /** Logger. */ + private static IgniteLogger msgLog; /** Cache context. */ private final GridCacheContext cctx; @@ -216,8 +219,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> this.keepBinary = keepBinary; this.waitTopFut = waitTopFut; - if (log == null) + if (log == null) { + msgLog = cctx.shared().atomicMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); + } fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC && cctx.config().getAtomicWriteOrderMode() == CLOCK && @@ -463,15 +468,25 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } else { try { - if (log.isDebugEnabled()) - log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near update fut, sent request [futId=" + req.futureVersion() + + ", writeVer=" + req.updateVersion() + + ", node=" + req.nodeId() + ']'); + } + if (syncMode == FULL_ASYNC) onDone(new GridCacheReturn(cctx, true, true, null, true)); } catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near update fut, failed to send request [futId=" + req.futureVersion() + + ", writeVer=" + req.updateVersion() + + ", node=" + req.nodeId() + + ", err=" + e + ']'); + } + state.onSendError(req, e); } } @@ -497,12 +512,22 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } else { try { - if (log.isDebugEnabled()) - log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + + if 
(msgLog.isDebugEnabled()) { + msgLog.debug("Near update fut, sent request [futId=" + req.futureVersion() + + ", writeVer=" + req.updateVersion() + + ", node=" + req.nodeId() + ']'); + } } catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near update fut, failed to send request [futId=" + req.futureVersion() + + ", writeVer=" + req.updateVersion() + + ", node=" + req.nodeId() + + ", err=" + e + ']'); + } + state.onSendError(req, e); } } @@ -572,9 +597,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> void onNodeLeft(UUID nodeId) { GridNearAtomicUpdateResponse res = null; - synchronized (this) { - GridNearAtomicUpdateRequest req; + GridNearAtomicUpdateRequest req; + synchronized (this) { if (singleReq != null) req = singleReq.nodeId().equals(nodeId) ? singleReq : null; else @@ -595,8 +620,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } } - if (res != null) + if (res != null) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near update fut, node left [futId=" + req.futureVersion() + + ", writeVer=" + req.updateVersion() + + ", node=" + nodeId + ']'); + } + onResult(nodeId, res, true); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index c756d9a..1d03dd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -26,6 +26,7 @@ import java.util.UUID; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; @@ -642,6 +643,11 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 8c55b85..dd52aae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectCollection; import 
org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -435,6 +436,11 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index e8e8298..70ee6e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -1082,6 +1082,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param res Response. 
*/ private void processLockResponse(UUID nodeId, GridNearLockResponse res) { + if (txLockMsgLog.isDebugEnabled()) + txLockMsgLog.debug("Received near lock response [txId=" + res.version() + ", node=" + nodeId + ']'); + assert nodeId != null; assert res != null; @@ -1090,6 +1093,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (fut != null) fut.onResult(nodeId, res); + else { + if (txLockMsgLog.isDebugEnabled()) { + txLockMsgLog.debug("Received near lock response for unknown future [txId=" + res.version() + + ", node=" + nodeId + + ", res=" + res + ']'); + } + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index dc55eb5..577f0f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -92,6 +92,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** Logger. */ private static IgniteLogger log; + /** Logger. */ + private static IgniteLogger msgLog; + /** Cache registry. 
*/ @GridToStringExclude private final GridCacheContext<?, ?> cctx; @@ -198,8 +201,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture futId = IgniteUuid.randomUuid(); - if (log == null) + if (log == null) { + msgLog = cctx.shared().txLockMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridDhtColocatedLockFuture.class); + } if (timeout > 0) { timeoutObj = new LockTimeoutObject(); @@ -403,10 +408,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture MiniFuture f = (MiniFuture)fut; if (f.node().id().equals(nodeId)) { - if (log.isDebugEnabled()) - log.debug("Found mini-future for left node [nodeId=" + nodeId + ", mini=" + f + ", fut=" + - this + ']'); - f.onResult(newTopologyException(null, nodeId)); found = true; @@ -427,33 +428,29 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture */ void onResult(UUID nodeId, GridNearLockResponse res) { if (!isDone()) { - if (log.isDebugEnabled()) - log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + - this + ']'); - MiniFuture mini = miniFuture(res.miniId()); if (mini != null) { assert mini.node().id().equals(nodeId); - if (log.isDebugEnabled()) - log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']'); - mini.onResult(res); - if (log.isDebugEnabled()) - log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini + - ", res=" + res + ']'); - return; } - U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res + + U.warn(msgLog, "Collocated lock fut, failed to find mini future [txId=" + lockVer + + ", inTx=" + inTx() + + ", node=" + nodeId + + ", res=" + res + ", fut=" + this + ']'); } - else if (log.isDebugEnabled()) - log.debug("Ignoring lock response from node (future is done) [nodeId=" + nodeId + ", res=" + res + - ", fut=" + this + ']'); + else { + if (msgLog.isDebugEnabled()) { + 
msgLog.debug("Collocated lock fut, response for finished future [txId=" + lockVer + + ", inTx=" + inTx() + + ", node=" + nodeId + ']'); + } + } } /** @@ -1059,10 +1056,13 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture if (txSync == null || txSync.isDone()) { try { - if (log.isDebugEnabled()) - log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - cctx.io().send(node, req, cctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Collocated lock fut, sent request [txId=" + lockVer + + ", inTx=" + inTx() + + ", node=" + node.id() + ']'); + } } catch (ClusterTopologyCheckedException ex) { assert fut != null; @@ -1074,10 +1074,13 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture txSync.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> t) { try { - if (log.isDebugEnabled()) - log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']'); - cctx.io().send(node, req, cctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Collocated lock fut, sent request [txId=" + lockVer + + ", inTx=" + inTx() + + ", node=" + node.id() + ']'); + } } catch (ClusterTopologyCheckedException ex) { assert fut != null; @@ -1085,6 +1088,13 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture fut.onResult(ex); } catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Collocated lock fut, failed to send request [txId=" + lockVer + + ", inTx=" + inTx() + + ", node=" + node.id() + + ", err=" + e + ']'); + } + onError(e); } } @@ -1421,6 +1431,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture * @param e Node left exception. 
*/ void onResult(ClusterTopologyCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Collocated lock fut, mini future node left [txId=" + lockVer + + ", inTx=" + inTx() + + ", nodeId=" + node.id() + ']'); + } + if (isDone()) return; @@ -1431,9 +1447,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture rcvRes = true; } - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this); - if (tx != null) tx.removeMapping(node.id()); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index be346b4..dc006c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -75,6 +75,7 @@ import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -87,7 +88,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityTopologyVersion> implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture { /** */ - private static final int DUMP_PENDING_OBJECTS_THRESHOLD = + public static final int DUMP_PENDING_OBJECTS_THRESHOLD = IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10); /** */ @@ -786,7 +787,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT catch (IgniteFutureTimeoutCheckedException ignored) { // Print pending transactions and locks that might have led to hang. if (dumpedObjects < DUMP_PENDING_OBJECTS_THRESHOLD) { - dumpPendingObjects(); + U.warn(log, "Failed to wait for partition release future [topVer=" + topologyVersion() + + ", node=" + cctx.localNodeId() + "]. Dumping pending objects that might be the cause: "); + + try { + cctx.exchange().dumpDebugInfo(); + } + catch (Exception e) { + U.error(log, "Failed to dump debug information: " + e, e); + } + + if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) + U.dumpThreads(log); dumpedObjects++; } @@ -923,16 +935,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * - */ - private void dumpPendingObjects() { - U.warn(log, "Failed to wait for partition release future [topVer=" + topologyVersion() + - ", node=" + cctx.localNodeId() + "]. Dumping pending objects that might be the cause: "); - - cctx.exchange().dumpDebugInfo(); - } - - /** * @param cacheId Cache ID to check. * @return {@code True} if cache is stopping by this exchange. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 818c998..cf48889 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -160,6 +160,22 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa mini.onResult(nodeId, res); } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near optimistic prepare fut, failed to find mini future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near optimistic prepare fut, response for finished future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } } } @@ -480,6 +496,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa else { try { cctx.io().send(n, req, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near optimistic prepare fut, sent request [txId=" + tx.nearXidVersion() + + ", node=" + n.id() + ']'); + } } catch (ClusterTopologyCheckedException e) { e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); @@ -487,6 +508,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa fut.onResult(e); } catch (IgniteCheckedException e) { + if 
(msgLog.isDebugEnabled()) { + msgLog.debug("Near optimistic prepare fut, failed to send request [txId=" + tx.nearXidVersion() + + ", node=" + n.id() + + ", err=" + e + ']'); + } + fut.onResult(e); } } @@ -687,6 +714,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @param e Node failure. */ void onResult(ClusterTopologyCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near optimistic prepare fut, mini future node left [txId=" + tx.nearXidVersion() + + ", node=" + m.node().id() + ']'); + } + if (isDone()) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 104af94..4fcfa66 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -100,11 +100,24 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (f != null) { assert f.node().id().equals(nodeId); - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + f); - f.onResult(res); } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near pessimistic prepare, failed to find mini future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near
pessimistic prepare, response for finished future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } } } @@ -258,6 +271,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA else { try { cctx.io().send(node, req, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near pessimistic prepare, sent request [txId=" + tx.nearXidVersion() + + ", node=" + node.id() + ']'); + } } catch (ClusterTopologyCheckedException e) { e.retryReadyFuture(cctx.nextAffinityReadyFuture(topVer)); @@ -265,6 +283,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA fut.onNodeLeft(e); } catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near pessimistic prepare, failed to send request [txId=" + tx.nearXidVersion() + + ", node=" + node.id() + ", err=" + e + ']'); + } + fut.onError(e); } } @@ -363,6 +386,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA * @param e Error. 
*/ void onNodeLeft(ClusterTopologyCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near pessimistic prepare, mini future node left [txId=" + tx.nearXidVersion() + + ", nodeId=" + m.node().id() + ']'); + } + if (tx.onePhaseCommit()) { tx.markForBackupCheck(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 291c88a..400d23d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -72,6 +72,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu /** Logger. */ private static IgniteLogger log; + /** Logger. */ + protected static IgniteLogger msgLog; + /** Context. */ private GridCacheSharedContext<K, V> cctx; @@ -112,8 +115,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu futId = IgniteUuid.randomUuid(); - if (log == null) + if (log == null) { + msgLog = cctx.txFinishMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridNearTxFinishFuture.class); + } } /** {@inheritDoc} */ @@ -140,6 +145,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu return false; } + /** + * @return Transaction. 
+ */ + public GridNearTxLocal tx() { + return tx; + } + /** {@inheritDoc} */ @Override public boolean trackable() { return trackable; @@ -157,18 +169,38 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * @param res Result. */ public void onResult(UUID nodeId, GridNearTxFinishResponse res) { - if (!isDone()) + if (!isDone()) { + boolean found = false; + for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) { if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; + MiniFuture f = (MiniFuture) fut; if (f.futureId().equals(res.miniId())) { + found = true; + assert f.node().id().equals(nodeId); f.onResult(res); } } } + + if (!found && msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, failed to find mini future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, response for finished future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } } /** @@ -176,18 +208,38 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * @param res Result. 
*/ public void onResult(UUID nodeId, GridDhtTxFinishResponse res) { - if (!isDone()) + if (!isDone()) { + boolean found = false; + for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) { if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; + MiniFuture f = (MiniFuture) fut; if (f.futureId().equals(res.miniId())) { + found = true; + assert f.node().id().equals(nodeId); f.onResult(res); } } } + + if (!found && msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, failed to find mini future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, response for finished future [txId=" + tx.nearXidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } + } } /** {@inheritDoc} */ @@ -455,8 +507,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu finishReq.checkCommitted(true); try { - if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(backup.version()) <= 0) + if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(backup.version()) <= 0) { cctx.io().send(backup, finishReq, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, sent check committed request [" + + "txId=" + tx.nearXidVersion() + + ", node=" + backup.id() + ']'); + } + } else mini.onDone(new IgniteTxHeuristicCheckedException("Failed to check for tx commit on " + "the backup node (node has an old Ignite version) [rmtNodeId=" + backup.id() + @@ -466,6 +525,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu mini.onResult(e); } catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, failed to send check committed request [" + + "txId=" + tx.nearXidVersion() + + ", node=" + backup.id() + + ", err=" + e + ']'); + } + mini.onResult(e); } } @@ -596,6 +662,12 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCompoundIdentityFutu try { cctx.io().send(n, req, tx.ioPolicy()); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, sent request [" + + "txId=" + tx.nearXidVersion() + + ", node=" + n.id() + ']'); + } + // If we don't wait for result, then mark future as done. if (!isSync() && !m.explicitLock()) fut.onDone(); @@ -607,6 +679,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu fut.onResult(e); } catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, failed to send request [" + + "txId=" + tx.nearXidVersion() + + ", node=" + n.id() + + ", err=" + e + ']'); + } + // Fail the whole thing. fut.onResult(e); } @@ -705,8 +784,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * @param e Node failure. */ void onResult(ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this); + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, mini future node left [txId=" + tx.nearXidVersion() + + ", node=" + m.node().id() + ']'); + } if (backup != null) { readyNearMappingFromBackup(m); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ed13e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index 6992aa5..f46af0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -69,6 +69,9 @@ public abstract class GridNearTxPrepareFutureAdapter extends /** Logger. */ protected static IgniteLogger log; + /** Logger. */ + protected static IgniteLogger msgLog; + /** Context. */ protected GridCacheSharedContext<?, ?> cctx; @@ -105,8 +108,10 @@ public abstract class GridNearTxPrepareFutureAdapter extends futId = IgniteUuid.randomUuid(); - if (log == null) + if (log == null) { + msgLog = cctx.txFinishMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridNearTxPrepareFutureAdapter.class); + } } /** {@inheritDoc} */