Repository: ignite
Updated Branches:
  refs/heads/master c8be8c2b8 -> e20cf5a73


IGNITE-8462 DataStreamerImpl.close(true) should done all futures

Signed-off-by: Anton Vinogradov <a...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e20cf5a7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e20cf5a7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e20cf5a7

Branch: refs/heads/master
Commit: e20cf5a731d7ed05ee050785c2e7d46bcf6a7f44
Parents: c8be8c2
Author: Fedotov <vane...@gmail.com>
Authored: Fri Jun 22 13:11:20 2018 +0300
Committer: Anton Vinogradov <a...@apache.org>
Committed: Fri Jun 22 13:11:20 2018 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamerImpl.java          | 77 ++++++++++++++++----
 .../datastreamer/DataStreamerImplSelfTest.java  | 29 ++++++++
 2 files changed, 90 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e20cf5a7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 070a0da..7980155 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -102,7 +102,6 @@ import 
org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.GPC;
@@ -133,8 +132,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
      * Thread buffer map: on each thread there are future and list of entries 
which will be streamed after filling
      * thread batch.
      */
-    private final Map<Long, T2<IgniteCacheFutureImpl, 
List<DataStreamerEntry>>> threadBufMap =
-        new ConcurrentHashMap<>();
+    private final Map<Long, ThreadBuffer> threadBufMap = new 
ConcurrentHashMap<>();
 
     /** Isolated receiver. */
     private static final StreamReceiver ISOLATED_UPDATER = new 
IsolatedUpdater();
@@ -637,9 +635,9 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
         try {
             long threadId = Thread.currentThread().getId();
 
-            T2<IgniteCacheFutureImpl, List<DataStreamerEntry>> futAndEntries = 
threadBufMap.get(threadId);
+            ThreadBuffer threadBuf = threadBufMap.get(threadId);
 
-            if (futAndEntries == null) {
+            if (threadBuf == null) {
                 internalFut = new GridFutureAdapter();
 
                 fut = new IgniteCacheFutureImpl(internalFut);
@@ -649,17 +647,18 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                 activeFuts.add(internalFut);
 
                 // Initial capacity should be more than batch by 12.5% in 
order to avoid resizing.
-                futAndEntries = new T2(fut, new ArrayList<>(bufLdrSzPerThread 
+ (bufLdrSzPerThread >> 3)));
+                threadBuf = new ThreadBuffer(fut,
+                    new ArrayList<>(bufLdrSzPerThread + (bufLdrSzPerThread >> 
3)));
 
-                threadBufMap.put(threadId, futAndEntries);
+                threadBufMap.put(threadId, threadBuf);
             }
             else {
-                fut = futAndEntries.get1();
+                fut = threadBuf.getFuture();
 
                 internalFut = (GridFutureAdapter)fut.internalFuture();
             }
 
-            entriesList = futAndEntries.get2();
+            entriesList = threadBuf.getEntries();
 
             entriesList.addAll(entries);
 
@@ -1242,8 +1241,8 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
     private void flushAllThreadsBufs() {
         assert busyLock.writeLockedByCurrentThread();
 
-        for (T2<IgniteCacheFutureImpl, List<DataStreamerEntry>> val : 
threadBufMap.values())
-            loadData(val.get2(), 
(GridFutureAdapter)val.get1().internalFuture());
+        for (ThreadBuffer buf : threadBufMap.values())
+            loadData(buf.getEntries(), 
(GridFutureAdapter)buf.getFuture().internalFuture());
 
         threadBufMap.clear();
     }
@@ -1292,8 +1291,20 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                 if (cancel) {
                     cancelled = true;
 
+                    IgniteCheckedException cancellationErr = err;
+
+                    if (cancellationErr == null)
+                        cancellationErr = new IgniteCheckedException("Data 
streamer has been cancelled: " +
+                            DataStreamerImpl.this);
+
+                    for (ThreadBuffer buf : threadBufMap.values()) {
+                        GridFutureAdapter internalFut = 
(GridFutureAdapter)buf.getFuture().internalFuture();
+
+                        internalFut.onDone(cancellationErr);
+                    }
+
                     for (Buffer buf : bufMappings.values())
-                        buf.cancelAll(err);
+                        buf.cancelAll(cancellationErr);
                 }
                 else
                     doFlush();
@@ -1426,6 +1437,43 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
     /**
      *
      */
+    private class ThreadBuffer {
+        /** Entries. */
+        private final List<DataStreamerEntry> entries;
+
+        /** Future. */
+        private final IgniteCacheFutureImpl fut;
+
+        /**
+         * @param fut Future.
+         * @param entries Entries.
+         */
+        private ThreadBuffer(IgniteCacheFutureImpl fut, 
List<DataStreamerEntry> entries) {
+            assert fut != null;
+            assert entries != null;
+
+            this.fut = fut;
+            this.entries = entries;
+        }
+
+        /**
+         * @return DataStreamerEntry list.
+         */
+        private List<DataStreamerEntry> getEntries() {
+            return entries;
+        }
+
+        /**
+         * @return Future.
+         */
+        private IgniteCacheFutureImpl getFuture() {
+            return fut;
+        }
+    }
+
+    /**
+     *
+     */
     private class Buffer {
         /** Node. */
         private final ClusterNode node;
@@ -1996,10 +2044,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
         /**
          * @param err Error.
          */
-        void cancelAll(@Nullable IgniteCheckedException err) {
-            if (err == null)
-                err = new IgniteCheckedException("Data streamer has been 
cancelled: " + DataStreamerImpl.this);
-
+        void cancelAll(IgniteCheckedException err) {
             for (IgniteInternalFuture<?> f : locFuts) {
                 try {
                     f.cancel();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e20cf5a7/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index d02f466..e4c7660 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.datastreamer;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -113,6 +114,34 @@ public class DataStreamerImplSelfTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCloseWithCancellation() throws Exception {
+        cnt = 0;
+
+        startGrids(2);
+
+        Ignite g1 = grid(1);
+
+        List<IgniteFuture> futures = new ArrayList<>();
+
+        IgniteDataStreamer<Object, Object> dataLdr = 
g1.dataStreamer(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < 100; i++)
+            futures.add(dataLdr.addData(i, i));
+
+        try {
+            dataLdr.close(true);
+        }
+        catch (CacheException e) {
+            // No-op.
+        }
+
+        for (IgniteFuture fut : futures)
+            assertTrue(fut.isDone());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testNullPointerExceptionUponDataStreamerClosing() throws 
Exception {
         cnt = 0;
 

Reply via email to