This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 982de96 IGNITE-16483 Improve ComputeGridMonitor test coverage - Fixes #9810. 982de96 is described below commit 982de9686fa90f1459fa8872eb657e0b76a2b53d Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Thu Feb 10 15:23:16 2022 +0300 IGNITE-16483 Improve ComputeGridMonitor test coverage - Fixes #9810. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../processors/compute/ComputeGridMonitorTest.java | 117 +++++++++++++++++---- .../compute/ComputeJobChangePriorityTest.java | 29 ++++- 2 files changed, 121 insertions(+), 25 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java index 44a06a2..485ecc7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeGridMonitorTest.java @@ -60,8 +60,14 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest { /** Coordinator. */ private static IgniteEx CRD; - /** Compute task status monitor. */ - private ComputeGridMonitorImpl monitor; + /** Client node. */ + private static IgniteEx CLIENT_NODE; + + /** Compute task status monitor for {@link #CRD}. */ + private ComputeGridMonitorImpl crdMonitor; + + /** Compute task status monitor for {@link #CLIENT_NODE}. */ + private ComputeGridMonitorImpl clientMonitor; /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { @@ -71,11 +77,15 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest { IgniteEx crd = startGrids(2); + IgniteEx clientNode = startClientGrid(2); + crd.cluster().state(ACTIVE); awaitPartitionMapExchange(); CRD = crd; + + CLIENT_NODE = clientNode; } /** {@inheritDoc} */ @@ -85,20 +95,26 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest { stopAllGrids(); CRD = null; + + CLIENT_NODE = null; } /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); - CRD.context().task().listenStatusUpdates(monitor = new ComputeGridMonitorImpl()); + CRD.context().task().listenStatusUpdates(crdMonitor = new ComputeGridMonitorImpl()); + + CLIENT_NODE.context().task().listenStatusUpdates(clientMonitor = new ComputeGridMonitorImpl()); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); - CRD.context().task().stopListenStatusUpdates(monitor); + CRD.context().task().stopListenStatusUpdates(crdMonitor); + + CLIENT_NODE.context().task().stopListenStatusUpdates(clientMonitor); } /** {@inheritDoc} */ @@ -107,7 +123,7 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest { } /** - * Checking get of diffs for the successful execution of the task. + * Checking get of diffs for the successful execution of the task on server node. */ @Test public void simpleTest() { @@ -115,13 +131,15 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest { taskFut.get(getTestTimeout()); - assertTrue(monitor.statusSnapshots.isEmpty()); + assertTrue(crdMonitor.statusSnapshots.isEmpty()); + assertTrue(clientMonitor.statusSnapshots.isEmpty()); - assertEquals(3, monitor.statusChanges.size()); + assertEquals(3, crdMonitor.statusChanges.size()); + assertTrue(clientMonitor.statusSnapshots.isEmpty()); - checkTaskStarted(monitor.statusChanges.poll(), taskFut.getTaskSession()); - checkTaskMapped(monitor.statusChanges.poll(), taskFut.getTaskSession()); - checkTaskFinished(monitor.statusChanges.poll(), taskFut.getTaskSession()); + checkTaskStarted(crdMonitor.statusChanges.poll(), taskFut.getTaskSession()); + checkTaskMapped(crdMonitor.statusChanges.poll(), taskFut.getTaskSession()); + checkTaskFinished(crdMonitor.statusChanges.poll(), taskFut.getTaskSession()); } /** @@ -142,13 +160,13 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest { assertThrows(log, () -> taskFut.get(getTestTimeout()), IgniteException.class, null); - assertTrue(monitor.statusSnapshots.isEmpty()); + assertTrue(crdMonitor.statusSnapshots.isEmpty()); - assertEquals(3, monitor.statusChanges.size()); + assertEquals(3, crdMonitor.statusChanges.size()); - checkTaskStarted(monitor.statusChanges.poll(), taskFut.getTaskSession()); - checkTaskMapped(monitor.statusChanges.poll(), taskFut.getTaskSession()); - checkTaskFailed(monitor.statusChanges.poll(), taskFut.getTaskSession()); + checkTaskStarted(crdMonitor.statusChanges.poll(), taskFut.getTaskSession()); + checkTaskMapped(crdMonitor.statusChanges.poll(), taskFut.getTaskSession()); + checkTaskFailed(crdMonitor.statusChanges.poll(), taskFut.getTaskSession()); } /** @@ -173,18 +191,18 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest { taskFut.get(getTestTimeout()); - assertTrue(monitor.statusSnapshots.isEmpty()); + assertTrue(crdMonitor.statusSnapshots.isEmpty()); - assertEquals(4, monitor.statusChanges.size()); + assertEquals(4, crdMonitor.statusChanges.size()); - checkTaskStarted(monitor.statusChanges.poll(), taskFut.getTaskSession()); - checkTaskMapped(monitor.statusChanges.poll(), taskFut.getTaskSession()); - checkAttributeChanged(monitor.statusChanges.poll(), taskFut.getTaskSession()); - checkTaskFinished(monitor.statusChanges.poll(), taskFut.getTaskSession()); + checkTaskStarted(crdMonitor.statusChanges.poll(), taskFut.getTaskSession()); + checkTaskMapped(crdMonitor.statusChanges.poll(), taskFut.getTaskSession()); + checkAttributeChanged(crdMonitor.statusChanges.poll(), taskFut.getTaskSession()); + checkTaskFinished(crdMonitor.statusChanges.poll(), taskFut.getTaskSession()); } /** - * Checking the get of snapshots of task statuses. + * Checking the get of snapshots of task statuses for server node. * * @throws Exception If failed. */ @@ -201,7 +219,8 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest { try { CRD.context().task().listenStatusUpdates(monitor1); - assertTrue(monitor.statusSnapshots.isEmpty()); + assertTrue(crdMonitor.statusSnapshots.isEmpty()); + assertTrue(clientMonitor.statusSnapshots.isEmpty()); assertEquals(1, monitor1.statusSnapshots.size()); @@ -214,6 +233,58 @@ public class ComputeGridMonitorTest extends GridCommonAbstractTest { taskFut.get(getTestTimeout()); } + /** + * Checking get of diffs for the successful execution of the task on client node. + */ + @Test + public void simpleClientNodeTest() { + ComputeTaskFuture<Void> taskFut = CLIENT_NODE.compute().executeAsync(new NoopComputeTask(), null); + + taskFut.get(getTestTimeout()); + + assertTrue(crdMonitor.statusSnapshots.isEmpty()); + assertTrue(clientMonitor.statusSnapshots.isEmpty()); + + assertEquals(3, clientMonitor.statusChanges.size()); + assertTrue(crdMonitor.statusSnapshots.isEmpty()); + + checkTaskStarted(clientMonitor.statusChanges.poll(), taskFut.getTaskSession()); + checkTaskMapped(clientMonitor.statusChanges.poll(), taskFut.getTaskSession()); + checkTaskFinished(clientMonitor.statusChanges.poll(), taskFut.getTaskSession()); + } + + /** + * Checking the get of snapshots of task statuses for client node. + * + * @throws Exception If failed. + */ + @Test + public void snapshotsClientNodeTest() throws Exception { + ComputeFullWithWaitTask task = new ComputeFullWithWaitTask(getTestTimeout()); + + ComputeTaskFuture<Void> taskFut = CLIENT_NODE.compute().executeAsync(task, null); + + task.doneOnMapFut.get(getTestTimeout()); + + ComputeGridMonitorImpl monitor1 = new ComputeGridMonitorImpl(); + + try { + CLIENT_NODE.context().task().listenStatusUpdates(monitor1); + + assertTrue(clientMonitor.statusSnapshots.isEmpty()); + assertTrue(crdMonitor.statusSnapshots.isEmpty()); + + assertEquals(1, monitor1.statusSnapshots.size()); + + checkSnapshot(monitor1.statusSnapshots.poll(), taskFut.getTaskSession()); + } + finally { + CLIENT_NODE.context().task().stopListenStatusUpdates(monitor1); + } + + taskFut.get(getTestTimeout()); + } + /** */ private void checkTaskStarted(ComputeTaskStatusSnapshot snapshot, ComputeTaskSession session) { checkSnapshot(snapshot, (GridTaskSessionImpl)session, RUNNING, false, false); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobChangePriorityTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobChangePriorityTest.java index 8727164..deb90a6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobChangePriorityTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/ComputeJobChangePriorityTest.java @@ -23,9 +23,11 @@ import java.util.Map; import java.util.UUID; import java.util.stream.Stream; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeJobAdapter; import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.compute.ComputeTaskAdapter; import org.apache.ignite.compute.ComputeTaskFuture; @@ -47,6 +49,7 @@ import org.junit.Test; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toMap; import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.testframework.GridTestUtils.DFLT_TEST_TIMEOUT; import static org.apache.ignite.testframework.GridTestUtils.assertThrows; /** @@ -158,6 +161,8 @@ public class ComputeJobChangePriorityTest extends GridCommonAbstractTest { Object val, boolean expHandleCollisionOnChangeTaskAttrs ) throws Exception { + WaitJob.waitFut.reset(); + ComputeTaskFuture<Void> taskFut = CRD.compute().executeAsync(new NoopTask(), null); for (Ignite n : G.allGrids()) @@ -175,6 +180,8 @@ public class ComputeJobChangePriorityTest extends GridCommonAbstractTest { .getTaskSession().waitForAttribute(key, getTestTimeout())); } + WaitJob.waitFut.onDone(); + for (Ignite n : G.allGrids()) { GridFutureAdapter<Void> fut = PriorityQueueCollisionSpiEx.spiEx(n).onChangeTaskAttrsFut; @@ -205,7 +212,7 @@ public class ComputeJobChangePriorityTest extends GridCommonAbstractTest { @Override public void onCollision(CollisionContext ctx) { if (!waitJobFut.isDone()) { ctx.waitingJobs().stream() - .filter(collisionJobCtx -> collisionJobCtx.getJob() instanceof NoopJob) + .filter(collisionJobCtx -> collisionJobCtx.getJob() instanceof WaitJob) .findAny() .ifPresent(waitJobFut::onDone); } @@ -248,7 +255,7 @@ public class ComputeJobChangePriorityTest extends GridCommonAbstractTest { List<ClusterNode> subgrid, Void arg ) throws IgniteException { - return subgrid.stream().collect(toMap(n -> new NoopJob(), identity())); + return subgrid.stream().collect(toMap(n -> new WaitJob(), identity())); } /** {@inheritDoc} */ @@ -256,4 +263,22 @@ public class ComputeJobChangePriorityTest extends GridCommonAbstractTest { return null; } } + + /** */ + private static class WaitJob extends ComputeJobAdapter { + /** */ + static final GridFutureAdapter<Void> waitFut = new GridFutureAdapter<>(); + + /** {@inheritDoc} */ + @Override public Object execute() throws IgniteException { + try { + waitFut.get(DFLT_TEST_TIMEOUT); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + return null; + } + } }