IGNITE-8813 Added ComputeJobResultPolicy to JobEvent - Fixes #4213.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e5af5993
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e5af5993
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e5af5993

Branch: refs/heads/ignite-8446
Commit: e5af59937726362bc5253d57fdf664c626634574
Parents: e20cf5a
Author: dkarachentsev <dkarachent...@gridgain.com>
Authored: Fri Jun 22 15:30:44 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Fri Jun 22 15:30:44 2018 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/events/JobEvent.java | 73 ++++++++++------
 .../processors/task/GridTaskWorker.java         | 11 ++-
 .../GridEventStorageCheckAllEventsSelfTest.java | 90 +++++++++++++++++++-
 3 files changed, 145 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e5af5993/modules/core/src/main/java/org/apache/ignite/events/JobEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/JobEvent.java 
b/modules/core/src/main/java/org/apache/ignite/events/JobEvent.java
index 13a53a7..b279ddc 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/JobEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/JobEvent.java
@@ -19,41 +19,44 @@ package org.apache.ignite.events;
 
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Grid job event.
  * <p>
- * Grid events are used for notification about what happens within the grid. 
Note that by
- * design Ignite keeps all events generated on the local node locally and it 
provides
- * APIs for performing a distributed queries across multiple nodes:
+ * Grid events are used for notification about what happens within the grid. 
Note that by design Ignite keeps all events
+ * generated on the local node locally and it provides APIs for performing 
distributed queries across multiple nodes:
  * <ul>
- *      <li>
- *          {@link 
org.apache.ignite.IgniteEvents#remoteQuery(org.apache.ignite.lang.IgnitePredicate,
 long, int...)} -
- *          asynchronously querying events occurred on the nodes specified, 
including remote nodes.
- *      </li>
- *      <li>
- *          {@link 
org.apache.ignite.IgniteEvents#localQuery(org.apache.ignite.lang.IgnitePredicate,
 int...)} -
- *          querying only local events stored on this local node.
- *      </li>
- *      <li>
- *          {@link 
org.apache.ignite.IgniteEvents#localListen(org.apache.ignite.lang.IgnitePredicate,
 int...)} -
- *          listening to local grid events (events from remote nodes not 
included).
- *      </li>
+ * <li>
+ * {@link 
org.apache.ignite.IgniteEvents#remoteQuery(org.apache.ignite.lang.IgnitePredicate,
 long, int...)} -
+ * asynchronously querying events occurred on the nodes specified, including 
remote nodes.
+ * </li>
+ * <li>
+ * {@link 
org.apache.ignite.IgniteEvents#localQuery(org.apache.ignite.lang.IgnitePredicate,
 int...)} - querying only
+ * local events stored on this local node.
+ * </li>
+ * <li>
+ * {@link 
org.apache.ignite.IgniteEvents#localListen(org.apache.ignite.lang.IgnitePredicate,
 int...)} - listening to
+ * local grid events (events from remote nodes not included).
+ * </li>
  * </ul>
- * User can also wait for events using method {@link 
org.apache.ignite.IgniteEvents#waitForLocal(org.apache.ignite.lang.IgnitePredicate,
 int...)}.
+ * User can also wait for events using method {@link 
org.apache.ignite.IgniteEvents#waitForLocal(org.apache.ignite.lang.IgnitePredicate,
+ * int...)}.
  * <h1 class="header">Events and Performance</h1>
- * Note that by default all events in Ignite are enabled and therefore 
generated and stored
- * by whatever event storage SPI is configured. Ignite can and often does 
generate thousands events per seconds
- * under the load and therefore it creates a significant additional load on 
the system. If these events are
- * not needed by the application this load is unnecessary and leads to 
significant performance degradation.
+ * Note that by default all events in Ignite are enabled and therefore 
generated and stored by whatever event storage
+ * SPI is configured. Ignite can and often does generate thousands of events per 
second under the load and therefore it
+ * creates a significant additional load on the system. If these events are 
not needed by the application this load is
+ * unnecessary and leads to significant performance degradation.
  * <p>
