abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2006
Change subject: [NO ISSUE][TEST] Improve Job Failure Tests ...................................................................... [NO ISSUE][TEST] Improve Job Failure Tests - user model changes: no - storage format changes: no - interface changes: no details: - Add a check that all started jobs finished in multi-NC tests. - Job cancellation is only considered complete once the job has finally completed. Change-Id: I9cdf53a88e07aaa3dc7cd11c5bb7ef9369835da6 Change-Id: I53282b04148fd0c24f55d5b87db49d705530b99b --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java A hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java 6 files changed, 148 insertions(+), 4 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/06/2006/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index d36d9b7..cbcc44f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -132,6 +132,7 @@ @Override public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException { + LOGGER.log(level,
"Getting notified of job finish for JobId: " + jobId); EntityId entityId = jobId2EntityId.get(jobId); if (entityId != null) { add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions))); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java index b710f2b..c01bcbc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java @@ -18,7 +18,6 @@ */ package org.apache.asterix.app.active; -import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -90,8 +89,8 @@ ((QueryTranslator) statementExecutor).getSessionOutput(), mdProvider, feed, feedConnections, compilationProvider, storageComponentProvider, statementExecutorFactory, hcc); JobSpecification feedJob = jobInfo.getLeft(); - WaitForStateSubscriber eventSubscriber = - new WaitForStateSubscriber(this, Collections.singleton(ActivityState.RUNNING)); + WaitForStateSubscriber eventSubscriber = new WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING, + ActivityState.TEMPORARILY_FAILED, ActivityState.PERMANENTLY_FAILED)); feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs. // We will need to design general exception handling mechanism for feeds. 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java index e03ee6e..b1191ec 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java @@ -165,6 +165,26 @@ } @Test + public void testStartWhenStartFailsCompile() throws Exception { + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + listener.onStart(Behavior.FAIL_COMPILE); + Action action = users[0].startActivity(listener); + action.sync(); + assertFailure(action, 0); + Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + } + + @Test + public void testStartWhenStartFailsRuntime() throws Exception { + Assert.assertEquals(ActivityState.STOPPED, listener.getState()); + listener.onStart(Behavior.FAIL_RUNTIME); + Action action = users[0].startActivity(listener); + action.sync(); + assertFailure(action, 0); + Assert.assertEquals(ActivityState.PERMANENTLY_FAILED, listener.getState()); + } + + @Test public void testStartWhenOneNodeFinishesBeforeOtherNodeStarts() throws Exception { Assert.assertEquals(ActivityState.STOPPED, listener.getState()); listener.onStart(Behavior.SUCCEED); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java index bb85c13..b57d798 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java @@ -39,6 +39,7 @@ private 
JobStatus status; private List<Exception> exceptions; private IResultCallback<Void> callback; + private final StackTraceElement[] creationTrace; public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions, IResultCallback<Void> callback) { @@ -47,6 +48,7 @@ this.status = status; this.exceptions = exceptions; this.callback = callback; + creationTrace = Thread.currentThread().getStackTrace(); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java index 21c1e77..cc46a7d 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java @@ -43,6 +43,7 @@ import org.apache.hyracks.client.dataset.HyracksDataset; import org.apache.hyracks.control.cc.BaseCCApplication; import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.application.CCServiceContext; import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.NodeControllerService; @@ -58,6 +59,7 @@ public abstract class AbstractMultiNCIntegrationTest { private static final Logger LOGGER = Logger.getLogger(AbstractMultiNCIntegrationTest.class.getName()); + private static final TestJobLifecycleListener jobLifecycleListener = new TestJobLifecycleListener(); public static final String[] ASTERIX_IDS = { "asterix-001", "asterix-002", "asterix-003", "asterix-004", "asterix-005", "asterix-006", 
"asterix-007" }; @@ -92,7 +94,8 @@ ccConfig.setAppClass(DummyApplication.class.getName()); cc = new ClusterControllerService(ccConfig); cc.start(); - + CCServiceContext serviceCtx = cc.getContext(); + serviceCtx.addJobLifecycleListener(jobLifecycleListener); asterixNCs = new NodeControllerService[ASTERIX_IDS.length]; for (int i = 0; i < ASTERIX_IDS.length; i++) { File ioDev = new File("target" + File.separator + ASTERIX_IDS[i] + File.separator + "ioDevice"); @@ -121,6 +124,7 @@ nc.stop(); } cc.stop(); + jobLifecycleListener.check(); } protected JobId startJob(JobSpecification spec) throws Exception { diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java new file mode 100644 index 0000000..c8d0b9c --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.hyracks.tests.integration;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+
+/**
+ * Test-only {@link IJobLifecycleListener} that records the creation, start and finish notifications it receives
+ * and, when {@link #check()} is called, logs a warning for every lifecycle anomaly it observed.
+ * <p>
+ * Anomalies are only logged; {@link #check()} does not fail the caller.
+ * <p>
+ * The notification methods and {@link #check()} are synchronized: the listener is registered with the cluster
+ * controller, while {@link #check()} is called from test teardown code, presumably on a different thread.
+ */
+public class TestJobLifecycleListener implements IJobLifecycleListener {
+
+    private static final Logger LOGGER = Logger.getLogger(TestJobLifecycleListener.class.getName());
+    // Job specifications of all jobs reported through notifyJobCreation.
+    private final Map<JobId, JobSpecification> created = new HashMap<>();
+    private final Set<JobId> started = new HashSet<>();
+    private final Set<JobId> finished = new HashSet<>();
+    // Per-job notification counts, populated only once a job is notified a second time (counts start at 2).
+    private final Map<JobId, Integer> doubleCreated = new HashMap<>();
+    private final Map<JobId, Integer> doubleStarted = new HashMap<>();
+    private final Map<JobId, Integer> doubleFinished = new HashMap<>();
+    private final Set<JobId> startWithoutCreate = new HashSet<>();
+    private final Set<JobId> finishWithoutStart = new HashSet<>();
+
+    @Override
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+        if (created.containsKey(jobId)) {
+            LOGGER.log(Level.WARNING, "Job " + jobId + " has been created before");
+            increment(doubleCreated, jobId);
+        }
+        created.put(jobId, spec);
+    }
+
+    @Override
+    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+        if (!created.containsKey(jobId)) {
+            LOGGER.log(Level.WARNING, "Job " + jobId + " has not been created");
+            startWithoutCreate.add(jobId);
+        }
+        // Set.add returns false when the job was already recorded as started.
+        if (!started.add(jobId)) {
+            LOGGER.log(Level.WARNING, "Job " + jobId + " has been started before");
+            increment(doubleStarted, jobId);
+        }
+    }
+
+    @Override
+    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException {
+        if (!started.contains(jobId)) {
+            LOGGER.log(Level.WARNING, "Job " + jobId + " has not been started");
+            finishWithoutStart.add(jobId);
+        }
+        // Set.add returns false when the job was already recorded as finished.
+        if (!finished.add(jobId)) {
+            // TODO: job finish should be notified exactly once, when the job has really completed. Once that
+            // holds, this should fail (e.g. throw a HyracksDataException) instead of only logging.
+            LOGGER.log(Level.WARNING,
+                    "Dangerous: Duplicate Job: " + jobId + " has finished with status: " + jobStatus);
+            increment(doubleFinished, jobId);
+        }
+    }
+
+    /**
+     * Logs a warning for every recorded lifecycle anomaly: jobs created but never started, jobs started but never
+     * finished, jobs started without a creation notification, jobs finished without a start notification, and
+     * jobs whose creation, start or finish was notified more than once. Progress headings are logged at INFO.
+     *
+     * @throws Exception
+     *             never thrown by this implementation; kept so existing callers remain unchanged
+     */
+    public synchronized void check() throws Exception {
+        LOGGER.log(Level.INFO, "Checking all created jobs have started");
+        for (JobId jobId : created.keySet()) {
+            if (!started.contains(jobId)) {
+                LOGGER.log(Level.WARNING, "JobId " + jobId + " has been created but never started");
+            }
+        }
+        LOGGER.log(Level.INFO, "Checking all started jobs have terminated");
+        for (JobId jobId : started) {
+            if (!finished.contains(jobId)) {
+                LOGGER.log(Level.WARNING, "JobId " + jobId + " has started but not finished");
+            }
+        }
+        LOGGER.log(Level.INFO, "Checking jobs started without being created");
+        for (JobId jobId : startWithoutCreate) {
+            LOGGER.log(Level.WARNING, "JobId " + jobId + " has been started without being created");
+        }
+        LOGGER.log(Level.INFO, "Checking jobs finished without being started");
+        for (JobId jobId : finishWithoutStart) {
+            LOGGER.log(Level.WARNING, "JobId " + jobId + " has finished without being started");
+        }
+        reportDuplicates("creates", "created", doubleCreated);
+        reportDuplicates("starts", "started", doubleStarted);
+        reportDuplicates("finishes", "finished", doubleFinished);
+        LOGGER.log(Level.INFO, "Done checking!");
+    }
+
+    /** Records one more notification for a job that has already been notified at least once. */
+    private static void increment(Map<JobId, Integer> counts, JobId jobId) {
+        Integer count = counts.get(jobId);
+        // The first duplicate means the job has now been notified twice, hence the initial value 2.
+        counts.put(jobId, count == null ? 2 : count + 1);
+    }
+
+    /**
+     * Logs an INFO heading "Checking multiple {@code plural}", then one warning per job in {@code counts} stating
+     * how many times it has been {@code pastTense}.
+     */
+    private static void reportDuplicates(String plural, String pastTense, Map<JobId, Integer> counts) {
+        LOGGER.log(Level.INFO, "Checking multiple " + plural);
+        for (Entry<JobId, Integer> entry : counts.entrySet()) {
+            LOGGER.log(Level.WARNING,
+                    "job " + entry.getKey() + " has been " + pastTense + " " + entry.getValue() + " times");
+        }
+    }
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2006
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I53282b04148fd0c24f55d5b87db49d705530b99b
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>