abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1900

Change subject: [ASTERIXDB-2002][HYR] Report failures during task start
......................................................................

[ASTERIXDB-2002][HYR] Report failures during task start

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Failures that happened before the task object was created were
  never reported: the task object was still null, so the
  failure-reporting path itself threw a NullPointerException.

Change-Id: Ibf79088c1ea08e66a7b130e4836f153ea9603723
---
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
8 files changed, 71 insertions(+), 37 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/00/1900/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index def4c83..83ab532 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -61,7 +61,8 @@
     private int inputArity = 0;
 
     public SuperActivityOperatorNodePushable(SuperActivity parent, 
Map<ActivityId, IActivity> startActivities,
-            IHyracksTaskContext ctx, IRecordDescriptorProvider 
recordDescProvider, int partition, int nPartitions) {
+            IHyracksTaskContext ctx, IRecordDescriptorProvider 
recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
         this.parent = parent;
         this.startActivities = startActivities;
         this.ctx = ctx;
@@ -76,7 +77,7 @@
         try {
             init();
         } catch (Exception e) {
-            throw new IllegalStateException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 74a628d..bff2794 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -275,7 +275,8 @@
         if (!addPendingThread(ct)) {
             exceptions.add(HyracksDataException.create(TASK_ABORTED, 
getTaskAttemptId()));
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
-            ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, 
exceptions));
+            ncs.getWorkQueue()
+                    .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, 
joblet.getJobId(), taskAttemptId));
             return;
         }
         ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
@@ -353,13 +354,14 @@
                 for (int i = 0; i < exceptions.size(); i++) {
                     LOGGER.log(Level.WARNING,
                             "Task " + taskAttemptId + " failed with exception"
-                                    + (exceptions.size() > 1 ? "s (" + (i + 1) 
+ "/" + exceptions.size()  + ")" : ""),
+                                    + (exceptions.size() > 1 ? "s (" + (i + 1) 
+ "/" + exceptions.size() + ")" : ""),
                             exceptions.get(i));
                 }
             }
             NodeControllerService ncs = joblet.getNodeController();
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
-            ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, 
exceptions));
+            ncs.getWorkQueue()
+                    .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, 
joblet.getJobId(), taskAttemptId));
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index e81fa5a..fa8ba28 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -19,7 +19,10 @@
 package org.apache.hyracks.control.nc.work;
 
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.common.work.AbstractWork;
@@ -27,30 +30,36 @@
 import org.apache.hyracks.control.nc.Task;
 
 public class NotifyTaskFailureWork extends AbstractWork {
+    private static final Logger LOGGER = 
Logger.getLogger(NotifyTaskFailureWork.class.getName());
     private final NodeControllerService ncs;
     private final Task task;
+    private final JobId jobId;
+    private final TaskAttemptId taskId;
 
     private final List<Exception> exceptions;
 
-    public NotifyTaskFailureWork(NodeControllerService ncs, Task task, 
List<Exception> exceptions) {
+    public NotifyTaskFailureWork(NodeControllerService ncs, Task task, 
List<Exception> exceptions, JobId jobId,
+            TaskAttemptId taskId) {
         this.ncs = ncs;
         this.task = task;
         this.exceptions = exceptions;
+        this.jobId = jobId;
+        this.taskId = taskId;
     }
 
     @Override
     public void run() {
         try {
-            JobId jobId = task.getJobletContext().getJobId();
             IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
             if (dpm != null) {
                 dpm.abortReader(jobId);
             }
-            ncs.getClusterController().notifyTaskFailure(jobId, 
task.getTaskAttemptId(), ncs.getId(), exceptions);
-            //exceptions.get(0).printStackTrace();
+            ncs.getClusterController().notifyTaskFailure(jobId, taskId, 
ncs.getId(), exceptions);
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.log(Level.SEVERE, "Failure reporting task failure to 
cluster controller", e);
         }
-        task.getJoblet().removeTask(task);
+        if (task != null) {
+            task.getJoblet().removeTask(task);
+        }
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index b55cd4b..c369781 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -102,6 +102,7 @@
     @Override
     public void run() {
         Task task = null;
+        int taskIndex = 0;
         try {
             ncs.updateMaxJobId(jobId);
             NCServiceContext serviceCtx = ncs.getContext();
@@ -122,7 +123,8 @@
                     return 
ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                 }
             };
-            for (TaskAttemptDescriptor td : taskDescriptors) {
+            while (taskIndex < taskDescriptors.size()) {
+                TaskAttemptDescriptor td = taskDescriptors.get(taskIndex);
                 TaskAttemptId taId = td.getTaskAttemptId();
                 TaskId tid = taId.getTaskId();
                 ActivityId aid = tid.getActivityId();
@@ -133,6 +135,7 @@
                 }
                 final int partition = tid.getPartition();
                 List<IConnectorDescriptor> inputs = 
ac.getActivityInputMap().get(aid);
+                task = null;
                 task = new Task(joblet, flags, taId, han.getClass().getName(), 
ncs.getExecutor(), ncs,
                         createInputChannels(td, inputs));
                 IOperatorNodePushable operator = han.createPushRuntime(task, 
rdp, partition, td.getPartitionCount());
@@ -174,13 +177,16 @@
                 task.setTaskRuntime(collectors.toArray(new 
IPartitionCollector[collectors.size()]), operator);
                 joblet.addTask(task);
                 task.start();
+                taskIndex++;
             }
         } catch (Exception e) {
             LOGGER.log(Level.WARNING, "Failure starting a task", e);
             // notify cc of start task failure
             List<Exception> exceptions = new ArrayList<>();
+            exceptions.add(e);
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
-            ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, 
exceptions));
+            TaskAttemptId taskId = 
taskDescriptors.get(taskIndex).getTaskAttemptId();
+            ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, 
exceptions, jobId, taskId));
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 05a7e2d..18479e2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -59,8 +59,8 @@
 
     private static final Logger LOGGER = 
