Repository: ignite Updated Branches: refs/heads/master c8be8c2b8 -> e20cf5a73
IGNITE-8462 DataStreamerImpl.close(true) should done all futures Signed-off-by: Anton Vinogradov <a...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e20cf5a7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e20cf5a7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e20cf5a7 Branch: refs/heads/master Commit: e20cf5a731d7ed05ee050785c2e7d46bcf6a7f44 Parents: c8be8c2 Author: Fedotov <vane...@gmail.com> Authored: Fri Jun 22 13:11:20 2018 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Fri Jun 22 13:11:20 2018 +0300 ---------------------------------------------------------------------- .../datastreamer/DataStreamerImpl.java | 77 ++++++++++++++++---- .../datastreamer/DataStreamerImplSelfTest.java | 29 ++++++++ 2 files changed, 90 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e20cf5a7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 070a0da..7980155 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -102,7 +102,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.GPC; @@ -133,8 +132,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed * Thread buffer map: on each thread there are future and list of entries which will be streamed after filling * thread batch. */ - private final Map<Long, T2<IgniteCacheFutureImpl, List<DataStreamerEntry>>> threadBufMap = - new ConcurrentHashMap<>(); + private final Map<Long, ThreadBuffer> threadBufMap = new ConcurrentHashMap<>(); /** Isolated receiver. */ private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater(); @@ -637,9 +635,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed try { long threadId = Thread.currentThread().getId(); - T2<IgniteCacheFutureImpl, List<DataStreamerEntry>> futAndEntries = threadBufMap.get(threadId); + ThreadBuffer threadBuf = threadBufMap.get(threadId); - if (futAndEntries == null) { + if (threadBuf == null) { internalFut = new GridFutureAdapter(); fut = new IgniteCacheFutureImpl(internalFut); @@ -649,17 +647,18 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed activeFuts.add(internalFut); // Initial capacity should be more than batch by 12.5% in order to avoid resizing. - futAndEntries = new T2(fut, new ArrayList<>(bufLdrSzPerThread + (bufLdrSzPerThread >> 3))); + threadBuf = new ThreadBuffer(fut, + new ArrayList<>(bufLdrSzPerThread + (bufLdrSzPerThread >> 3))); - threadBufMap.put(threadId, futAndEntries); + threadBufMap.put(threadId, threadBuf); } else { - fut = futAndEntries.get1(); + fut = threadBuf.getFuture(); internalFut = (GridFutureAdapter)fut.internalFuture(); } - entriesList = futAndEntries.get2(); + entriesList = threadBuf.getEntries(); entriesList.addAll(entries); @@ -1242,8 +1241,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed private void flushAllThreadsBufs() { assert busyLock.writeLockedByCurrentThread(); - for (T2<IgniteCacheFutureImpl, List<DataStreamerEntry>> val : threadBufMap.values()) - loadData(val.get2(), (GridFutureAdapter)val.get1().internalFuture()); + for (ThreadBuffer buf : threadBufMap.values()) + loadData(buf.getEntries(), (GridFutureAdapter)buf.getFuture().internalFuture()); threadBufMap.clear(); } @@ -1292,8 +1291,20 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed if (cancel) { cancelled = true; + IgniteCheckedException cancellationErr = err; + + if (cancellationErr == null) + cancellationErr = new IgniteCheckedException("Data streamer has been cancelled: " + + DataStreamerImpl.this); + + for (ThreadBuffer buf : threadBufMap.values()) { + GridFutureAdapter internalFut = (GridFutureAdapter)buf.getFuture().internalFuture(); + + internalFut.onDone(cancellationErr); + } + for (Buffer buf : bufMappings.values()) - buf.cancelAll(err); + buf.cancelAll(cancellationErr); } else doFlush(); @@ -1426,6 +1437,43 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** * */ + private class ThreadBuffer { + /** Entries. */ + private final List<DataStreamerEntry> entries; + + /** Future. */ + private final IgniteCacheFutureImpl fut; + + /** + * @param fut Future. + * @param entries Entries. + */ + private ThreadBuffer(IgniteCacheFutureImpl fut, List<DataStreamerEntry> entries) { + assert fut != null; + assert entries != null; + + this.fut = fut; + this.entries = entries; + } + + /** + * @return DataStreamerEntry list. + */ + private List<DataStreamerEntry> getEntries() { + return entries; + } + + /** + * @return Future. + */ + private IgniteCacheFutureImpl getFuture() { + return fut; + } + } + + /** + * + */ private class Buffer { /** Node. */ private final ClusterNode node; @@ -1996,10 +2044,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** * @param err Error. */ - void cancelAll(@Nullable IgniteCheckedException err) { - if (err == null) - err = new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this); - + void cancelAll(IgniteCheckedException err) { for (IgniteInternalFuture<?> f : locFuts) { try { f.cancel(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e20cf5a7/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java index d02f466..e4c7660 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.datastreamer; import java.io.StringWriter; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; @@ -113,6 +114,34 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testCloseWithCancellation() throws Exception { + cnt = 0; + + startGrids(2); + + Ignite g1 = grid(1); + + List<IgniteFuture> futures = new ArrayList<>(); + + IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 100; i++) + futures.add(dataLdr.addData(i, i)); + + try { + dataLdr.close(true); + } + catch (CacheException e) { + // No-op. + } + + for (IgniteFuture fut : futures) + assertTrue(fut.isDone()); + } + + /** + * @throws Exception If failed. + */ public void testNullPointerExceptionUponDataStreamerClosing() throws Exception { cnt = 0;