IGNITE-8813 Added ComputeJobResultPolicy to JobEvent - Fixes #4213. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e5af5993 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e5af5993 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e5af5993 Branch: refs/heads/ignite-8446 Commit: e5af59937726362bc5253d57fdf664c626634574 Parents: e20cf5a Author: dkarachentsev <dkarachent...@gridgain.com> Authored: Fri Jun 22 15:30:44 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Jun 22 15:30:44 2018 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/events/JobEvent.java | 73 ++++++++++------ .../processors/task/GridTaskWorker.java | 11 ++- .../GridEventStorageCheckAllEventsSelfTest.java | 90 +++++++++++++++++++- 3 files changed, 145 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e5af5993/modules/core/src/main/java/org/apache/ignite/events/JobEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/JobEvent.java b/modules/core/src/main/java/org/apache/ignite/events/JobEvent.java index 13a53a7..b279ddc 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/JobEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/events/JobEvent.java @@ -19,41 +19,44 @@ package org.apache.ignite.events; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeJobResultPolicy; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; /** * Grid job event. * <p> - * Grid events are used for notification about what happens within the grid. Note that by - * design Ignite keeps all events generated on the local node locally and it provides - * APIs for performing a distributed queries across multiple nodes: + * Grid events are used for notification about what happens within the grid. Note that by design Ignite keeps all events + * generated on the local node locally and it provides APIs for performing a distributed queries across multiple nodes: * <ul> - * <li> - * {@link org.apache.ignite.IgniteEvents#remoteQuery(org.apache.ignite.lang.IgnitePredicate, long, int...)} - - * asynchronously querying events occurred on the nodes specified, including remote nodes. - * </li> - * <li> - * {@link org.apache.ignite.IgniteEvents#localQuery(org.apache.ignite.lang.IgnitePredicate, int...)} - - * querying only local events stored on this local node. - * </li> - * <li> - * {@link org.apache.ignite.IgniteEvents#localListen(org.apache.ignite.lang.IgnitePredicate, int...)} - - * listening to local grid events (events from remote nodes not included). - * </li> + * <li> + * {@link org.apache.ignite.IgniteEvents#remoteQuery(org.apache.ignite.lang.IgnitePredicate, long, int...)} - + * asynchronously querying events occurred on the nodes specified, including remote nodes. + * </li> + * <li> + * {@link org.apache.ignite.IgniteEvents#localQuery(org.apache.ignite.lang.IgnitePredicate, int...)} - querying only + * local events stored on this local node. + * </li> + * <li> + * {@link org.apache.ignite.IgniteEvents#localListen(org.apache.ignite.lang.IgnitePredicate, int...)} - listening to + * local grid events (events from remote nodes not included). + * </li> * </ul> - * User can also wait for events using method {@link org.apache.ignite.IgniteEvents#waitForLocal(org.apache.ignite.lang.IgnitePredicate, int...)}. + * User can also wait for events using method {@link org.apache.ignite.IgniteEvents#waitForLocal(org.apache.ignite.lang.IgnitePredicate, + * int...)}. * <h1 class="header">Events and Performance</h1> - * Note that by default all events in Ignite are enabled and therefore generated and stored - * by whatever event storage SPI is configured. Ignite can and often does generate thousands events per seconds - * under the load and therefore it creates a significant additional load on the system. If these events are - * not needed by the application this load is unnecessary and leads to significant performance degradation. + * Note that by default all events in Ignite are enabled and therefore generated and stored by whatever event storage + * SPI is configured. Ignite can and often does generate thousands events per seconds under the load and therefore it + * creates a significant additional load on the system. If these events are not needed by the application this load is + * unnecessary and leads to significant performance degradation. * <p> - * It is <b>highly recommended</b> to enable only those events that your application logic requires - * by using {@link org.apache.ignite.configuration.IgniteConfiguration#getIncludeEventTypes()} method in Ignite configuration. Note that certain - * events are required for Ignite's internal operations and such events will still be generated but not stored by - * event storage SPI if they are disabled in Ignite configuration. + * It is <b>highly recommended</b> to enable only those events that your application logic requires by using {@link + * org.apache.ignite.configuration.IgniteConfiguration#getIncludeEventTypes()} method in Ignite configuration. Note that + * certain events are required for Ignite's internal operations and such events will still be generated but not stored + * by event storage SPI if they are disabled in Ignite configuration. + * * @see EventType#EVT_JOB_CANCELLED * @see EventType#EVT_JOB_FAILED * @see EventType#EVT_JOB_FAILED_OVER @@ -88,6 +91,9 @@ public class JobEvent extends EventAdapter { /** */ private UUID taskSubjId; + /** */ + private ComputeJobResultPolicy resPlc; + /** {@inheritDoc} */ @Override public String shortDisplay() { return name() + ": taskName=" + taskName; @@ -225,6 +231,25 @@ public class JobEvent extends EventAdapter { this.taskSubjId = taskSubjId; } + /** + * Gets job result policy. Not null for {@link EventType#EVT_JOB_RESULTED} + * and {@link EventType#EVT_JOB_FAILED_OVER} event types. + * + * @return Result policy. + */ + @Nullable public ComputeJobResultPolicy resultPolicy() { + return resPlc; + } + + /** + * Sets job result policy. + * + * @param resPlc New result policy. + */ + public void resultPolicy(@Nullable ComputeJobResultPolicy resPlc) { + this.resPlc = resPlc; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(JobEvent.class, this, http://git-wip-us.apache.org/repos/asf/ignite/blob/e5af5993/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 5693eed..78efd2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -599,7 +599,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { if (resCache) sibs.add(sib); - recordJobEvent(EVT_JOB_MAPPED, jobId, node, "Job got mapped."); + recordJobEvent(EVT_JOB_MAPPED, jobId, node, null, "Job got mapped."); } synchronized (mux) { @@ -1057,7 +1057,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { } finally { recordJobEvent(EVT_JOB_RESULTED, jobRes.getJobContext().getJobId(), - jobRes.getNode(), "Job got resulted with: " + plc); + jobRes.getNode(), plc, "Job got resulted with: " + plc); } if (log.isDebugEnabled()) @@ -1262,7 +1262,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { if (timeout > 0) { recordJobEvent(EVT_JOB_FAILED_OVER, jobRes.getJobContext().getJobId(), - jobRes.getNode(), "Job failed over."); + jobRes.getNode(), FAILOVER, "Job failed over."); // Send new reference to remote nodes for execution. sendRequest(jobRes); @@ -1542,9 +1542,11 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { * @param evtType Event type. * @param jobId Job ID. * @param evtNode Event node. + * @param plc Job result policy. * @param msg Event message. */ - private void recordJobEvent(int evtType, IgniteUuid jobId, ClusterNode evtNode, String msg) { + private void recordJobEvent(int evtType, IgniteUuid jobId, ClusterNode evtNode, + @Nullable ComputeJobResultPolicy plc, String msg) { if (!internal && ctx.event().isRecordable(evtType)) { JobEvent evt = new JobEvent(); @@ -1557,6 +1559,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { evt.jobId(jobId); evt.type(evtType); evt.taskSubjectId(ses.subjectId()); + evt.resultPolicy(plc); ctx.event().record(evt); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e5af5993/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java index 30a16b0..9186902 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java @@ -22,12 +22,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobAdapter; import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.compute.ComputeJobResultPolicy; import org.apache.ignite.compute.ComputeTaskFuture; import org.apache.ignite.compute.ComputeTaskMapAsync; import org.apache.ignite.compute.ComputeTaskSession; @@ -52,11 +54,14 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.testframework.junits.common.GridCommonTest; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER; +import static org.apache.ignite.compute.ComputeJobResultPolicy.WAIT; import static org.apache.ignite.events.EventType.EVT_CHECKPOINT_LOADED; import static org.apache.ignite.events.EventType.EVT_CHECKPOINT_REMOVED; import static org.apache.ignite.events.EventType.EVT_CHECKPOINT_SAVED; import static org.apache.ignite.events.EventType.EVT_JOB_CANCELLED; import static org.apache.ignite.events.EventType.EVT_JOB_FAILED; +import static org.apache.ignite.events.EventType.EVT_JOB_FAILED_OVER; import static org.apache.ignite.events.EventType.EVT_JOB_FINISHED; import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED; import static org.apache.ignite.events.EventType.EVT_JOB_QUEUED; @@ -145,6 +150,9 @@ public class GridEventStorageCheckAllEventsSelfTest extends GridCommonAbstractTe assertEvent(evts.get(8).type(), EVT_TASK_REDUCED, evts); assertEvent(evts.get(9).type(), EVT_TASK_FINISHED, evts); assertEvent(evts.get(10).type(), EVT_JOB_FINISHED, evts); + + assertNotNull(((JobEvent)evts.get(7)).resultPolicy()); + assertEquals(WAIT, ((JobEvent)evts.get(7)).resultPolicy()); } /** @@ -185,6 +193,9 @@ public class GridEventStorageCheckAllEventsSelfTest extends GridCommonAbstractTe assertEvent(evts.get(9).type(), EVT_JOB_FINISHED, evts); assertEvent(evts.get(10).type(), EVT_TASK_UNDEPLOYED, evts); assertEvent(evts.get(11).type(), EVT_TASK_DEPLOYED, evts); + + assertNotNull(((JobEvent)evts.get(6)).resultPolicy()); + assertEquals(WAIT, ((JobEvent)evts.get(6)).resultPolicy()); } /** @@ -209,11 +220,55 @@ public class GridEventStorageCheckAllEventsSelfTest extends GridCommonAbstractTe assertEvent(evts.get(7).type(), EVT_TASK_REDUCED, evts); assertEvent(evts.get(8).type(), EVT_TASK_FINISHED, evts); assertEvent(evts.get(9).type(), EVT_JOB_FINISHED, evts); + + assertNotNull(((JobEvent)evts.get(6)).resultPolicy()); + assertEquals(WAIT, ((JobEvent)evts.get(6)).resultPolicy()); } /** * @throws Exception If test failed. */ + public void testFailoverJobTask() throws Exception { + startGrid(0); + + try { + generateEvents(null, new GridAllEventsSuccessTestJob()).get(); + + ignite.compute().execute(GridFailoverTestTask.class.getName(), new GridAllEventsSuccessTestJob()); + + long tstamp = startTimestamp(); + + ignite.compute().execute(GridFailoverTestTask.class.getName(), new GridAllEventsSuccessTestJob()); + + List<Event> evts = pullEvents(tstamp, 12, GridFailoverTestTask.class.getName()); + + int cnt = 0; + + assertEvent(evts.get(cnt++).type(), EVT_TASK_STARTED, evts); + assertEvent(evts.get(cnt++).type(), EVT_JOB_MAPPED, evts); + assertEvent(evts.get(cnt++).type(), EVT_JOB_RESULTED, evts); + + assertEquals(((JobEvent)evts.get(cnt - 1)).resultPolicy(), FAILOVER); + + assertEvent(evts.get(cnt++).type(), EVT_JOB_FAILED_OVER, evts); + assertEvent(evts.get(cnt++).type(), EVT_JOB_QUEUED, evts); + assertEvent(evts.get(cnt++).type(), EVT_JOB_STARTED, evts); + assertEvent(evts.get(cnt++).type(), EVT_CHECKPOINT_SAVED, evts); + assertEvent(evts.get(cnt++).type(), EVT_CHECKPOINT_REMOVED, evts); + assertEvent(evts.get(cnt++).type(), EVT_JOB_RESULTED, evts); + assertEvent(evts.get(cnt++).type(), EVT_TASK_REDUCED, evts); + assertEvent(evts.get(cnt++).type(), EVT_TASK_FINISHED, evts); + assertEvent(evts.get(cnt++).type(), EVT_JOB_FINISHED, evts); + } + finally { + stopGrid(0); + } + } + + + /** + * @throws Exception If test failed. + */ public void testFailTask() throws Exception { long tstamp = startTimestamp(); @@ -237,6 +292,9 @@ public class GridEventStorageCheckAllEventsSelfTest extends GridCommonAbstractTe assertEvent(evts.get(4).type(), EVT_JOB_RESULTED, evts); assertEvent(evts.get(5).type(), EVT_TASK_FAILED, evts); assertEvent(evts.get(6).type(), EVT_JOB_FAILED, evts); + + // Exception was thrown, so policy is null. + assertNull(((JobEvent)evts.get(4)).resultPolicy()); } /** @@ -326,7 +384,20 @@ public class GridEventStorageCheckAllEventsSelfTest extends GridCommonAbstractTe * @throws Exception If failed. */ private List<Event> pullEvents(long since, int evtCnt) throws Exception { - IgnitePredicate<Event> filter = new CustomEventFilter(GridAllEventsTestTask.class.getName(), since); + return pullEvents(since, evtCnt, GridAllEventsTestTask.class.getName()); + } + + /** + * Pull all test task related events since the given moment. + * + * @param since Earliest time to pulled events. + * @param evtCnt Expected event count. + * @param taskName Name of the task. + * @return List of events. + * @throws Exception If failed. + */ + private List<Event> pullEvents(long since, int evtCnt, String taskName) throws Exception { + IgnitePredicate<Event> filter = new CustomEventFilter(taskName, since); for (int i = 0; i < 3; i++) { List<Event> evts = new ArrayList<>(ignite.events().localQuery((filter))); @@ -497,4 +568,21 @@ public class GridEventStorageCheckAllEventsSelfTest extends GridCommonAbstractTe return (Serializable)results; } } + + /** + * + */ + private static class GridFailoverTestTask extends GridAllEventsTestTask { + /** */ + private final AtomicBoolean failed = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, + List<ComputeJobResult> rcvd) throws IgniteException { + if (failed.compareAndSet(false, true)) + return FAILOVER; + + return super.result(res, rcvd); + } + } }