Logger.getLogger(AbstractMultiNCIntegrationTest.class.getName());
 
-    public static final String[] ASTERIX_IDS = { "asterix-001", "asterix-002", 
"asterix-003", "asterix-004",
-            "asterix-005", "asterix-006", "asterix-007" };
+    public static final String[] ASTERIX_IDS =
+            { "asterix-001", "asterix-002", "asterix-003", "asterix-004", 
"asterix-005", "asterix-006", "asterix-007" };
 
     private static ClusterControllerService cc;
 
@@ -103,7 +103,7 @@
             ncConfig.setClusterListenAddress("127.0.0.1");
             ncConfig.setDataListenAddress("127.0.0.1");
             ncConfig.setResultListenAddress("127.0.0.1");
-            ncConfig.setIODevices(new String [] { ioDev.getAbsolutePath() });
+            ncConfig.setIODevices(new String[] { ioDev.getAbsolutePath() });
             asterixNCs[i] = new NodeControllerService(ncConfig);
             asterixNCs[i].start();
         }
@@ -138,7 +138,7 @@
         hcc.cancelJob(jobId);
     }
 
-    protected void runTest(JobSpecification spec) throws Exception {
+    protected void runTest(JobSpecification spec, String expectedErrorMessage) 
throws Exception {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(spec.toJSON().asText());
         }
@@ -180,14 +180,29 @@
                     try {
                         bbis.close();
                     } catch (IOException e) {
-                        throw new HyracksDataException(e);
+                        throw HyracksDataException.create(e);
                     }
                 }
-
                 readSize = reader.read(resultFrame);
             }
         }
