ignite-3116 Cancel force keys futures on node stop
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04280189 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04280189 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04280189 Branch: refs/heads/ignite-3163 Commit: 04280189433bde02144ee0f831ed775f8c0980b2 Parents: 9f2bafb Author: sboikov <[email protected]> Authored: Mon May 16 09:08:14 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon May 16 09:08:14 2016 +0300 ---------------------------------------------------------------------- .../cache/distributed/dht/GridDhtGetFuture.java | 126 +++++++++---------- .../dht/GridDhtTransactionalCacheAdapter.java | 57 ++++++++- .../dht/atomic/GridDhtAtomicCache.java | 51 +++++++- .../dht/preloader/GridDhtForceKeysFuture.java | 6 +- .../dht/preloader/GridDhtPreloader.java | 28 ++++- .../communication/tcp/TcpCommunicationSpi.java | 31 ++--- .../IgniteCacheBinaryObjectsScanSelfTest.java | 2 + .../distributed/IgniteCacheCreatePutTest.java | 11 +- 8 files changed, 225 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index e86c885..4d4c303 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -43,6 +44,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.typedef.C2; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -166,9 +168,67 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * Initializes future. */ void init() { - map(keys); + GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer); + + if (!F.isEmpty(fut.invalidPartitions())) { + if (retries == null) + retries = new HashSet<>(); + + retries.addAll(fut.invalidPartitions()); + } + + fut.listen(new CI1<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> fut) { + try { + fut.get(); + } + catch (NodeStoppingException e) { + return; + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']'); + + onDone(e); + + return; + } + + Map<KeyCacheObject, Boolean> mappedKeys = null; + + // Assign keys to primary nodes. + for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) { + int part = cctx.affinity().partition(key.getKey()); - markInitialized(); + if (retries == null || !retries.contains(part)) { + if (!map(key.getKey(), parts)) { + if (retries == null) + retries = new HashSet<>(); + + retries.add(part); + + if (mappedKeys == null) { + mappedKeys = U.newLinkedHashMap(keys.size()); + + for (Map.Entry<KeyCacheObject, Boolean> key1 : keys.entrySet()) { + if (key1.getKey() == key.getKey()) + break; + + mappedKeys.put(key.getKey(), key1.getValue()); + } + } + } + else if (mappedKeys != null) + mappedKeys.put(key.getKey(), key.getValue()); + } + } + + // Add new future. + add(getAsync(mappedKeys == null ? keys : mappedKeys)); + + markInitialized(); + } + }); } /** {@inheritDoc} */ @@ -204,68 +264,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col } /** - * @param keys Keys. - */ - private void map(final Map<KeyCacheObject, Boolean> keys) { - GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer); - - if (!F.isEmpty(fut.invalidPartitions())) { - if (retries == null) - retries = new HashSet<>(); - - retries.addAll(fut.invalidPartitions()); - } - - add(new GridEmbeddedFuture<>( - new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() { - @Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) { - if (e != null) { // Check error first. - if (log.isDebugEnabled()) - log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']'); - - onDone(e); - } - - Map<KeyCacheObject, Boolean> mappedKeys = null; - - // Assign keys to primary nodes. - for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) { - int part = cctx.affinity().partition(key.getKey()); - - if (retries == null || !retries.contains(part)) { - if (!map(key.getKey(), parts)) { - if (retries == null) - retries = new HashSet<>(); - - retries.add(part); - - if (mappedKeys == null) { - mappedKeys = U.newLinkedHashMap(keys.size()); - - for (Map.Entry<KeyCacheObject, Boolean> key1 : keys.entrySet()) { - if (key1.getKey() == key.getKey()) - break; - - mappedKeys.put(key.getKey(), key1.getValue()); - } - } - } - else if (mappedKeys != null) - mappedKeys.put(key.getKey(), key.getValue()); - } - } - - // Add new future. - add(getAsync(mappedKeys == null ? keys : mappedKeys)); - - // Finish this one. - return Collections.emptyList(); - } - }, - fut)); - } - - /** * @param key Key. * @param parts Parts to map. * @return {@code True} if mapped. http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index d0b8092..b2da39c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -29,6 +29,7 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; @@ -377,11 +378,38 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null : ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion()); - if (keyFut == null || keyFut.isDone()) + if (keyFut == null || keyFut.isDone()) { + if (keyFut != null) { + try { + keyFut.get(); + } + catch (NodeStoppingException e) { + return; + } + catch (IgniteCheckedException e) { + onForceKeysError(nodeId, req, e); + + return; + } + } + processDhtLockRequest0(nodeId, req); + } else { keyFut.listen(new CI1<IgniteInternalFuture<Object>>() { - @Override public void apply(IgniteInternalFuture<Object> t) { + @Override public void apply(IgniteInternalFuture<Object> fut) { + try { + fut.get(); + } + catch (NodeStoppingException e) { + return; + } + catch (IgniteCheckedException e) { + onForceKeysError(nodeId, req, e); + + return; + } + processDhtLockRequest0(nodeId, req); } }); @@ -391,6 +419,31 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach /** * @param nodeId Node ID. * @param req Request. + * @param e Error. + */ + private void onForceKeysError(UUID nodeId, GridDhtLockRequest req, IgniteCheckedException e) { + GridDhtLockResponse res = new GridDhtLockResponse(ctx.cacheId(), + req.version(), + req.futureId(), + req.miniId(), + e, + ctx.deploymentEnabled()); + + try { + ctx.io().send(nodeId, res, ctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e0) { + if (log.isDebugEnabled()) + log.debug("Failed to send lock reply to remote node because it left grid: " + nodeId); + } + catch (IgniteCheckedException e0) { + U.error(log, "Failed to send lock reply to node: " + nodeId, e); + } + } + + /** + * @param nodeId Node ID. + * @param req Request. */ protected final void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest req) { assert nodeId != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 6aad533..7361642 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -39,6 +39,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; @@ -1314,11 +1315,36 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ) { IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion()); - if (forceFut.isDone()) + if (forceFut.isDone()) { + try { + forceFut.get(); + } + catch (NodeStoppingException e) { + return; + } + catch (IgniteCheckedException e) { + onForceKeysError(nodeId, req, completionCb, e); + + return; + } + updateAllAsyncInternal0(nodeId, req, completionCb); + } else { forceFut.listen(new CI1<IgniteInternalFuture<Object>>() { - @Override public void apply(IgniteInternalFuture<Object> t) { + @Override public void apply(IgniteInternalFuture<Object> fut) { + try { + fut.get(); + } + catch (NodeStoppingException e) { + return; + } + catch (IgniteCheckedException e) { + onForceKeysError(nodeId, req, completionCb, e); + + return; + } + updateAllAsyncInternal0(nodeId, req, completionCb); } }); @@ -1326,6 +1352,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** + * @param nodeId Node ID. + * @param req Update request. + * @param completionCb Completion callback. + * @param e Error. + */ + private void onForceKeysError(final UUID nodeId, + final GridNearAtomicUpdateRequest req, + final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, + IgniteCheckedException e + ) { + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), + nodeId, + req.futureVersion(), + ctx.deploymentEnabled()); + + res.addFailedKeys(req.keys(), e); + + completionCb.apply(req, res); + } + + /** * Executes local update after preloader fetched values. * * @param nodeId Node ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 7970a44..4da1f38 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -246,7 +246,11 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec int curTopVer = topCntr.get(); - preloader.addFuture(this); + if (!preloader.addFuture(this)) { + assert isDone() : this; + + return false; + } trackable = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index f0054e4..1b51a4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -122,6 +123,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** */ private final AtomicInteger partsEvictOwning = new AtomicInteger(); + /** */ + private volatile boolean stopping; + /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -244,6 +248,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (log.isDebugEnabled()) log.debug("DHT rebalancer onKernalStop callback."); + stopping = true; + cctx.events().removeListener(discoLsnr); // Acquire write busy lock. @@ -255,8 +261,19 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (demander != null) demander.stop(); + IgniteCheckedException err = stopError(); + + for (GridDhtForceKeysFuture fut : forceKeyFuts.values()) + fut.onDone(err); + top = null; } + /** + * @return Node stop exception. + */ + private IgniteCheckedException stopError() { + return new NodeStoppingException("Operation has been cancelled (cache or node is stopping)."); + } /** {@inheritDoc} */ @Override public void onInitialExchangeComplete(@Nullable Throwable err) { @@ -756,9 +773,18 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { * Adds future to future map. * * @param fut Future to add. + * @return {@code False} if node cache is stopping and future was completed with error. */ - void addFuture(GridDhtForceKeysFuture<?, ?> fut) { + boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) { forceKeyFuts.put(fut.futureId(), fut); + + if (stopping) { + fut.onDone(stopError()); + + return false; + } + + return true; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 281b38e..0a18003 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1381,26 +1381,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public void dumpStats() { - StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl()); + IgniteLogger log = this.log; - for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) { - GridNioRecoveryDescriptor desc = entry.getValue(); + if (log != null && log.isInfoEnabled()) { + StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl()); - sb.append(" [key=").append(entry.getKey()) - .append(", msgsSent=").append(desc.sent()) - .append(", msgsAckedByRmt=").append(desc.acked()) - .append(", msgsRcvd=").append(desc.received()) - .append(", descIdHash=").append(System.identityHashCode(desc)) - .append(']').append(U.nl()); - } + for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) { + GridNioRecoveryDescriptor desc = entry.getValue(); + + sb.append(" [key=").append(entry.getKey()) + .append(", msgsSent=").append(desc.sent()) + .append(", msgsAckedByRmt=").append(desc.acked()) + .append(", msgsRcvd=").append(desc.received()) + .append(", descIdHash=").append(System.identityHashCode(desc)) + .append(']').append(U.nl()); + } - if (log.isInfoEnabled()) log.info(sb.toString()); + } - GridNioServer<Message> nioSrvr1 = nioSrvr; + GridNioServer<Message> nioSrvr = this.nioSrvr; - if (nioSrvr1 != null) - nioSrvr1.dumpStats(); + if (nioSrvr != null) + nioSrvr.dumpStats(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java index 07f3833..7743882 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheBinaryObjectsScanSelfTest.java @@ -60,6 +60,8 @@ public class IgniteCacheBinaryObjectsScanSelfTest extends GridCommonAbstractTest /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { ldr = null; + + stopAllGrids(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/04280189/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java index 8b3d9d3..90d5905 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java @@ -25,12 +25,15 @@ import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + /** * */ @@ -45,6 +48,8 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest { protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + cfg.setPeerClassLoadingEnabled(false); TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); @@ -62,6 +67,7 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest { ccfg.setName("cache*"); ccfg.setCacheMode(CacheMode.PARTITIONED); ccfg.setBackups(1); + ccfg.setWriteSynchronizationMode(FULL_SYNC); cfg.setCacheConfiguration(ccfg); @@ -70,7 +76,7 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected long getTestTimeout() { - return 3 * 60 * 1000L; + return 5 * 60 * 1000L; } /** {@inheritDoc} */ @@ -96,8 +102,7 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest { final AtomicInteger idx = new AtomicInteger(); GridTestUtils.runMultiThreaded(new Callable<Void>() { - @Override - public Void call() throws Exception { + @Override public Void call() throws Exception { int node = idx.getAndIncrement(); Ignite ignite = startGrid(node);
