This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 982de96  IGNITE-16483 Improve ComputeGridMonitor test coverage - Fixes 
#9810.
982de96 is described below

commit 982de9686fa90f1459fa8872eb657e0b76a2b53d
Author: Kirill Tkalenko <tkalkir...@yandex.ru>
AuthorDate: Thu Feb 10 15:23:16 2022 +0300

    IGNITE-16483 Improve ComputeGridMonitor test coverage - Fixes #9810.
    
    Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com>
---
 .../processors/compute/ComputeGridMonitorTest.java | 117 +++++++++++++++++----
 .../compute/ComputeJobChangePriorityTest.java      |  29 ++++-
 2 files changed, 121 insertions(+), 25 deletions(-)

diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java
index 44a06a2..485ecc7 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java
@@ -60,8 +60,14 @@ public class ComputeGridMonitorTest extends 
GridCommonAbstractTest {
     /** Coordinator. */
     private static IgniteEx CRD;
 
-    /** Compute task status monitor. */
-    private ComputeGridMonitorImpl monitor;
+    /** Client node. */
+    private static IgniteEx CLIENT_NODE;
+
+    /** Compute task status monitor for {@link #CRD}. */
+    private ComputeGridMonitorImpl crdMonitor;
+
+    /** Compute task status monitor for {@link #CLIENT_NODE}. */
+    private ComputeGridMonitorImpl clientMonitor;
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
@@ -71,11 +77,15 @@ public class ComputeGridMonitorTest extends 
GridCommonAbstractTest {
 
         IgniteEx crd = startGrids(2);
 
+        IgniteEx clientNode = startClientGrid(2);
+
         crd.cluster().state(ACTIVE);
 
         awaitPartitionMapExchange();
 
         CRD = crd;
+
+        CLIENT_NODE = clientNode;
     }
 
     /** {@inheritDoc} */
@@ -85,20 +95,26 @@ public class ComputeGridMonitorTest extends 
GridCommonAbstractTest {
         stopAllGrids();
 
         CRD = null;
+
+        CLIENT_NODE = null;
     }
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
-        CRD.context().task().listenStatusUpdates(monitor = new 
ComputeGridMonitorImpl());
+        CRD.context().task().listenStatusUpdates(crdMonitor = new 
ComputeGridMonitorImpl());
+
+        CLIENT_NODE.context().task().listenStatusUpdates(clientMonitor = new 
ComputeGridMonitorImpl());
     }
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 
-        CRD.context().task().stopListenStatusUpdates(monitor);
+        CRD.context().task().stopListenStatusUpdates(crdMonitor);
+
+        CLIENT_NODE.context().task().stopListenStatusUpdates(clientMonitor);
     }
 
     /** {@inheritDoc} */