-        hcc.waitForCompletion(jobId);
+        boolean expectedExceptionThrown = false;
+        try {
+            hcc.waitForCompletion(jobId);
+        } catch (HyracksDataException hde) {
+            if (expectedErrorMessage != null) {
+                if (hde.toString().contains(expectedErrorMessage)) {
+                    expectedExceptionThrown = true;
+                } else {
+                    throw hde;
+                }
+            } else {
+                throw hde;
+            }
+        }
+        if (expectedErrorMessage != null && !expectedExceptionThrown) {
+            throw new Exception("Expected error (" + expectedErrorMessage + ") 
was not thrown");
+        }
         dumpOutputFiles();
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
index 871109a..13a103e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -48,7 +48,7 @@
         spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0);
         spec.addRoot(sinkOpDesc);
         try {
-            runTest(spec);
+            runTest(spec, 
ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE);
         } catch (Exception e) {
             e.printStackTrace();
             throw e;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index e92113a..67642f4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -69,15 +69,15 @@
 
 public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest 
{
 
-    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
-            new FileSplit[] { new ManagedFileSplit(ASTERIX_IDS[0], "data" + 
File.separator + "tpch0.001"
-                    + File.separator + "lineitem.tbl"),
-                    new ManagedFileSplit(ASTERIX_IDS[1], "data" + 
File.separator + "tpch0.001" + File.separator
-                            + "lineitem.tbl"),
-                    new ManagedFileSplit(ASTERIX_IDS[2], "data" + 
File.separator + "tpch0.001" + File.separator
-                            + "lineitem.tbl"),
-                    new ManagedFileSplit(ASTERIX_IDS[3], "data" + 
File.separator + "tpch0.001" + File.separator
-                            + "lineitem.tbl") });
+    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new 
FileSplit[] {
+            new ManagedFileSplit(ASTERIX_IDS[0],
+                    "data" + File.separator + "tpch0.001" + File.separator + 
"lineitem.tbl"),
+            new ManagedFileSplit(ASTERIX_IDS[1],
+                    "data" + File.separator + "tpch0.001" + File.separator + 
"lineitem.tbl"),
+            new ManagedFileSplit(ASTERIX_IDS[2],
+                    "data" + File.separator + "tpch0.001" + File.separator + 
"lineitem.tbl"),
+            new ManagedFileSplit(ASTERIX_IDS[3],
+                    "data" + File.separator + "tpch0.001" + File.separator + 
"lineitem.tbl") });
 
     final int fileSize = 800 * 1024 * 4;
 
@@ -112,8 +112,8 @@
 
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new 
FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner =
+                new FileScanOperatorDescriptor(spec, splitProvider, 
tupleParserFactory, desc);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
csvScanner, "asterix-001", "asterix-002",
                 "asterix-003", "asterix-004");
@@ -163,7 +163,7 @@
         spec.connect(conn2, grouper, 0, printer, 0);
 
         spec.addRoot(printer);
-        runTest(spec);
+        runTest(spec, null);
     }
 
     /**
@@ -177,8 +177,8 @@
 
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new 
FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner =
+                new FileScanOperatorDescriptor(spec, splitProvider, 
tupleParserFactory, desc);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
csvScanner, "asterix-001", "asterix-002",
                 "asterix-003", "asterix-004");
@@ -221,7 +221,7 @@
         spec.connect(conn2, grouper, 0, printer, 0);
 
         spec.addRoot(printer);
-        runTest(spec);
+        runTest(spec, null);
     }
 
     private AbstractSingleActivityOperatorDescriptor 
getPrinter(JobSpecification spec, String prefix)
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
index f814cd5..d704671 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
@@ -38,6 +38,7 @@
     private static AtomicInteger createPushRuntime = new AtomicInteger();
     private static AtomicInteger initializeCounter = new AtomicInteger();
     private static AtomicInteger openCloseCounter = new AtomicInteger();
+    public static final String ERROR_MESSAGE = "I throw exceptions";
     private final int[] exceptionPartitions;
     private final boolean sleepOnInitialize;
 
@@ -56,7 +57,7 @@
             if (exceptionPartitions != null) {
                 for (int p : exceptionPartitions) {
                     if (p == partition) {
-                        throw new HyracksDataException("I throw exceptions");
+                        throw new HyracksDataException(ERROR_MESSAGE);
                     }
                 }
             }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1900
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibf79088c1ea08e66a7b130e4836f153ea9603723
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to