abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1900
Change subject: [ASTERIXDB-2002][HYR] Report failures during task start ...................................................................... [ASTERIXDB-2002][HYR] Report failures during task start - user model changes: no - storage format changes: no - interface changes: no details: - failures that happen before the task object is created were never reported: the task object was still null, so the failure-reporting code simply threw a NullPointerException. Change-Id: Ibf79088c1ea08e66a7b130e4836f153ea9603723 --- M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java 8 files changed, 71 insertions(+), 37 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/00/1900/1 diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index def4c83..83ab532 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -61,7 +61,8 @@ private int inputArity = 0; public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities, - IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) { + IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) + throws HyracksDataException { this.parent = parent; this.startActivities = startActivities; this.ctx = ctx; @@ -76,7 +77,7 @@ try { init(); } catch (Exception e) { - throw new IllegalStateException(e); + throw HyracksDataException.create(e); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index 74a628d..bff2794 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -275,7 +275,8 @@ if (!addPendingThread(ct)) { exceptions.add(HyracksDataException.create(TASK_ABORTED, getTaskAttemptId())); ExceptionUtils.setNodeIds(exceptions, ncs.getId()); - ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions)); + ncs.getWorkQueue() + .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(), taskAttemptId)); return; } ct.setName(displayName + ":" + 
taskAttemptId + ":" + 0); @@ -353,13 +354,14 @@ for (int i = 0; i < exceptions.size(); i++) { LOGGER.log(Level.WARNING, "Task " + taskAttemptId + " failed with exception" - + (exceptions.size() > 1 ? "s (" + (i + 1) + "/" + exceptions.size() + ")" : ""), + + (exceptions.size() > 1 ? "s (" + (i + 1) + "/" + exceptions.size() + ")" : ""), exceptions.get(i)); } } NodeControllerService ncs = joblet.getNodeController(); ExceptionUtils.setNodeIds(exceptions, ncs.getId()); - ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions)); + ncs.getWorkQueue() + .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(), taskAttemptId)); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java index e81fa5a..fa8ba28 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java @@ -19,7 +19,10 @@ package org.apache.hyracks.control.nc.work; import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.common.work.AbstractWork; @@ -27,30 +30,36 @@ import org.apache.hyracks.control.nc.Task; public class NotifyTaskFailureWork extends AbstractWork { + private static final Logger LOGGER = Logger.getLogger(NotifyTaskFailureWork.class.getName()); private final NodeControllerService ncs; private final Task task; + private final JobId jobId; + private final TaskAttemptId 
taskId; private final List<Exception> exceptions; - public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception> exceptions) { + public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception> exceptions, JobId jobId, + TaskAttemptId taskId) { this.ncs = ncs; this.task = task; this.exceptions = exceptions; + this.jobId = jobId; + this.taskId = taskId; } @Override public void run() { try { - JobId jobId = task.getJobletContext().getJobId(); IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager(); if (dpm != null) { dpm.abortReader(jobId); } - ncs.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), ncs.getId(), exceptions); - //exceptions.get(0).printStackTrace(); + ncs.getClusterController().notifyTaskFailure(jobId, taskId, ncs.getId(), exceptions); } catch (Exception e) { - e.printStackTrace(); + LOGGER.log(Level.SEVERE, "Failure reporting task failure to cluster controller", e); } - task.getJoblet().removeTask(task); + if (task != null) { + task.getJoblet().removeTask(task); + } } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java index b55cd4b..c369781 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java @@ -102,6 +102,7 @@ @Override public void run() { Task task = null; + int taskIndex = 0; try { ncs.updateMaxJobId(jobId); NCServiceContext serviceCtx = ncs.getContext(); @@ -122,7 +123,8 @@ return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId()); } }; - for (TaskAttemptDescriptor td : taskDescriptors) { + while (taskIndex < 
taskDescriptors.size()) { + TaskAttemptDescriptor td = taskDescriptors.get(taskIndex); TaskAttemptId taId = td.getTaskAttemptId(); TaskId tid = taId.getTaskId(); ActivityId aid = tid.getActivityId(); @@ -133,6 +135,7 @@ } final int partition = tid.getPartition(); List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid); + task = null; task = new Task(joblet, flags, taId, han.getClass().getName(), ncs.getExecutor(), ncs, createInputChannels(td, inputs)); IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount()); @@ -174,13 +177,16 @@ task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator); joblet.addTask(task); task.start(); + taskIndex++; } } catch (Exception e) { LOGGER.log(Level.WARNING, "Failure starting a task", e); // notify cc of start task failure List<Exception> exceptions = new ArrayList<>(); + exceptions.add(e); ExceptionUtils.setNodeIds(exceptions, ncs.getId()); - ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, exceptions)); + TaskAttemptId taskId = taskDescriptors.get(taskIndex).getTaskAttemptId(); + ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, exceptions, jobId, taskId)); } } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java index 05a7e2d..18479e2 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java @@ -59,8 +59,8 @@ private static final Logger LOGGER = 
Logger.getLogger(AbstractMultiNCIntegrationTest.class.getName()); - public static final String[] ASTERIX_IDS = { "asterix-001", "asterix-002", "asterix-003", "asterix-004", - "asterix-005", "asterix-006", "asterix-007" }; + public static final String[] ASTERIX_IDS = + { "asterix-001", "asterix-002", "asterix-003", "asterix-004", "asterix-005", "asterix-006", "asterix-007" }; private static ClusterControllerService cc; @@ -103,7 +103,7 @@ ncConfig.setClusterListenAddress("127.0.0.1"); ncConfig.setDataListenAddress("127.0.0.1"); ncConfig.setResultListenAddress("127.0.0.1"); - ncConfig.setIODevices(new String [] { ioDev.getAbsolutePath() }); + ncConfig.setIODevices(new String[] { ioDev.getAbsolutePath() }); asterixNCs[i] = new NodeControllerService(ncConfig); asterixNCs[i].start(); } @@ -138,7 +138,7 @@ hcc.cancelJob(jobId); } - protected void runTest(JobSpecification spec) throws Exception { + protected void runTest(JobSpecification spec, String expectedErrorMessage) throws Exception { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info(spec.toJSON().asText()); } @@ -180,14 +180,29 @@ try { bbis.close(); } catch (IOException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } - readSize = reader.read(resultFrame); } } - hcc.waitForCompletion(jobId); + boolean expectedExceptionThrown = false; + try { + hcc.waitForCompletion(jobId); + } catch (HyracksDataException hde) { + if (expectedErrorMessage != null) { + if (hde.toString().contains(expectedErrorMessage)) { + expectedExceptionThrown = true; + } else { + throw hde; + } + } else { + throw hde; + } + } + if (expectedErrorMessage != null && !expectedExceptionThrown) { + throw new Exception("Expected error (" + expectedErrorMessage + ") was not thrown"); + } dumpOutputFiles(); } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java index 871109a..13a103e 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java @@ -48,7 +48,7 @@ spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0); spec.addRoot(sinkOpDesc); try { - runTest(spec); + runTest(spec, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE); } catch (Exception e) { e.printStackTrace(); throw e; diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java index e92113a..67642f4 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java @@ -69,15 +69,15 @@ public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest { - final IFileSplitProvider splitProvider = new ConstantFileSplitProvider( - new FileSplit[] { new ManagedFileSplit(ASTERIX_IDS[0], "data" + File.separator + "tpch0.001" - + File.separator + "lineitem.tbl"), - new ManagedFileSplit(ASTERIX_IDS[1], "data" + File.separator + "tpch0.001" + File.separator - + "lineitem.tbl"), - new ManagedFileSplit(ASTERIX_IDS[2], "data" + File.separator + "tpch0.001" + File.separator - + "lineitem.tbl"), - new ManagedFileSplit(ASTERIX_IDS[3], "data" + File.separator + "tpch0.001" + 
File.separator - + "lineitem.tbl") }); + final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { + new ManagedFileSplit(ASTERIX_IDS[0], + "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl"), + new ManagedFileSplit(ASTERIX_IDS[1], + "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl"), + new ManagedFileSplit(ASTERIX_IDS[2], + "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl"), + new ManagedFileSplit(ASTERIX_IDS[3], + "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl") }); final int fileSize = 800 * 1024 * 4; @@ -112,8 +112,8 @@ JobSpecification spec = new JobSpecification(); - FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory, - desc); + FileScanOperatorDescriptor csvScanner = + new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory, desc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001", "asterix-002", "asterix-003", "asterix-004"); @@ -163,7 +163,7 @@ spec.connect(conn2, grouper, 0, printer, 0); spec.addRoot(printer); - runTest(spec); + runTest(spec, null); } /** @@ -177,8 +177,8 @@ JobSpecification spec = new JobSpecification(); - FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory, - desc); + FileScanOperatorDescriptor csvScanner = + new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory, desc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001", "asterix-002", "asterix-003", "asterix-004"); @@ -221,7 +221,7 @@ spec.connect(conn2, grouper, 0, printer, 0); spec.addRoot(printer); - runTest(spec); + runTest(spec, null); } private AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, String prefix) diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java index f814cd5..d704671 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java @@ -38,6 +38,7 @@ private static AtomicInteger createPushRuntime = new AtomicInteger(); private static AtomicInteger initializeCounter = new AtomicInteger(); private static AtomicInteger openCloseCounter = new AtomicInteger(); + public static final String ERROR_MESSAGE = "I throw exceptions"; private final int[] exceptionPartitions; private final boolean sleepOnInitialize; @@ -56,7 +57,7 @@ if (exceptionPartitions != null) { for (int p : exceptionPartitions) { if (p == partition) { - throw new HyracksDataException("I throw exceptions"); + throw new HyracksDataException(ERROR_MESSAGE); } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1900 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ibf79088c1ea08e66a7b130e4836f153ea9603723 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>