@@ -107,7 +123,7 @@ public class ComputeGridMonitorTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * Checking get of diffs for the successful execution of the task.
+     * Checking get of diffs for the successful execution of the task on 
server node.
      */
     @Test
     public void simpleTest() {
@@ -115,13 +131,15 @@ public class ComputeGridMonitorTest extends 
GridCommonAbstractTest {
 
         taskFut.get(getTestTimeout());
 
-        assertTrue(monitor.statusSnapshots.isEmpty());
+        assertTrue(crdMonitor.statusSnapshots.isEmpty());
+        assertTrue(clientMonitor.statusSnapshots.isEmpty());
 
-        assertEquals(3, monitor.statusChanges.size());
+        assertEquals(3, crdMonitor.statusChanges.size());
+        assertTrue(clientMonitor.statusSnapshots.isEmpty());
 
-        checkTaskStarted(monitor.statusChanges.poll(), 
taskFut.getTaskSession());
-        checkTaskMapped(monitor.statusChanges.poll(), 
taskFut.getTaskSession());
-        checkTaskFinished(monitor.statusChanges.poll(), 
taskFut.getTaskSession());
+        checkTaskStarted(crdMonitor.statusChanges.poll(), 
taskFut.getTaskSession());
+        checkTaskMapped(crdMonitor.statusChanges.poll(), 
taskFut.getTaskSession());
+        checkTaskFinished(crdMonitor.statusChanges.poll(), 
taskFut.getTaskSession());
     }
 
     /**
@@ -142,13 +160,13 @@ public class ComputeGridMonitorTest extends 
GridCommonAbstractTest {
 
         assertThrows(log, () -> taskFut.get(getTestTimeout()), 
IgniteException.class, null);
 
-        assertTrue(monitor.statusSnapshots.isEmpty());
+        assertTrue(crdMonitor.statusSnapshots.isEmpty());
 
-        assertEquals(3, monitor.statusChanges.size());
+        assertEquals(3, crdMonitor.statusChanges.size());
 
-        checkTaskStarted(monitor.statusChanges.poll(), 
taskFut.getTaskSession());
-        checkTaskMapped(monitor.statusChanges.poll(), 
taskFut.getTaskSession());
-        checkTaskFailed(monitor.statusChanges.poll(), 
taskFut.getTaskSession());
+        checkTaskStarted(crdMonitor.statusChanges.poll(), 
taskFut.getTaskSession());
+        checkTaskMapped(crdMonitor.statusChanges.poll(), 
taskFut.getTaskSession());
+        checkTaskFailed(crdMonitor.statusChanges.poll(), 
taskFut.getTaskSession());
     }
 
     /**
@@ -173,18 +191,18 @@ public class ComputeGridMonitorTest extends 
GridCommonAbstractTest {
 
         taskFut.get(getTestTimeout());
 
-        assertTrue(monitor.statusSnapshots.isEmpty());
+        assertTrue(crdMonitor.statusSnapshots.isEmpty());
 
-        assertEquals(4, monitor.statusChanges.size());
+        assertEquals(4, crdMonitor.statusChanges.size());
 
-        checkTaskStarted(monitor.statusChanges.poll(), 
taskFut.getTaskSession());
-        checkTaskMapped(monitor.statusChanges.poll(), 
taskFut.getTaskSession());
-        checkAttributeChanged(monitor.statusChanges.poll(), 
taskFut.getTaskSession());
-        checkTaskFinished(monitor.statusChanges.poll(), 
taskFut.getTaskSession());
+        checkTaskStarted(crdMonitor.statusChanges.poll(), 
taskFut.getTaskSession());
+        checkTaskMapped(crdMonitor.statusChanges.poll(), 
taskFut.getTaskSession());
+        checkAttributeChanged(crdMonitor.statusChanges.poll(), 
taskFut.getTaskSession());
+        checkTaskFinished(crdMonitor.statusChanges.poll(), 
taskFut.getTaskSession());
     }
 
     /**
-     * Checking the get of snapshots of task statuses.
+     * Checking the get of snapshots of task statuses for server node.
      *
      * @throws Exception If failed.
      */
@@ -201,7 +219,8 @@ public class ComputeGridMonitorTest extends 
GridCommonAbstractTest {
         try {
             CRD.context().task().listenStatusUpdates(monitor1);
 
-            assertTrue(monitor.statusSnapshots.isEmpty());
+            assertTrue(crdMonitor.statusSnapshots.isEmpty());
+            assertTrue(clientMonitor.statusSnapshots.isEmpty());
 
             assertEquals(1, monitor1.statusSnapshots.size());
 
@@ -214,6 +233,58 @@ public class ComputeGridMonitorTest extends 
GridCommonAbstractTest {
         taskFut.get(getTestTimeout());
     }
 
+    /**
+     * Checking get of diffs for the successful execution of the task on 
client node.
+     */
+    @Test
+    public void simpleClientNodeTest() {
+        ComputeTaskFuture<Void> taskFut = 
CLIENT_NODE.compute().executeAsync(new NoopComputeTask(), null);
+
+        taskFut.get(getTestTimeout());
+
+        assertTrue(crdMonitor.statusSnapshots.isEmpty());
+        assertTrue(clientMonitor.statusSnapshots.isEmpty());
+
+        assertEquals(3, clientMonitor.statusChanges.size());
+        assertTrue(crdMonitor.statusSnapshots.isEmpty());
+
+        checkTaskStarted(clientMonitor.statusChanges.poll(), 
taskFut.getTaskSession());
+        checkTaskMapped(clientMonitor.statusChanges.poll(), 
taskFut.getTaskSession());
+        checkTaskFinished(clientMonitor.statusChanges.poll(), 
taskFut.getTaskSession());
+    }
+
+    /**
+     * Checking the get of snapshots of task statuses for client node.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void snapshotsClientNodeTest() throws Exception {
+        ComputeFullWithWaitTask task = new 
ComputeFullWithWaitTask(getTestTimeout());
+
+        ComputeTaskFuture<Void> taskFut = 
CLIENT_NODE.compute().executeAsync(task, null);
+
+        task.doneOnMapFut.get(getTestTimeout());
+
+        ComputeGridMonitorImpl monitor1 = new ComputeGridMonitorImpl();
+
+        try {
+            CLIENT_NODE.context().task().listenStatusUpdates(monitor1);
+
+            assertTrue(clientMonitor.statusSnapshots.isEmpty());
+            assertTrue(crdMonitor.statusSnapshots.isEmpty());
+
+            assertEquals(1, monitor1.statusSnapshots.size());
+
+            checkSnapshot(monitor1.statusSnapshots.poll(), 
taskFut.getTaskSession());
+        }
+        finally {
+            CLIENT_NODE.context().task().stopListenStatusUpdates(monitor1);
+        }
+
+        taskFut.get(getTestTimeout());
+    }
+
     /** */
     private void checkTaskStarted(ComputeTaskStatusSnapshot snapshot, 
ComputeTaskSession session) {
         checkSnapshot(snapshot, (GridTaskSessionImpl)session, RUNNING, false, 
false);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobChangePriorityTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobChangePriorityTest.java
index 8727164..deb90a6 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobChangePriorityTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobChangePriorityTest.java
@@ -23,9 +23,11 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Stream;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
 import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.compute.ComputeTaskAdapter;
 import org.apache.ignite.compute.ComputeTaskFuture;
@@ -47,6 +49,7 @@ import org.junit.Test;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toMap;
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.testframework.GridTestUtils.DFLT_TEST_TIMEOUT;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
 
 /**
@@ -158,6 +161,8 @@ public class ComputeJobChangePriorityTest extends 
GridCommonAbstractTest {
         Object val,
         boolean expHandleCollisionOnChangeTaskAttrs
     ) throws Exception {
+        WaitJob.waitFut.reset();
+
         ComputeTaskFuture<Void> taskFut = CRD.compute().executeAsync(new 
NoopTask(), null);
 
         for (Ignite n : G.allGrids())
@@ -175,6 +180,8 @@ public class ComputeJobChangePriorityTest extends 
GridCommonAbstractTest {
                     .getTaskSession().waitForAttribute(key, getTestTimeout()));
         }
 
+        WaitJob.waitFut.onDone();
+
         for (Ignite n : G.allGrids()) {
             GridFutureAdapter<Void> fut = 
PriorityQueueCollisionSpiEx.spiEx(n).onChangeTaskAttrsFut;
 
@@ -205,7 +212,7 @@ public class ComputeJobChangePriorityTest extends 
GridCommonAbstractTest {
         @Override public void onCollision(CollisionContext ctx) {
             if (!waitJobFut.isDone()) {
                 ctx.waitingJobs().stream()
-                    .filter(collisionJobCtx -> collisionJobCtx.getJob() 
instanceof NoopJob)
+                    .filter(collisionJobCtx -> collisionJobCtx.getJob() 
instanceof WaitJob)
                     .findAny()
                     .ifPresent(waitJobFut::onDone);
             }
@@ -248,7 +255,7 @@ public class ComputeJobChangePriorityTest extends 
GridCommonAbstractTest {
             List<ClusterNode> subgrid,
             Void arg
         ) throws IgniteException {
-            return subgrid.stream().collect(toMap(n -> new NoopJob(), 
identity()));
+            return subgrid.stream().collect(toMap(n -> new WaitJob(), 
identity()));
         }
 
         /** {@inheritDoc} */
@@ -256,4 +263,22 @@ public class ComputeJobChangePriorityTest extends 
GridCommonAbstractTest {
             return null;
         }
     }
+
+    /** */
+    private static class WaitJob extends ComputeJobAdapter {
+        /** */
+        static final GridFutureAdapter<Void> waitFut = new 
GridFutureAdapter<>();
+
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteException {
+            try {
+                waitFut.get(DFLT_TEST_TIMEOUT);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+
+            return null;
+        }
+    }
 }

Reply via email to