Repository: ignite Updated Branches: refs/heads/master 447ce47cc -> cbbd9ad4a
IGNITE-9550 fix Get operation returns null for a lost partition with READ_SAFE policy - Fixes #4947. Signed-off-by: Dmitriy Govorukhin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cbbd9ad4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cbbd9ad4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cbbd9ad4 Branch: refs/heads/master Commit: cbbd9ad4a329d69ca88517ce2c480bffcf413e78 Parents: 447ce47 Author: Dmitriy Govorukhin <[email protected]> Authored: Thu Oct 11 12:42:27 2018 +0300 Committer: Dmitriy Govorukhin <[email protected]> Committed: Thu Oct 11 12:42:27 2018 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 22 +- .../processors/cache/GridCacheAdapter.java | 9 +- .../GridCachePartitionExchangeManager.java | 15 +- .../processors/cache/GridCacheProcessor.java | 238 +++++++++++++++---- .../processors/cache/GridCacheProxyImpl.java | 7 +- .../processors/cache/IgniteCacheProxyImpl.java | 95 +++++++- .../dht/GridPartitionedGetFuture.java | 4 +- .../dht/GridPartitionedSingleGetFuture.java | 4 +- .../GridNearAtomicSingleUpdateFuture.java | 2 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 55 ++++- .../distributed/near/GridNearGetFuture.java | 27 +-- .../cache/query/GridCacheQueryAdapter.java | 2 +- .../continuous/CacheContinuousQueryHandler.java | 2 +- .../continuous/CacheContinuousQueryManager.java | 2 +- .../cluster/DiscoveryDataClusterState.java | 17 ++ .../cluster/GridClusterStateProcessor.java | 2 +- .../datastreamer/DataStreamerImpl.java | 17 -- .../GridAffinityProcessorMemoryLeakTest.java | 2 +- ...CacheResultIsNotNullOnPartitionLossTest.java | 213 +++++++++++++++++ .../testsuites/IgniteCacheTestSuite4.java | 3 + .../kafka/KafkaIgniteStreamerSelfTest.java | 22 +- 22 files changed, 630 insertions(+), 132 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index d009c5d..b57530b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -408,7 +408,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ClientCacheChangeDummyDiscoveryMessage msg, boolean crd, AffinityTopologyVersion topVer, - DiscoCache discoCache) { + DiscoCache discoCache + ) { Map<String, DynamicCacheChangeRequest> startReqs = msg.startRequests(); if (startReqs == null) @@ -434,11 +435,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap DynamicCacheChangeRequest startReq = startReqs.get(desc.cacheName()); - cctx.cache().prepareCacheStart(desc.cacheConfiguration(), + cctx.cache().prepareCacheStart( + desc.cacheConfiguration(), desc, startReq.nearCacheConfiguration(), topVer, - startReq.disabledAfterStart()); + startReq.disabledAfterStart() + ); startedInfos.put(desc.cacheId(), startReq.nearCacheConfiguration() != null); @@ -564,6 +567,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap cctx.cache().initCacheProxies(topVer, null); + startReqs.keySet().forEach(req -> cctx.cache().completeProxyInitialize(req)); + cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null); return startedInfos; @@ -578,7 +583,8 @@ public class CacheAffinitySharedManager<K, V> 
extends GridCacheSharedManagerAdap private Set<Integer> processCacheCloseRequests( ClientCacheChangeDummyDiscoveryMessage msg, boolean crd, - AffinityTopologyVersion topVer) { + AffinityTopologyVersion topVer + ) { Set<String> cachesToClose = msg.cachesToClose(); if (cachesToClose == null) @@ -869,7 +875,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert cctx.cacheContext(cacheDesc.cacheId()) == null : "Starting cache has not null context: " + cacheDesc.cacheName(); - IgniteCacheProxyImpl cacheProxy = (IgniteCacheProxyImpl) cctx.cache().jcacheProxy(req.cacheName()); + IgniteCacheProxyImpl cacheProxy = cctx.cache().jcacheProxy(req.cacheName(), false); // If it has proxy then try to start it if (cacheProxy != null) { @@ -887,11 +893,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap try { if (startCache) { - cctx.cache().prepareCacheStart(req.startCacheConfiguration(), + cctx.cache().prepareCacheStart( + req.startCacheConfiguration(), cacheDesc, nearCfg, evts.topologyVersion(), - req.disabledAfterStart()); + req.disabledAfterStart() + ); if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) { if (fut.events().discoveryCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty()) http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index cf9337b..3156d6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -4264,7 +4264,10 @@ public abstract class 
GridCacheAdapter<K, V> implements IgniteInternalCache<K, V assert topVer != null && topVer.topologyVersion() > 0 : tx; - ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1).get(); + AffinityTopologyVersion awaitVer = new AffinityTopologyVersion( + topVer.topologyVersion() + 1, 0); + + ctx.shared().exchange().affinityReadyFuture(awaitVer).get(); continue; } @@ -5031,8 +5034,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V assert topVer != null && topVer.topologyVersion() > 0 : tx; + AffinityTopologyVersion awaitVer = new AffinityTopologyVersion(topVer.topologyVersion() + 1, 0); + IgniteInternalFuture<?> topFut = - ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1); + ctx.shared().exchange().affinityReadyFuture(awaitVer); topFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> topFut) { http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index a18022e..dbfc3e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -101,6 +101,7 @@ import org.apache.ignite.internal.util.GridListSet; import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import 
org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -898,7 +899,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana while (true) { GridDhtTopologyFuture cur = lastFinishedFut.get(); - if (cur == null || fut.topologyVersion().compareTo(cur.topologyVersion()) > 0) { + if (fut.topologyVersion() != null && (cur == null || fut.topologyVersion().compareTo(cur.topologyVersion()) > 0)) { if (lastFinishedFut.compareAndSet(cur, fut)) break; } @@ -912,23 +913,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @return Future or {@code null} is future is already completed. */ @Nullable public IgniteInternalFuture<AffinityTopologyVersion> affinityReadyFuture(AffinityTopologyVersion ver) { - GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut; - - if (lastInitializedFut0 != null && lastInitializedFut0.initialVersion().compareTo(ver) == 0) { - if (log.isTraceEnabled()) - log.trace("Return lastInitializedFut for topology ready future " + - "[ver=" + ver + ", fut=" + lastInitializedFut0 + ']'); - - return lastInitializedFut0; - } - AffinityTopologyVersion topVer = exchFuts.readyTopVer(); if (topVer.compareTo(ver) >= 0) { if (log.isTraceEnabled()) log.trace("Return finished future for topology ready future [ver=" + ver + ", topVer=" + topVer + ']'); - return null; + return new GridFinishedFuture<>(topVer); } GridFutureAdapter<AffinityTopologyVersion> fut = F.addIfAbsent(readyFuts, ver, http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index f595ecf..a6de1e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -35,6 +35,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.cache.configuration.FactoryBuilder; import javax.cache.expiry.EternalExpiryPolicy; @@ -740,7 +741,6 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor } /** - * * @throws IgniteCheckedException If failed. */ private void startCachesOnStart() throws IgniteCheckedException { @@ -1216,6 +1216,9 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor sharedCtx.removeCacheContext(cctx); caches.remove(cctx.name()); + + completeProxyInitialize(cctx.name()); + jCacheProxies.remove(cctx.name()); stoppedCaches.add(cctx.cache()); @@ -1415,9 +1418,10 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor if (destroy && (pageStore = sharedCtx.pageStore()) != null) { try { pageStore.removeCacheData(new StoredCacheData(ctx.config())); - } catch (IgniteCheckedException e) { + } + catch (IgniteCheckedException e) { U.error(log, "Failed to delete cache configuration data while destroying cache" + - "[cache=" + ctx.name() + "]", e); + "[cache=" + ctx.name() + "]", e); } } @@ -1818,6 +1822,67 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor } /** + * + * @param reqs Cache requests to start. + * @param fut Completable future. 
+ */ + public void registrateProxyRestart(Map<String, DynamicCacheChangeRequest> reqs, GridFutureAdapter<?> fut) { + for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values()) { + if (reqs.containsKey(proxy.getName()) && + proxy.isRestarting() && + !reqs.get(proxy.getName()).disabledAfterStart() + ) + proxy.registrateFutureRestart(fut); + } + } + + /** + * + * @param reqs Cache requests to start. + * @param initVer Init exchange version. + * @param doneVer Finish exchange version. + */ + public void completeProxyRestart( + Map<String, DynamicCacheChangeRequest> reqs, + AffinityTopologyVersion initVer, + AffinityTopologyVersion doneVer + ) { + if (initVer == null || doneVer == null) + return; + + for (GridCacheAdapter<?, ?> cache : caches.values()) { + GridCacheContext<?, ?> cacheCtx = cache.context(); + + if (reqs.containsKey(cache.name()) || + (cacheCtx.startTopologyVersion().compareTo(initVer) <= 0 || + cacheCtx.startTopologyVersion().compareTo(doneVer) <= 0)) + completeProxyInitialize(cache.name()); + + if ( + cacheCtx.startTopologyVersion().compareTo(initVer) >= 0 && + cacheCtx.startTopologyVersion().compareTo(doneVer) <= 0 + ) { + IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(cache.name()); + + boolean canRestart = true; + + DynamicCacheChangeRequest req = reqs.get(cache.name()); + + if (req != null) { + canRestart = !req.disabledAfterStart(); + } + + if (proxy != null && proxy.isRestarting() && canRestart) { + proxy.onRestarted(cacheCtx, cache); + + if (cacheCtx.dataStructuresCache()) + ctx.dataStructures().restart(proxy.internalProxy()); + } + } + } + } + + /** * Gets a collection of currently started caches. * * @return Collection of started cache names. @@ -1832,8 +1897,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor } /** - * Gets public cache that can be used for query execution. - * If cache isn't created on current node it will be started. + * Gets public cache that can be used for query execution. 
If cache isn't created on current node it will be + * started. * * @param start Start cache. * @param inclLoc Include local caches. @@ -2009,8 +2074,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor * @param desc Cache descriptor. * @param reqNearCfg Near configuration if specified for client cache start request. * @param exchTopVer Current exchange version. - * @param disabledAfterStart If true, then we will discard restarting state from proxies. If false then we will change - * state of proxies to restarting + * @param disabledAfterStart If true, then we will discard restarting state from proxies. If false then we will + * change state of proxies to restarting * @throws IgniteCheckedException If failed. */ void prepareCacheStart( @@ -2097,20 +2162,11 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor grp.onCacheStarted(cacheCtx); onKernalStart(cache); - - IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(ccfg.getName()); - - if (!disabledAfterStart && proxy != null && proxy.isRestarting()) { - proxy.onRestarted(cacheCtx, cache); - - if (cacheCtx.dataStructuresCache()) - ctx.dataStructures().restart(proxy.internalProxy()); - } } /** - * Restarts proxies of caches if they was marked as restarting. - * Requires external synchronization - shouldn't be called concurrently with another caches restart. + * Restarts proxies of caches if they was marked as restarting. Requires external synchronization - shouldn't be + * called concurrently with another caches restart. 
*/ public void restartProxies() { for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values()) { @@ -2184,7 +2240,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor CacheGroupContext old = cacheGrps.put(desc.groupId(), grp); - if (!grp.systemCache() && !U.IGNITE_MBEANS_DISABLED) { + if (!grp.systemCache() && !U.IGNITE_MBEANS_DISABLED) { try { U.registerMBean(ctx.config().getMBeanServer(), ctx.igniteInstanceName(), CACHE_GRP_METRICS_MBEAN_GRP, grp.cacheOrGroupName(), grp.mxBean(), CacheGroupMetricsMXBean.class); @@ -2206,7 +2262,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor */ void blockGateway(String cacheName, boolean stop, boolean restart) { // Break the proxy before exchange future is done. - IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(cacheName); + IgniteCacheProxyImpl<?, ?> proxy = jcacheProxy(cacheName, false); if (restart) { GridCacheAdapter<?, ?> cache = caches.get(cacheName); @@ -2250,8 +2306,11 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor if (proxy != null) proxy.restart(); } - else + else { + completeProxyInitialize(req.cacheName()); + proxy = jCacheProxies.remove(req.cacheName()); + } if (proxy != null) proxy.context().gate().onStopped(); @@ -2292,12 +2351,12 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor if (cacheCtx.startTopologyVersion().equals(startTopVer)) { if (!jCacheProxies.containsKey(cacheCtx.name())) { - IgniteCacheProxyImpl newProxy = new IgniteCacheProxyImpl(cache.context(), cache, false); + IgniteCacheProxyImpl<?, ?> newProxy = new IgniteCacheProxyImpl(cache.context(), cache, false); if (!cache.active()) newProxy.restart(); - jCacheProxies.putIfAbsent(cacheCtx.name(), newProxy); + addjCacheProxy(cacheCtx.name(), newProxy); } if (cacheCtx.preloader() != null) @@ -2315,6 +2374,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor Set<Integer> ids = 
null; for (String cacheName : cachesToClose) { + completeProxyInitialize(cacheName); + blockGateway(cacheName, false, false); GridCacheContext ctx = sharedCtx.cacheContext(CU.cacheId(cacheName)); @@ -2345,6 +2406,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor assert cache != null : cctx.name(); jCacheProxies.put(cctx.name(), new IgniteCacheProxyImpl(cache.context(), cache, false)); + + completeProxyInitialize(cctx.name()); } else { cctx.gate().onStopped(); @@ -2356,6 +2419,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor if (!cctx.affinityNode() && cctx.transactional()) sharedCtx.tm().rollbackTransactionsForCache(cctx.cacheId()); + completeProxyInitialize(cctx.name()); + jCacheProxies.remove(cctx.name()); sharedCtx.database().checkpointReadLock(); @@ -2377,8 +2442,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor } /** - * Called during the rollback of the exchange partitions procedure - * in order to stop the given cache even if it's not fully initialized (e.g. failed on cache init stage). + * Called during the rollback of the exchange partitions procedure in order to stop the given cache even if it's not + * fully initialized (e.g. failed on cache init stage). * * @param exchActions Stop requests. 
*/ @@ -2436,8 +2501,8 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor CacheGroupContext grp = cacheGrps.get(groupId); if (grp != null && grp.persistenceEnabled() && sharedCtx.database() instanceof GridCacheDatabaseSharedManager) { - GridCacheDatabaseSharedManager mngr = (GridCacheDatabaseSharedManager) sharedCtx.database(); - mngr.removeCheckpointListener((DbCheckpointListener) grp.offheap()); + GridCacheDatabaseSharedManager mngr = (GridCacheDatabaseSharedManager)sharedCtx.database(); + mngr.removeCheckpointListener((DbCheckpointListener)grp.offheap()); } } } @@ -3484,7 +3549,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor IgniteInternalFuture<?> dynamicCloseCache(String cacheName) { assert cacheName != null; - IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName); + IgniteCacheProxy<?, ?> proxy = jcacheProxy(cacheName, false); if (proxy == null || proxy.isProxyClosed()) return new GridFinishedFuture<>(); // No-op. @@ -3657,6 +3722,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor /** * Authorize creating cache. + * * @param cfg Cache configuration. * @param secCtx Optional security context. */ @@ -3670,6 +3736,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor /** * Authorize dynamic cache management for this node. + * * @param req start/stop cache request. 
*/ private void authorizeCacheChange(DynamicCacheChangeRequest req) { @@ -3752,7 +3819,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor return cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer); if (msg instanceof DynamicCacheChangeFailureMessage) - cachesInfo.onCacheChangeRequested((DynamicCacheChangeFailureMessage) msg, topVer); + cachesInfo.onCacheChangeRequested((DynamicCacheChangeFailureMessage)msg, topVer); if (msg instanceof ClientCacheChangeDiscoveryMessage) cachesInfo.onClientCacheChange((ClientCacheChangeDiscoveryMessage)msg, node); @@ -3995,12 +4062,61 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor if (log.isDebugEnabled()) log.debug("Getting cache for name: " + name); - IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jCacheProxies.get(name); + IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jcacheProxy(name, true); return jcache == null ? null : jcache.internalProxy(); } /** + * Await proxy initialization. + * + * @param jcache Cache proxy. + */ + private void awaitInitializeProxy(IgniteCacheProxyImpl<?, ?> jcache) { + if (jcache != null) { + CountDownLatch initLatch = jcache.getInitLatch(); + + try { + while (initLatch.getCount() > 0) { + initLatch.await(2000, TimeUnit.MILLISECONDS); + + if (log.isInfoEnabled()) + log.info("Failed to wait proxy initialization, cache=" + jcache.getName() + + ", localNodeId=" + ctx.localNodeId()); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // Ignore interruption. + } + } + } + + /** + * @param name Cache name. 
+ */ + public void completeProxyInitialize(String name) { + IgniteCacheProxyImpl<?, ?> jcache = jCacheProxies.get(name); + + if (jcache != null) { + CountDownLatch proxyInitLatch = jcache.getInitLatch(); + + if (proxyInitLatch.getCount() > 0) { + if (log.isInfoEnabled()) + log.info("Finish proxy initialization, cacheName=" + name + + ", localNodeId=" + ctx.localNodeId()); + + proxyInitLatch.countDown(); + } + } + else { + if (log.isInfoEnabled()) + log.info("Can not finish proxy initialization because proxy does not exist, cacheName=" + name + + ", localNodeId=" + ctx.localNodeId()); + } + } + + /** * @param name Cache name. * @return Cache instance for given name. * @throws IgniteCheckedException If failed. @@ -4016,18 +4132,21 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - public <K, V> IgniteInternalCache<K, V> getOrStartCache(String name, CacheConfiguration ccfg) throws IgniteCheckedException { + public <K, V> IgniteInternalCache<K, V> getOrStartCache( + String name, + CacheConfiguration ccfg + ) throws IgniteCheckedException { assert name != null; if (log.isDebugEnabled()) log.debug("Getting cache for name: " + name); - IgniteCacheProxy<?, ?> cache = jCacheProxies.get(name); + IgniteCacheProxy<?, ?> cache = jcacheProxy(name, true); if (cache == null) { dynamicStartCache(ccfg, name, null, false, ccfg == null, true).get(); - cache = jCacheProxies.get(name); + cache = jcacheProxy(name, true); } return cache == null ? 
null : (IgniteInternalCache<K, V>)cache.internalProxy(); @@ -4071,13 +4190,21 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor */ private <K, V> IgniteInternalCache<K, V> internalCacheEx(String name) { if (ctx.discovery().localNode().isClient()) { - IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)jCacheProxies.get(name); + IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)jcacheProxy(name, true); if (proxy == null) { GridCacheAdapter<?, ?> cacheAdapter = caches.get(name); - if (cacheAdapter != null) + if (cacheAdapter != null) { proxy = new IgniteCacheProxyImpl(cacheAdapter.context(), cacheAdapter, false); + + IgniteCacheProxyImpl<?, ?> prev = addjCacheProxy(name, (IgniteCacheProxyImpl<?, ?>)proxy); + + if (prev != null) + proxy = (IgniteCacheProxy<K, V>)prev; + + completeProxyInitialize(proxy.getName()); + } } assert proxy != null : name; @@ -4109,7 +4236,7 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor if (!desc.cacheType().userCache()) throw new IllegalStateException("Failed to get cache because it is a system cache: " + name); - IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jCacheProxies.get(name); + IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jcacheProxy(name, true); if (jcache == null) throw new IllegalArgumentException("Cache is not started: " + name); @@ -4150,16 +4277,16 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor if (desc != null && !desc.cacheType().userCache()) throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName); - IgniteCacheProxyImpl<?, ?> cache = jCacheProxies.get(cacheName); + IgniteCacheProxyImpl<?, ?> proxy = jcacheProxy(cacheName, true); // Try to start cache, there is no guarantee that cache will be instantiated. 
- if (cache == null) { + if (proxy == null) { dynamicStartCache(null, cacheName, null, false, failIfNotStarted, checkThreadTx).get(); - cache = jCacheProxies.get(cacheName); + proxy = jcacheProxy(cacheName, true); } - return cache != null ? (IgniteCacheProxy<K, V>)cache.gatewayWrapper() : null; + return proxy != null ? (IgniteCacheProxy<K, V>)proxy.gatewayWrapper() : null; } /** @@ -4276,13 +4403,21 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor public <K, V> IgniteCacheProxy<K, V> jcache(String name) { assert name != null; - IgniteCacheProxy<K, V> cache = (IgniteCacheProxy<K, V>) jCacheProxies.get(name); + IgniteCacheProxy<K, V> cache = (IgniteCacheProxy<K, V>)jcacheProxy(name, true); if (cache == null) { GridCacheAdapter<?, ?> cacheAdapter = caches.get(name); - if (cacheAdapter != null) + if (cacheAdapter != null) { cache = new IgniteCacheProxyImpl(cacheAdapter.context(), cacheAdapter, false); + + IgniteCacheProxyImpl<?, ?> prev = addjCacheProxy(name, (IgniteCacheProxyImpl<?, ?>)cache); + + if (prev != null) + cache = (IgniteCacheProxy<K, V>)prev; + + completeProxyInitialize(cache.getName()); + } } if (cache == null) @@ -4293,10 +4428,25 @@ public class GridCacheProcessor extends GridProcessorAdapter implements Metastor /** * @param name Cache name. + * @param awaitInit Await proxy initialization. * @return Cache proxy. */ - @Nullable public IgniteCacheProxy jcacheProxy(String name) { - return jCacheProxies.get(name); + @Nullable public IgniteCacheProxyImpl<?, ?> jcacheProxy(String name, boolean awaitInit) { + IgniteCacheProxyImpl<?, ?> cache = jCacheProxies.get(name); + + if (awaitInit) + awaitInitializeProxy(cache); + + return cache; + } + + /** + * @param name Cache name. + * @param proxy Cache proxy. + * @return Previous cache proxy. 
+ */ + @Nullable public IgniteCacheProxyImpl<?, ?> addjCacheProxy(String name, IgniteCacheProxyImpl<?, ?> proxy) { + return jCacheProxies.putIfAbsent(name, proxy); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index a9ce448..ebb8b5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -90,8 +90,11 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte * @param delegate Delegate object. * @param opCtx Optional operation context which will be passed to gateway. 
*/ - public GridCacheProxyImpl(GridCacheContext<K, V> ctx, IgniteInternalCache<K, V> delegate, - @Nullable CacheOperationContext opCtx) { + public GridCacheProxyImpl( + GridCacheContext<K, V> ctx, + IgniteInternalCache<K, V> delegate, + @Nullable CacheOperationContext opCtx + ) { assert ctx != null; assert delegate != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java index 4989efb..776e1cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import javax.cache.Cache; @@ -136,16 +137,23 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< private CacheManager cacheMgr; /** Future indicates that cache is under restarting. */ - private final AtomicReference<GridFutureAdapter<Void>> restartFut; + private final AtomicReference<RestartFuture> restartFut; /** Flag indicates that proxy is closed. 
*/ private volatile boolean closed; + /** Proxy initialization latch used to await final completion after the proxy is created. For example, + * a proxy may be created before the exchange is completed, and if we try to perform some cache + * operation we get the last finished exchange future (needed for validation) + * for the previous version rather than for the current one. + */ + private final CountDownLatch initLatch = new CountDownLatch(1); + /** * Empty constructor required for {@link Externalizable}. */ public IgniteCacheProxyImpl() { - restartFut = new AtomicReference<GridFutureAdapter<Void>>(null); + restartFut = new AtomicReference<>(null); } /** @@ -158,7 +166,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< @NotNull IgniteInternalCache<K, V> delegate, boolean async ) { - this(ctx, delegate, new AtomicReference<GridFutureAdapter<Void>>(null), async); + this(ctx, delegate, new AtomicReference<>(null), async); } /** @@ -169,7 +177,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< private IgniteCacheProxyImpl( @NotNull GridCacheContext<K, V> ctx, @NotNull IgniteInternalCache<K, V> delegate, - @NotNull AtomicReference<GridFutureAdapter<Void>> restartFut, + @NotNull AtomicReference<RestartFuture> restartFut, boolean async ) { super(async); @@ -184,6 +192,14 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< } /** + * + * @return Init latch. + */ + public CountDownLatch getInitLatch(){ + return initLatch; + } + + /** * @return Context. */ @Override public GridCacheContext<K, V> context() { @@ -1766,6 +1782,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * @return Internal proxy. 
*/ @Override public GridCacheProxyImpl<K, V> internalProxy() { + checkRestart(); + return new GridCacheProxyImpl<>(ctx, delegate, ctx.operationContextPerCall()); } @@ -1842,11 +1860,10 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * Throws {@code IgniteCacheRestartingException} if proxy is restarting. */ public void checkRestart() { - GridFutureAdapter<Void> currentFut = this.restartFut.get(); + RestartFuture currentFut = restartFut.get(); if (currentFut != null) - throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(currentFut), "Cache is restarting: " + - context().name()); + currentFut.checkRestartOrAwait(); } /** @@ -1860,9 +1877,9 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * Restarts this cache proxy. */ public boolean restart() { - GridFutureAdapter<Void> restartFut = new GridFutureAdapter<>(); + RestartFuture restartFut = new RestartFuture(ctx.name()); - final GridFutureAdapter<Void> curFut = this.restartFut.get(); + RestartFuture curFut = this.restartFut.get(); boolean changed = this.restartFut.compareAndSet(curFut, restartFut); @@ -1880,12 +1897,22 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< } /** + * @param fut Finish restart future. + */ + public void registrateFutureRestart(GridFutureAdapter<?> fut){ + RestartFuture currentFut = restartFut.get(); + + if (currentFut != null) + currentFut.addRestartFinishedFuture(fut); + } + + /** * If proxy is already being restarted, returns future to wait on, else restarts this cache proxy. * * @return Future to wait on, or null. 
*/ public GridFutureAdapter<Void> opportunisticRestart() { - GridFutureAdapter<Void> restartFut = new GridFutureAdapter<>(); + RestartFuture restartFut = new RestartFuture(ctx.name()); while (true) { if (this.restartFut.compareAndSet(null, restartFut)) @@ -1905,7 +1932,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< * @param delegate New delegate. */ public void onRestarted(GridCacheContext ctx, IgniteInternalCache delegate) { - GridFutureAdapter<Void> restartFut = this.restartFut.get(); + RestartFuture restartFut = this.restartFut.get(); assert restartFut != null; @@ -1917,6 +1944,52 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< restartFut.onDone(); } + /** + * + */ + private class RestartFuture extends GridFutureAdapter<Void> { + /** */ + private final String name; + + /** */ + private volatile GridFutureAdapter<?> restartFinishFut; + + /** */ + private RestartFuture(String name) { + this.name = name; + } + + /** + * + */ + void checkRestartOrAwait() { + GridFutureAdapter<?> fut = restartFinishFut; + + if (fut != null) { + try { + fut.get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + + return; + } + + throw new IgniteCacheRestartingException( + new IgniteFutureImpl<>(this), + "Cache is restarting: " + name + ); + } + + /** + * + */ + void addRestartFinishedFuture(GridFutureAdapter<?> fut) { + restartFinishFut = fut; + } + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteCacheProxyImpl.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 1512963..e09a599 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -803,7 +803,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); - cctx.affinity().affinityReadyFuture(updTopVer).listen( + cctx.shared().exchange().affinityReadyFuture(updTopVer).listen( new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { try { @@ -869,7 +869,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda } // Need to wait for next topology version to remap. 
- IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.affinity().affinityReadyFuture(rmtTopVer); + IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.shared().exchange().affinityReadyFuture(rmtTopVer); topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() { @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 8ed6695..4dc72b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -615,7 +615,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec } if (canRemap) { - IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.affinity().affinityReadyFuture(rmtTopVer); + IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.shared().exchange().affinityReadyFuture(rmtTopVer); topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void applyx(IgniteInternalFuture<AffinityTopologyVersion> fut) { @@ -740,7 +740,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec AffinityTopologyVersion updTopVer = new AffinityTopologyVersion( Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); - cctx.affinity().affinityReadyFuture(updTopVer).listen( + cctx.shared().exchange().affinityReadyFuture(updTopVer).listen( new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 82a7964..4c0d2db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -357,7 +357,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( "Failed to update keys, topology changed while execute atomic update inside transaction."); - cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); + cause.retryReadyFuture(cctx.shared().exchange().affinityReadyFuture(remapTopVer)); e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index fd6b63e..28ebfb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -485,7 +485,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( "Failed to update keys, topology changed while execute atomic update inside transaction."); - cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); + cause.retryReadyFuture(cctx.shared().exchange().affinityReadyFuture(remapTopVer)); e.add(remapKeys, cause); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 2052c36..514b81f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import 
org.apache.ignite.internal.processors.cache.DynamicCacheChangeFailureMessage; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.ExchangeContext; @@ -333,6 +334,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** Latest (by update sequences) full message with exchangeId == null, need to be processed right after future is done. */ private GridDhtPartitionsFullMessage delayedLatestMsg; + /** Future used to wait until all exchange listeners have completed. */ + private final GridFutureAdapter<?> afterLsnrCompleteFut = new GridFutureAdapter<>(); + /** * @param cctx Cache context. * @param busyLock Busy lock. @@ -774,6 +778,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + cctx.cache().registrateProxyRestart(resolveCacheRequests(exchActions), afterLsnrCompleteFut); + updateTopologies(crdNode, cctx.coordinators().currentCoordinator()); switch (exchange) { @@ -1830,6 +1836,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** {@inheritDoc} */ @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) { + assert res != null || err != null : "TopVer=" + res + ", err=" + err; + if (isDone() || !done.compareAndSet(false, true)) return false; @@ -1907,18 +1915,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.coordinators().onExchangeDone(exchCtx.newMvccCoordinator(), exchCtx.events().discoveryCache(), exchCtx.activeQueries()); + // Create and destroy caches and cache proxies. 
cctx.cache().onExchangeDone(initialVersion(), exchActions, err); - cctx.exchange().onExchangeDone(res, initialVersion(), err); - cctx.kernalContext().authentication().onActivate(); - if (exchActions != null && err == null) - exchActions.completeRequestFutures(cctx, null); - - if (stateChangeExchange() && err == null) - cctx.kernalContext().state().onStateChangeExchangeDone(exchActions.stateChangeRequest()); - Map<T2<Integer, Integer>, Long> localReserved = partHistSuppliers.getReservations(cctx.localNodeId()); if (localReserved != null) { @@ -1946,7 +1947,29 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.walState().changeLocalStatesOnExchangeDone(res); } + final Throwable err0 = err; + + // Should execute this listener first, before any external listeners. + // Listeners use stack as data structure. + listen(f -> { + // Update last finished future in the first. + cctx.exchange().lastFinishedFuture(this); + + // Complete any affReady futures and update last exchange done version. 
+ cctx.exchange().onExchangeDone(res, initialVersion(), err0); + + cctx.cache().completeProxyRestart(resolveCacheRequests(exchActions), initialVersion(), res); + + if (exchActions != null && err0 == null) + exchActions.completeRequestFutures(cctx, null); + + if (stateChangeExchange() && err0 == null) + cctx.kernalContext().state().onStateChangeExchangeDone(exchActions.stateChangeRequest()); + }); + if (super.onDone(res, err)) { + afterLsnrCompleteFut.onDone(); + if (log.isDebugEnabled()) log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); @@ -1973,8 +1996,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ((DiscoveryCustomEvent)firstDiscoEvt).customMessage(null); if (err == null) { - cctx.exchange().lastFinishedFuture(this); - if (exchCtx != null && (exchCtx.events().hasServerLeft() || exchCtx.events().hasServerJoin())) { ExchangeDiscoveryEvents evts = exchCtx.events(); @@ -1993,6 +2014,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * @param exchangeActions Exchange actions. + * @return Map of cache names and start descriptors. + */ + private Map<String, DynamicCacheChangeRequest> resolveCacheRequests(ExchangeActions exchangeActions) { + if (exchangeActions == null) + return Collections.emptyMap(); + + return exchangeActions.cacheStartRequests() + .stream() + .map(ExchangeActions.CacheActionData::request) + .collect(Collectors.toMap(DynamicCacheChangeRequest::cacheName, r -> r)); + } + + /** * Method waits for new caches registration and cache configuration persisting to disk. 
*/ private void waitUntilNewCachesAreRegistered() { http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 87801a9..d7800ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -942,21 +942,18 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); - cctx.affinity().affinityReadyFuture(updTopVer).listen( - new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - try { - // Remap. - map(keys.keySet(), F.t(node, keys), fut.get()); - - onDone(Collections.<K, V>emptyMap()); - } - catch (IgniteCheckedException e) { - GridNearGetFuture.this.onDone(e); - } - } + cctx.shared().exchange().affinityReadyFuture(updTopVer).listen(f -> { + try { + // Remap. + map(keys.keySet(), F.t(node, keys), f.get()); + + onDone(Collections.<K, V>emptyMap()); + + } + catch (IgniteCheckedException e) { + GridNearGetFuture.this.onDone(e); } - ); + }); } } @@ -1005,7 +1002,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } // Need to wait for next topology version to remap. 
- IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.affinity().affinityReadyFuture(rmtTopVer); + IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.shared().exchange().affinityReadyFuture(rmtTopVer); topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void applyx( http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 0e3ab43..ce9a175 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -864,7 +864,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { assert waitVer != null; - retryFut = cctx.affinity().affinityReadyFuture(waitVer); + retryFut = cctx.shared().exchange().affinityReadyFuture(waitVer); } else if (e.hasCause(ClusterTopologyCheckedException.class)) { ClusterTopologyCheckedException topEx = X.cause(e, ClusterTopologyCheckedException.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 307a4ea..d1640c6 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -641,7 +641,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (!cctx.isLocal()) { AffinityTopologyVersion topVer = initTopVer; - cacheContext(ctx).affinity().affinityReadyFuture(topVer).get(); + cacheContext(ctx).shared().exchange().affinityReadyFuture(topVer).get(); for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++) getOrCreatePartitionRecovery(ctx, partId, topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index aa276cc..03a9749 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -403,7 +403,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { topVer, (byte)0); - IgniteCacheProxy jcache = cctx.kernalContext().cache().jcacheProxy(cctx.name()); + IgniteCacheProxy jcache = cctx.kernalContext().cache().jcacheProxy(cctx.name(), true); assert jcache != null : "Failed to get cache proxy [name=" + cctx.name() + ", locStart=" + cctx.startTopologyVersion() + 
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java index a28bfc9..b3879ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java @@ -201,6 +201,23 @@ public class DiscoveryDataClusterState implements Serializable { } /** + * + * @return {@code True} If baseLine changed, {@code False} if not. + */ + public boolean baselineChanged() { + BaselineTopology prevBLT = previousBaselineTopology(); + BaselineTopology curBLT = baselineTopology(); + + if (prevBLT == null && curBLT != null) + return true; + + if (prevBLT!= null && curBLT != null) + return !prevBLT.equals(curBLT); + + return false; + } + + /** * @return {@code True} if baseline topology is set in the cluster. {@code False} otherwise. 
*/ public boolean hasBaselineTopology() { http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index d52d388..6d5d474 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -208,7 +208,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I })); } else - return new IgniteFinishedFutureImpl<>(false); + return new IgniteFinishedFutureImpl<>(globalState.baselineChanged()); } transitionRes = globalState.transitionResult(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index e86f653..4253fb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -2206,24 +2206,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed GridCacheContext<?, ?> cctx = internalCache.context(); -/* todo: uncomment this and remove topFut choosing logic below after IGNITE-9550 race is fixed GridDhtTopologyFuture 
topFut = cctx.shared().exchange().lastFinishedFuture(); AffinityTopologyVersion topVer = topFut.topologyVersion(); -*/ - AffinityTopologyVersion topVer = cctx.isLocal() ? - cctx.affinity().affinityTopologyVersion() : - cctx.shared().exchange().readyAffinityVersion(); - - GridDhtTopologyFuture topFut = (GridDhtTopologyFuture)cctx.shared().exchange().affinityReadyFuture(topVer); - - if (topFut == null) { - // Exchange for newer topology version is already in progress, let's try to use last finished future. - GridDhtTopologyFuture lastFinishedFut = cctx.shared().exchange().lastFinishedFuture(); - - if (F.eq(lastFinishedFut.topologyVersion(), topVer)) - topFut = lastFinishedFut; - } GridCacheVersion ver = cctx.versions().isolatedStreamerVersion(); @@ -2285,8 +2270,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } if (topFut != null) { - topFut.get(); - Throwable err = topFut.validateCache(cctx, false, false, entry.key(), null); if (err != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java index 3b6857d..d6c5727 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorMemoryLeakTest.java @@ -41,7 +41,7 @@ import static org.apache.ignite.IgniteSystemProperties.getInteger; @GridCommonTest(group = "Affinity Processor") public class GridAffinityProcessorMemoryLeakTest extends GridCommonAbstractTest { /** Max value for affinity 
history size name. Should be the same as in GridAffinityAssignmentCache.MAX_HIST_SIZE */ - private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 100); + private final int MAX_HIST_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 500); /** Cache name. */ private static final String CACHE_NAME = "cache"; http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java new file mode 100644 index 0000000..ceafc9e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheResultIsNotNullOnPartitionLossTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.PartitionLossPolicy; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class CacheResultIsNotNullOnPartitionLossTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Number of servers to be started. */ + private static final int SERVERS = 10; + + /** Index of node that is going to be the only client node. */ + private static final int CLIENT_IDX = SERVERS; + + /** Number of cache entries to insert into the test cache. */ + private static final int CACHE_ENTRIES_CNT = 10_000; + + /** True if {@link #getConfiguration(String)} is expected to configure client node on next invocations. 
*/ + private boolean isClient; + + /** Client Ignite instance. */ + private IgniteEx client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST); + + cfg.setCacheConfiguration( + new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setCacheMode(CacheMode.PARTITIONED) + .setBackups(0) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE) + ); + + if (isClient) + cfg.setClientMode(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + startGrids(SERVERS); + + isClient = true; + + client = startGrid(CLIENT_IDX); + + try (IgniteDataStreamer<Integer, Integer> dataStreamer = client.dataStreamer(DEFAULT_CACHE_NAME)) { + dataStreamer.allowOverwrite(true); + + for (int i = 0; i < CACHE_ENTRIES_CNT; i++) + dataStreamer.addData(i, i); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * @throws Exception If failed. + */ + public void testCacheResultIsNotNullOnClient() throws Exception { + testCacheResultIsNotNull0(client); + } + + /** + * @throws Exception If failed. + */ + public void testCacheResultIsNotNullOnLastServer() throws Exception { + testCacheResultIsNotNull0(grid(SERVERS - 1)); + } + + /** + * @throws Exception If failed. + */ + public void testCacheResultIsNotNullOnServer() throws Exception { + testCacheResultIsNotNull0(grid(SERVERS - 2)); + } + /** + * @throws Exception If failed. 
+ */ + private void testCacheResultIsNotNull0(IgniteEx ignite) throws Exception { + AtomicBoolean stopReading = new AtomicBoolean(); + + AtomicReference<Throwable> unexpectedThrowable = new AtomicReference<>(); + + IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); + + CountDownLatch readerThreadStarted = new CountDownLatch(1); + + IgniteInternalFuture<Boolean> nullCacheValFoundFut = GridTestUtils.runAsync(() -> { + readerThreadStarted.countDown(); + + while (!stopReading.get()) + for (int i = 0; i < CACHE_ENTRIES_CNT && !stopReading.get(); i++) { + try { + if (cache.get(i) == null) + return true; + } + catch (Throwable t) { + if (expectedThrowableClass(t)) { + try { + cache.put(i, i); + + unexpectedThrowable.set(new RuntimeException("Cache put was successful for entry " + i)); + } + catch (Throwable t2) { + if (!expectedThrowableClass(t2)) + unexpectedThrowable.set(t2); + } + } + else + unexpectedThrowable.set(t); + + break; + } + } + + return false; + }); + + try { + readerThreadStarted.await(1, TimeUnit.SECONDS); + + for (int i = 0; i < SERVERS - 1; i++) { + Thread.sleep(50L); + + grid(i).close(); + } + } + finally { + // Ask reader thread to finish its execution. 
+ stopReading.set(true); + } + + assertFalse("Null value was returned by cache.get instead of exception.", nullCacheValFoundFut.get()); + + Throwable throwable = unexpectedThrowable.get(); + if (throwable != null) { + throwable.printStackTrace(); + + fail(throwable.getMessage()); + } + } + + /** + * + */ + private boolean expectedThrowableClass(Throwable throwable) { + return X.hasCause( + throwable, + CacheInvalidStateException.class, + ClusterTopologyCheckedException.class, + IllegalStateException.class, + NodeStoppingException.class + ); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 370fa49..cb4be14 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -97,6 +97,7 @@ import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransaction import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionSelfTest; import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest; import org.apache.ignite.internal.processors.cache.MarshallerCacheJobRunNodeRestartTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheResultIsNotNullOnPartitionLossTest; import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest; import org.apache.ignite.internal.processors.cache.distributed.CacheDiscoveryDataConcurrentJoinTest; import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest; @@ -340,6 +341,8 @@ public class IgniteCacheTestSuite4 extends TestSuite { 
suite.addTestSuite(IgniteCacheContainsKeyAtomicTest.class);
 
+        suite.addTestSuite(CacheResultIsNotNullOnPartitionLossTest.class);
+
         return suite;
     }
 }
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbbd9ad4/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 48d4a8d..081add8 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -31,9 +31,13 @@ import java.util.concurrent.TimeoutException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -201,9 +205,24 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
         final CountDownLatch latch = new CountDownLatch(CNT);
 
         IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
+            @IgniteInstanceResource
+            private Ignite ig;
+
+            @LoggerResource
+            private IgniteLogger log;
+
+            /** {@inheritDoc} */
             @Override public boolean apply(UUID uuid, CacheEvent evt) {
                 latch.countDown();
 
+                if (log.isInfoEnabled()) {
+                    IgniteEx igEx = (IgniteEx)ig;
+
+                    UUID nodeId = igEx.localNode().id();
+
+                    log.info("Received event=" + evt + ", nodeId=" + nodeId);
+                }
+
                 return true;
             }
         };
@@ -211,7 +230,9 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
         ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
 
         // Checks all events successfully processed in 10 seconds.
-        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        boolean allEvtsProcessed = latch.await(10, TimeUnit.SECONDS);
+        // Read the remaining count only after the wait, otherwise the message reports a stale value.
+        assertTrue("Failed to wait latch completion, still wait " + latch.getCount() + " events", allEvtsProcessed);
 
         for (Map.Entry<String, String> entry : keyValMap.entrySet())
             assertEquals(entry.getValue(), cache.get(entry.getKey()));
