This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 1873228  IGNITE-13106 Java thin client: Fix race between response and 
notification for compute tasks - Fixes #7885.
1873228 is described below

commit 1873228a8b63b2347e2dedbd2c042ca25d7bb7ad
Author: Aleksey Plekhanov <plehanov.a...@gmail.com>
AuthorDate: Mon Jun 15 11:25:44 2020 +0500

    IGNITE-13106 Java thin client: Fix race between response and notification 
for compute tasks - Fixes #7885.
    
    Signed-off-by: Aleksey Plekhanov <plehanov.a...@gmail.com>
---
 .../internal/client/thin/ClientComputeImpl.java    | 24 +++++++++++++++++++---
 .../internal/client/thin/ComputeTaskTest.java      | 20 ++++++++++--------
 2 files changed, 33 insertions(+), 11 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
index 6238e52..3d53c010 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
 import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -207,7 +208,12 @@ class ClientComputeImpl implements ClientCompute, 
NotificationListener {
             if (task == null) // Channel is closed concurrently, retry with 
another channel.
                 continue;
 
-            task.fut.listen(f -> removeTask(task.ch, task.taskId));
+            task.fut.listen(f -> {
+                // Don't remove task if future was canceled by user. This task 
can be added again later by notification.
+                // To prevent leakage tasks for cancelled futures will be 
removed on notification (or channel close event).
+                if (!f.isCancelled())
+                    removeTask(task.ch, task.taskId);
+            });
 
             return new ClientFutureImpl<>((GridFutureAdapter<R>)task.fut);
         }
@@ -257,13 +263,16 @@ class ClientComputeImpl implements ClientCompute, 
NotificationListener {
         if (op == ClientOperation.COMPUTE_TASK_FINISHED) {
             Object res = payload == null ? null : utils.readObject(new 
BinaryHeapInputStream(payload), false);
 
-            ClientComputeTask<Object> task = removeTask(ch, rsrcId);
+            ClientComputeTask<Object> task = addTask(ch, rsrcId);
 
             if (task != null) { // If channel is closed concurrently, task is 
already done with "channel closed" reason.
                 if (err == null)
                     task.fut.onDone(res);
                 else
                     task.fut.onDone(err);
+
+                if (task.fut.isCancelled())
+                    removeTask(ch, rsrcId);
             }
         }
     }
@@ -410,7 +419,16 @@ class ClientComputeImpl implements ClientCompute, 
NotificationListener {
             fut = new GridFutureAdapter<R>() {
                 @Override public boolean cancel() {
                     if (onCancelled()) {
-                        ch.service(RESOURCE_CLOSE, req -> 
req.out().writeLong(taskId), null);
+                        try {
+                            ch.service(RESOURCE_CLOSE, req -> 
req.out().writeLong(taskId), null);
+                        }
+                        catch (ClientServerError e) {
+                            // Ignore "resource doesn't exist" error. The task 
can be completed concurrently on the
+                            // server, but we already complete future with 
"cancelled" state, so result will never be
+                            // received by a client.
+                            if (e.getCode() != 
ClientStatus.RESOURCE_DOES_NOT_EXIST)
+                                throw new ClientException(e);
+                        }
 
                         return true;
                     }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
index d400daf..4e13f49 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ComputeTaskTest.java
@@ -33,7 +33,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.Ignition;
@@ -56,7 +55,6 @@ import org.apache.ignite.mxbean.ClientProcessorMXBean;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -192,7 +190,8 @@ public class ComputeTaskTest extends GridCommonAbstractTest 
{
 
             fut.cancel(true);
 
-            
assertTrue(((ClientComputeImpl)client.compute()).activeTaskFutures().isEmpty());
+            assertTrue(GridTestUtils.waitForCondition(
+                () -> 
((ClientComputeImpl)client.compute()).activeTaskFutures().isEmpty(), TIMEOUT));
 
             assertTrue(fut.isCancelled());
             assertTrue(fut.isDone());
@@ -343,7 +342,7 @@ public class ComputeTaskTest extends GridCommonAbstractTest 
{
 
             compute.execute(TestTask.class.getName(), null);
 
-            assertEquals(1, compute.activeTaskFutures().size());
+            assertTrue(GridTestUtils.waitForCondition(() -> 
compute.activeTaskFutures().size() == 1, TIMEOUT));
 
             assertTrue(fut1.isDone());
 
@@ -359,7 +358,7 @@ public class ComputeTaskTest extends GridCommonAbstractTest 
{
 
             fut3.get(TIMEOUT, TimeUnit.MILLISECONDS);
 
-            assertTrue(compute.activeTaskFutures().isEmpty());
+            assertTrue(GridTestUtils.waitForCondition(() -> 
compute.activeTaskFutures().isEmpty(), TIMEOUT));
         }
     }
 
@@ -481,7 +480,6 @@ public class ComputeTaskTest extends GridCommonAbstractTest 
{
      *
      */
     @Test
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-12845";)
     public void testExecuteTaskConcurrentLoad() throws Exception {
         try (IgniteClient client = startClient(0)) {
             int threadsCnt = 20;
@@ -511,9 +509,14 @@ public class ComputeTaskTest extends 
GridCommonAbstractTest {
 
                             Future<T2<UUID, Set<UUID>>> fut = 
compute.executeAsync(TestTask.class.getName(), null);
 
+                            boolean cancelled = (i % 3 == 0) && 
fut.cancel(true);
+
                             assertEquals((Integer)i, cache.get(threadIdx));
 
-                            assertEquals(nodeIds(nodeIdx), fut.get().get2());
+                            if (cancelled)
+                                assertTrue(fut.isCancelled());
+                            else
+                                assertEquals(nodeIds(nodeIdx), 
fut.get().get2());
                         }
                     }
                     catch (ExecutionException e) {
@@ -527,7 +530,8 @@ public class ComputeTaskTest extends GridCommonAbstractTest 
{
 
                 }, threadsCnt, "run-task-async");
 
-            
assertTrue(((ClientComputeImpl)client.compute()).activeTaskFutures().isEmpty());
+            assertTrue(GridTestUtils.waitForCondition(
+                () -> 
((ClientComputeImpl)client.compute()).activeTaskFutures().isEmpty(), TIMEOUT));
         }
     }
 

Reply via email to