IGNITE-3846 Fixed wrong recording of JobEvent for @GridInternal task. Added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/527a2996 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/527a2996 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/527a2996 Branch: refs/heads/ignite-3661 Commit: 527a2996fea51b81267d0fc96da0f4d6076ed436 Parents: 1cc502d Author: Alexey Kuznetsov <akuznet...@apache.org> Authored: Fri Sep 9 16:51:15 2016 +0700 Committer: Alexey Kuznetsov <akuznet...@apache.org> Committed: Fri Sep 9 16:51:15 2016 +0700 ---------------------------------------------------------------------- .../processors/task/GridTaskWorker.java | 2 +- .../internal/GridEventStorageSelfTest.java | 97 +++++++++++++++++++- 2 files changed, 97 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/527a2996/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 00ea29e..79d1a2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -1500,7 +1500,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { * @param msg Event message. 
*/ private void recordJobEvent(int evtType, IgniteUuid jobId, ClusterNode evtNode, String msg) { - if (ctx.event().isRecordable(evtType)) { + if (!internal && ctx.event().isRecordable(evtType)) { JobEvent evt = new JobEvent(); evt.message(msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/527a2996/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageSelfTest.java index a3b9608..4f98b0c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -31,12 +32,17 @@ import org.apache.ignite.compute.ComputeJobAdapter; import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.compute.ComputeTaskSplitAdapter; import org.apache.ignite.events.Event; +import org.apache.ignite.events.JobEvent; +import org.apache.ignite.events.TaskEvent; +import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.testframework.junits.common.GridCommonTest; import static org.apache.ignite.events.EventType.EVTS_ALL_MINUS_METRIC_UPDATE; +import static org.apache.ignite.events.EventType.EVTS_JOB_EXECUTION; +import static org.apache.ignite.events.EventType.EVTS_TASK_EXECUTION; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static 
org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_TASK_STARTED; @@ -193,6 +199,53 @@ public class GridEventStorageSelfTest extends GridCommonAbstractTest { } /** + * Checks that specified event is not task or job event. + * + * @param evt Event to check. + */ + private void checkGridInternalEvent(Event evt) { + assertFalse("Found TASK event for task marked with @GridInternal [evtType=" + evt.type() + "]", evt instanceof TaskEvent); + assertFalse("Found JOB event for task marked with @GridInternal [evtType=" + evt.type() + "]", evt instanceof JobEvent); + } + + /** + * @throws Exception In case of error. + */ + public void testGridInternalEvents() throws Exception { + IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + checkGridInternalEvent(evt); + + return true; + } + }; + + ignite1.events().localListen(lsnr, EVTS_TASK_EXECUTION); + ignite1.events().localListen(lsnr, EVTS_JOB_EXECUTION); + ignite2.events().localListen(lsnr, EVTS_TASK_EXECUTION); + ignite2.events().localListen(lsnr, EVTS_JOB_EXECUTION); + + executeGridInternalTask(ignite1); + + Collection<Event> evts1 = ignite1.events().localQuery(F.<Event>alwaysTrue()); + Collection<Event> evts2 = ignite2.events().localQuery(F.<Event>alwaysTrue()); + + assert evts1 != null; + assert evts2 != null; + + for (Event evt : evts1) + checkGridInternalEvent(evt); + + for (Event evt : evts2) + checkGridInternalEvent(evt); + + assert ignite1.events().stopLocalListen(lsnr, EVTS_TASK_EXECUTION); + assert ignite1.events().stopLocalListen(lsnr, EVTS_JOB_EXECUTION); + assert ignite2.events().stopLocalListen(lsnr, EVTS_TASK_EXECUTION); + assert ignite2.events().stopLocalListen(lsnr, EVTS_JOB_EXECUTION); + } + + /** * Create events in grid. * * @param ignite Grid. @@ -204,6 +257,15 @@ public class GridEventStorageSelfTest extends GridCommonAbstractTest { } /** + * Execute task marked with {@code GridInternal} annotation. 
+ * + * @param ignite Grid. + */ + private void executeGridInternalTask(Ignite ignite) { + ignite.compute().execute(GridInternalTestTask.class.getName(), null); + } + + /** * Test task. */ private static class GridEventTestTask extends ComputeTaskSplitAdapter<Object, Object> { @@ -232,6 +294,39 @@ public class GridEventStorageSelfTest extends GridCommonAbstractTest { } /** + * Test task marked with @GridInternal. + */ + @GridInternal + private static class GridInternalTestTask extends ComputeTaskSplitAdapter<Object, Object> { + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) { + Collection<ComputeJob> jobs = new ArrayList<>(gridSize); + + for (int i = 0; i < gridSize; i++) + jobs.add(new GridInternalTestJob()); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public Serializable reduce(List<ComputeJobResult> results) { + assert results != null; + + return "GridInternalTestTask-result."; + } + } + + /** + * Test job. + */ + private static class GridInternalTestJob extends ComputeJobAdapter { + /** {@inheritDoc} */ + @Override public String execute() { + return "GridInternalTestJob-result."; + } + } + + /** * Test event listener. */ private class TestEventListener implements IgnitePredicate<Event> { @@ -274,4 +369,4 @@ public class GridEventStorageSelfTest extends GridCommonAbstractTest { return evt.type() == EVT_TASK_STARTED; } } -} \ No newline at end of file +}