IGNITE-5195 DataStreamer can fails if non-data node enter\leave the grid. This closes #3026.
Signed-off-by: nikolay_tikhonov <ntikho...@gridgain.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5fb04be3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5fb04be3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5fb04be3 Branch: refs/heads/ignite-zk Commit: 5fb04be3942d8a90288fdc75f86693872c918e2d Parents: 43aa4a8 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Mon Nov 20 18:37:44 2017 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Mon Nov 20 18:39:03 2017 +0300 ---------------------------------------------------------------------- .../cache/GridCacheAffinityManager.java | 7 ++- .../datastreamer/DataStreamerImpl.java | 20 ++++++- .../cache/IgniteCacheDynamicStopSelfTest.java | 2 +- .../datastreamer/DataStreamerImplSelfTest.java | 60 ++++++++++++++++++++ 4 files changed, 85 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5fb04be3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 14a1344..c9ee38c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -123,7 +123,12 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { if (cctx.isLocal()) topVer = LOC_CACHE_TOP_VER; - return aff.assignments(topVer); + GridAffinityAssignmentCache aff0 = aff; + + if (aff0 == null) + throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); + + return aff0.assignments(topVer); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5fb04be3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index d38132f..12eb2dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -779,6 +779,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed else topVer = ctx.cache().context().exchange().readyAffinityVersion(); + List<List<ClusterNode>> assignments = cctx.affinity().assignments(topVer); + if (!allowOverwrite() && !cctx.isLocal()) { // Cases where cctx required. gate = cctx.gate(); @@ -956,7 +958,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed final List<GridFutureAdapter<?>> futs; try { - futs = buf.update(entriesForNode, topVer, opFut, remap); + futs = buf.update(entriesForNode, topVer, assignments, opFut, remap); opFut.markInitialized(); } @@ -1411,6 +1413,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed @Nullable List<GridFutureAdapter<?>> update( Iterable<DataStreamerEntry> newEntries, AffinityTopologyVersion topVer, + List<List<ClusterNode>> assignments, GridCompoundFuture opFut, boolean remap ) throws IgniteInterruptedCheckedException { @@ -1441,9 +1444,19 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed futs[b.partId] = curFut0; } - if (b.batchTopVer == null) + if (b.batchTopVer == null) { + b.batchTopVer = topVer; + + b.assignments = assignments; + } + + // topology changed, but affinity is the same, no re-map is required. + if (!topVer.equals(b.batchTopVer) && b.assignments.equals(assignments)) { b.batchTopVer = topVer; + b.assignments = assignments; + } + curBatchTopVer = b.batchTopVer; b.entries.add(entry); @@ -2186,6 +2199,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** */ private final IgniteInClosure<? super IgniteInternalFuture<Object>> signalC; + /** Batch assignments */ + public List<List<ClusterNode>> assignments; + /** * @param partId Partition ID. * @param c Signal closure. http://git-wip-us.apache.org/repos/asf/ignite/blob/5fb04be3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java index 5628c4d..44cd475 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java @@ -142,4 +142,4 @@ public class IgniteCacheDynamicStopSelfTest extends GridCommonAbstractTest { ignite(0).destroyCache(DEFAULT_CACHE_NAME); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5fb04be3/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java index e90f6b0..940f8ce 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.cache.CacheException; @@ -38,6 +39,7 @@ import org.apache.ignite.cache.CacheServerNotFoundException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -431,6 +433,64 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testClientEventsNotCausingRemaps() throws Exception { + Ignite ignite = startGrids(2); + + ignite.getOrCreateCache(DEFAULT_CACHE_NAME); + + IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME); + + ((DataStreamerImpl)streamer).maxRemapCount(3); + + streamer.addData(1, 1); + + for (int topChanges = 0; topChanges < 30; topChanges++) { + IgniteEx node = startGrid(getConfiguration("flapping-client").setClientMode(true)); + + streamer.addData(1, 1); + + node.close(); + + streamer.addData(1, 1); + } + + streamer.flush(); + streamer.close(); + } + + /** + * @throws Exception If failed. + */ + public void testServerEventsCauseRemaps() throws Exception { + Ignite ignite = startGrids(2); + + ignite.getOrCreateCache(DEFAULT_CACHE_NAME); + + IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME); + + ((DataStreamerImpl)streamer).maxRemapCount(0); + + streamer.addData(1, 1); + + startGrid(2); + + try { + streamer.addData(1, 1); + + streamer.flush(); + } + catch (IllegalStateException ex) { + assert ex.getMessage().contains("Data streamer has been closed"); + + return; + } + + fail("Expected exception wasn't thrown"); + } + + /** * Gets cache configuration. * * @return Cache configuration.