- * It is <b>highly recommended</b> to enable only those events that your 
application logic requires
- * by using {@link 
org.apache.ignite.configuration.IgniteConfiguration#getIncludeEventTypes()} 
method in Ignite configuration. Note that certain
- * events are required for Ignite's internal operations and such events will 
still be generated but not stored by
- * event storage SPI if they are disabled in Ignite configuration.
+ * It is <b>highly recommended</b> to enable only those events that your 
application logic requires by using {@link
+ * org.apache.ignite.configuration.IgniteConfiguration#getIncludeEventTypes()} 
method in Ignite configuration. Note that
+ * certain events are required for Ignite's internal operations and such 
events will still be generated but not stored
+ * by event storage SPI if they are disabled in Ignite configuration.
+ *
  * @see EventType#EVT_JOB_CANCELLED
  * @see EventType#EVT_JOB_FAILED
  * @see EventType#EVT_JOB_FAILED_OVER
@@ -88,6 +91,9 @@ public class JobEvent extends EventAdapter {
     /** */
     private UUID taskSubjId;
 
+    /** Job result policy, {@code null} until set via {@link #resultPolicy(ComputeJobResultPolicy)}. */
+    private ComputeJobResultPolicy resPlc;
+
     /** {@inheritDoc} */
     @Override public String shortDisplay() {
         return name() + ": taskName=" + taskName;
@@ -225,6 +231,25 @@ public class JobEvent extends EventAdapter {
         this.taskSubjId = taskSubjId;
     }
 
+    /**
+     * Gets job result policy. Set for {@link EventType#EVT_JOB_RESULTED} and
+     * {@link EventType#EVT_JOB_FAILED_OVER} event types. Can still be {@code null} for
+     * {@link EventType#EVT_JOB_RESULTED} when an exception was thrown and no policy was obtained.
+     *
+     * @return Result policy or {@code null} if none was set on this event.
+     */
+    @Nullable public ComputeJobResultPolicy resultPolicy() {
+        return resPlc;
+    }
+
+    /**
+     * Sets job result policy.
+     *
+     * @param resPlc New result policy, {@code null} is accepted and stored as is.
+     */
+    public void resultPolicy(@Nullable ComputeJobResultPolicy resPlc) {
+        this.resPlc = resPlc;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(JobEvent.class, this,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e5af5993/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 5693eed..78efd2d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -599,7 +599,7 @@ class GridTaskWorker<T, R> extends GridWorker implements 
GridTimeoutObject {
             if (resCache)
                 sibs.add(sib);
 
-            recordJobEvent(EVT_JOB_MAPPED, jobId, node, "Job got mapped.");
+            recordJobEvent(EVT_JOB_MAPPED, jobId, node, null, "Job got 
mapped.");
         }
 
         synchronized (mux) {
@@ -1057,7 +1057,7 @@ class GridTaskWorker<T, R> extends GridWorker implements 
GridTimeoutObject {
                     }
                     finally {
                         recordJobEvent(EVT_JOB_RESULTED, 
jobRes.getJobContext().getJobId(),
-                            jobRes.getNode(), "Job got resulted with: " + plc);
+                            jobRes.getNode(), plc, "Job got resulted with: " + 
plc);
                     }
 
                     if (log.isDebugEnabled())
@@ -1262,7 +1262,7 @@ class GridTaskWorker<T, R> extends GridWorker implements 
GridTimeoutObject {
 
         if (timeout > 0) {
             recordJobEvent(EVT_JOB_FAILED_OVER, 
jobRes.getJobContext().getJobId(),
-                jobRes.getNode(), "Job failed over.");
+                jobRes.getNode(), FAILOVER, "Job failed over.");
 
             // Send new reference to remote nodes for execution.
             sendRequest(jobRes);
@@ -1542,9 +1542,11 @@ class GridTaskWorker<T, R> extends GridWorker implements 
GridTimeoutObject {
      * @param evtType Event type.
      * @param jobId Job ID.
      * @param evtNode Event node.
+     * @param plc Job result policy.
      * @param msg Event message.
      */
-    private void recordJobEvent(int evtType, IgniteUuid jobId, ClusterNode 
evtNode, String msg) {
+    private void recordJobEvent(int evtType, IgniteUuid jobId, ClusterNode 
evtNode,
+        @Nullable ComputeJobResultPolicy plc, String msg) {
         if (!internal && ctx.event().isRecordable(evtType)) {
             JobEvent evt = new JobEvent();
 
@@ -1557,6 +1559,7 @@ class GridTaskWorker<T, R> extends GridWorker implements 
GridTimeoutObject {
             evt.jobId(jobId);
             evt.type(evtType);
             evt.taskSubjectId(ses.subjectId());
+            evt.resultPolicy(plc);
 
             ctx.event().record(evt);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e5af5993/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java
index 30a16b0..9186902 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridEventStorageCheckAllEventsSelfTest.java
@@ -22,12 +22,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobAdapter;
 import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
 import org.apache.ignite.compute.ComputeTaskFuture;
 import org.apache.ignite.compute.ComputeTaskMapAsync;
 import org.apache.ignite.compute.ComputeTaskSession;
@@ -52,11 +54,14 @@ import 
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER;
+import static org.apache.ignite.compute.ComputeJobResultPolicy.WAIT;
 import static org.apache.ignite.events.EventType.EVT_CHECKPOINT_LOADED;
 import static org.apache.ignite.events.EventType.EVT_CHECKPOINT_REMOVED;
 import static org.apache.ignite.events.EventType.EVT_CHECKPOINT_SAVED;
 import static org.apache.ignite.events.EventType.EVT_JOB_CANCELLED;
 import static org.apache.ignite.events.EventType.EVT_JOB_FAILED;
+import static org.apache.ignite.events.EventType.EVT_JOB_FAILED_OVER;
 import static org.apache.ignite.events.EventType.EVT_JOB_FINISHED;
 import static org.apache.ignite.events.EventType.EVT_JOB_MAPPED;
 import static org.apache.ignite.events.EventType.EVT_JOB_QUEUED;
@@ -145,6 +150,9 @@ public class GridEventStorageCheckAllEventsSelfTest extends 
GridCommonAbstractTe
         assertEvent(evts.get(8).type(), EVT_TASK_REDUCED, evts);
         assertEvent(evts.get(9).type(), EVT_TASK_FINISHED, evts);
         assertEvent(evts.get(10).type(), EVT_JOB_FINISHED, evts);
+
+        assertNotNull(((JobEvent)evts.get(7)).resultPolicy());
+        assertEquals(WAIT, ((JobEvent)evts.get(7)).resultPolicy());
     }
 
     /**
@@ -185,6 +193,9 @@ public class GridEventStorageCheckAllEventsSelfTest extends 
GridCommonAbstractTe
         assertEvent(evts.get(9).type(), EVT_JOB_FINISHED, evts);
         assertEvent(evts.get(10).type(), EVT_TASK_UNDEPLOYED, evts);
         assertEvent(evts.get(11).type(), EVT_TASK_DEPLOYED, evts);
+
+        assertNotNull(((JobEvent)evts.get(6)).resultPolicy());
+        assertEquals(WAIT, ((JobEvent)evts.get(6)).resultPolicy());
     }
 
     /**
@@ -209,11 +220,55 @@ public class GridEventStorageCheckAllEventsSelfTest 
extends GridCommonAbstractTe
         assertEvent(evts.get(7).type(), EVT_TASK_REDUCED, evts);
         assertEvent(evts.get(8).type(), EVT_TASK_FINISHED, evts);
         assertEvent(evts.get(9).type(), EVT_JOB_FINISHED, evts);
+
+        assertNotNull(((JobEvent)evts.get(6)).resultPolicy());
+        assertEquals(WAIT, ((JobEvent)evts.get(6)).resultPolicy());
     }
 
     /**
      * @throws Exception If test failed.
      */
+    public void testFailoverJobTask() throws Exception {
+        startGrid(0);
+
+        try {
+            // NOTE(review): the two executions before startTimestamp() look like warm-up runs whose
+            // events are meant to be excluded from the checked sequence - confirm.
+            generateEvents(null, new GridAllEventsSuccessTestJob()).get();
+
+            ignite.compute().execute(GridFailoverTestTask.class.getName(), new GridAllEventsSuccessTestJob());
+
+            long tstamp = startTimestamp();
+
+            ignite.compute().execute(GridFailoverTestTask.class.getName(), new GridAllEventsSuccessTestJob());
+
+            List<Event> evts = pullEvents(tstamp, 12, GridFailoverTestTask.class.getName());
+
+            int cnt = 0;
+
+            assertEvent(evts.get(cnt++).type(), EVT_TASK_STARTED, evts);
+            assertEvent(evts.get(cnt++).type(), EVT_JOB_MAPPED, evts);
+            assertEvent(evts.get(cnt++).type(), EVT_JOB_RESULTED, evts);
+
+            // Expected value goes first, as in the other tests of this class, so that a failure
+            // message reports expected/actual the right way round.
+            assertEquals(FAILOVER, ((JobEvent)evts.get(cnt - 1)).resultPolicy());
+
+            assertEvent(evts.get(cnt++).type(), EVT_JOB_FAILED_OVER, evts);
+
+            // The failed-over event itself is recorded with the FAILOVER policy.
+            assertEquals(FAILOVER, ((JobEvent)evts.get(cnt - 1)).resultPolicy());
+
+            assertEvent(evts.get(cnt++).type(), EVT_JOB_QUEUED, evts);
+            assertEvent(evts.get(cnt++).type(), EVT_JOB_STARTED, evts);
+            assertEvent(evts.get(cnt++).type(), EVT_CHECKPOINT_SAVED, evts);
+            assertEvent(evts.get(cnt++).type(), EVT_CHECKPOINT_REMOVED, evts);
+            assertEvent(evts.get(cnt++).type(), EVT_JOB_RESULTED, evts);
+            assertEvent(evts.get(cnt++).type(), EVT_TASK_REDUCED, evts);
+            assertEvent(evts.get(cnt++).type(), EVT_TASK_FINISHED, evts);
+            assertEvent(evts.get(cnt++).type(), EVT_JOB_FINISHED, evts);
+        }
+        finally {
+            // Always stop the node started above, even when an assertion fails.
+            stopGrid(0);
+        }
+    }
+
+
+    /**
+     * @throws Exception If test failed.
+     */
     public void testFailTask() throws Exception {
         long tstamp = startTimestamp();
 
@@ -237,6 +292,9 @@ public class GridEventStorageCheckAllEventsSelfTest extends 
GridCommonAbstractTe
         assertEvent(evts.get(4).type(), EVT_JOB_RESULTED, evts);
         assertEvent(evts.get(5).type(), EVT_TASK_FAILED, evts);
         assertEvent(evts.get(6).type(), EVT_JOB_FAILED, evts);
+
+        // Exception was thrown, so policy is null.
+        assertNull(((JobEvent)evts.get(4)).resultPolicy());
     }
 
     /**
@@ -326,7 +384,20 @@ public class GridEventStorageCheckAllEventsSelfTest 
extends GridCommonAbstractTe
      * @throws Exception If failed.
      */
     private List<Event> pullEvents(long since, int evtCnt) throws Exception {
-        IgnitePredicate<Event> filter = new 
CustomEventFilter(GridAllEventsTestTask.class.getName(), since);
+        return pullEvents(since, evtCnt, 
GridAllEventsTestTask.class.getName());
+    }
+
+    /**
+     * Pull all test task related events since the given moment.
+     *
+     * @param since Earliest time of the events to pull.
+     * @param evtCnt Expected event count.
+     * @param taskName Name of the task.
+     * @return List of events.
+     * @throws Exception If failed.
+     */
+    private List<Event> pullEvents(long since, int evtCnt, String taskName) 
throws Exception {
+        IgnitePredicate<Event> filter = new CustomEventFilter(taskName, since);
 
         for (int i = 0; i < 3; i++) {
             List<Event> evts = new 
ArrayList<>(ignite.events().localQuery((filter)));
@@ -497,4 +568,21 @@ public class GridEventStorageCheckAllEventsSelfTest 
extends GridCommonAbstractTe
             return (Serializable)results;
         }
     }
+
+    /**
+     * Test task that answers the first job result it receives with {@link ComputeJobResultPolicy#FAILOVER}
+     * and delegates every later result to {@link GridAllEventsTestTask}.
+     */
+    private static class GridFailoverTestTask extends GridAllEventsTestTask {
+        /** Becomes {@code true} once the single failover has been requested. */
+        private final AtomicBoolean failed = new AtomicBoolean();
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res,
+            List<ComputeJobResult> rcvd) throws IgniteException {
+            // Only the call that wins the false -> true flip asks for failover.
+            return failed.compareAndSet(false, true) ? FAILOVER : super.result(res, rcvd);
+        }
+    }
 }

Reply via email to