abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1999
Change subject: [NO ISSUE][HYR] Fix wait for completion work
......................................................................

[NO ISSUE][HYR] Fix wait for completion work

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- When wait for completion is called on a job that has not been created yet,
  an exception is returned.
- When wait for completion is called on a job that has been cleared from the
  job archive, it is retrieved correctly from history.
- When wait for completion is called on a job that has been cleared from job
  history, an exception is returned.
- Test cases that fail before the fix have been added.

Change-Id: I9e50f6ce1df9f27517d7ec3a3f8a5d38246f71ff
---
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
10 files changed, 112 insertions(+),
71 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/99/1999/1 diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index ff98efa..cf83bca 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -125,6 +125,8 @@ public static final int GROUP_BY_MEMORY_BUDGET_EXCEEDS = 89; public static final int ILLEGAL_MEMORY_BUDGET = 90; public static final int TIMEOUT = 91; + public static final int JOB_HAS_BEEN_CLEARED_FROM_HISTORY = 92; + public static final int JOB_HAS_NOT_BEEN_CREATED_YET = 93; // Compilation error codes. public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java index dd63786..eea6b52 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java @@ -27,6 +27,10 @@ return new JobId(id.getAndIncrement()); } + public long maxJobId() { + return id.get(); + } + public void ensureMinimumId(long id) { this.id.updateAndGet(current -> Math.max(current, id)); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 6d4ccdb..56abee5 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -108,5 
+108,7 @@ 89 = The byte size of a single group (%1$s bytes) exceeds the budget for a group by operator (%2$s bytes) 90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the minimum (%3$s bytes) 91 = Operation timed out +92 = Job %1$s has been cleared from job history +93 = Job %1$s has not been created yet 10000 = The given rule collection %1$s is not an instance of the List class. diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java index b3e65ad..327c422 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java @@ -60,8 +60,7 @@ } @Override - public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, - Exception exception) { + public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) { HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload; switch (fn.getFunctionId()) { case GET_CLUSTER_CONTROLLER_INFO: @@ -86,7 +85,7 @@ case DISTRIBUTE_JOB: HyracksClientInterfaceFunctions.DistributeJobFunction djf = (HyracksClientInterfaceFunctions.DistributeJobFunction) fn; - ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory.create(), + ccs.getWorkQueue().schedule(new DistributeJobWork(ccs, djf.getACGGFBytes(), jobIdFactory, new IPCResponder<JobId>(handle, mid))); break; case DESTROY_JOB: @@ -104,42 +103,30 @@ case START_JOB: HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn; - JobId jobId = sjf.getJobId(); 
- byte[] acggfBytes = null; - boolean predistributed = false; - if (jobId == null) { - //The job is new - jobId = jobIdFactory.create(); - acggfBytes = sjf.getACGGFBytes(); - } else { - //The job has been predistributed. We don't need to send an ActivityClusterGraph - predistributed = true; - } - ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(), - jobId, new IPCResponder<JobId>(handle, mid), predistributed)); + ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), sjf.getACGGFBytes(), + sjf.getJobFlags(), sjf.getJobId(), new IPCResponder<JobId>(handle, mid), jobIdFactory)); break; case GET_DATASET_DIRECTORY_SERIVICE_INFO: - ccs.getWorkQueue().schedule(new GetDatasetDirectoryServiceInfoWork(ccs, - new IPCResponder<NetworkAddress>(handle, mid))); + ccs.getWorkQueue().schedule( + new GetDatasetDirectoryServiceInfoWork(ccs, new IPCResponder<NetworkAddress>(handle, mid))); break; case GET_DATASET_RESULT_STATUS: HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrsf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn; - ccs.getWorkQueue().schedule(new GetResultStatusWork(ccs, gdrsf.getJobId(), - gdrsf.getResultSetId(), new IPCResponder<Status>(handle, mid))); + ccs.getWorkQueue().schedule(new GetResultStatusWork(ccs, gdrsf.getJobId(), gdrsf.getResultSetId(), + new IPCResponder<Status>(handle, mid))); break; case GET_DATASET_RESULT_LOCATIONS: HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn; - ccs.getWorkQueue().schedule(new GetResultPartitionLocationsWork(ccs, - gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(), - new IPCResponder<>(handle, mid))); + ccs.getWorkQueue().schedule(new GetResultPartitionLocationsWork(ccs, gdrlf.getJobId(), + gdrlf.getResultSetId(), gdrlf.getKnownRecords(), new IPCResponder<>(handle, mid))); break; case WAIT_FOR_COMPLETION: 
HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn; - ccs.getWorkQueue().schedule(new WaitForJobCompletionWork(ccs, wfcf.getJobId(), - new IPCResponder<>(handle, mid))); + ccs.getWorkQueue() + .schedule(new WaitForJobCompletionWork(ccs, wfcf.getJobId(), new IPCResponder<>(handle, mid))); break; case GET_NODE_CONTROLLERS_INFO: ccs.getWorkQueue().schedule( @@ -155,33 +142,33 @@ case CLI_DEPLOY_BINARY: HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn; - ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs, dbf.getBinaryURLs(), - dbf.getDeploymentId(), new IPCResponder<>(handle, mid))); + ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs, dbf.getBinaryURLs(), dbf.getDeploymentId(), + new IPCResponder<>(handle, mid))); break; case CLI_UNDEPLOY_BINARY: HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf = (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn; - ccs.getWorkQueue().schedule(new CliUnDeployBinaryWork(ccs, udbf.getDeploymentId(), - new IPCResponder<>(handle, mid))); + ccs.getWorkQueue().schedule( + new CliUnDeployBinaryWork(ccs, udbf.getDeploymentId(), new IPCResponder<>(handle, mid))); break; case CLUSTER_SHUTDOWN: HyracksClientInterfaceFunctions.ClusterShutdownFunction csf = (HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn; - ccs.getWorkQueue().schedule(new ClusterShutdownWork(ccs, - csf.isTerminateNCService(), new IPCResponder<>(handle, mid))); + ccs.getWorkQueue().schedule( + new ClusterShutdownWork(ccs, csf.isTerminateNCService(), new IPCResponder<>(handle, mid))); break; case GET_NODE_DETAILS_JSON: HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gndjf = (HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn; ccs.getWorkQueue() .schedule(new GetNodeDetailsJSONWork(ccs.getNodeManager(), ccs.getCCConfig(), gndjf.getNodeId(), - 
gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid))); + gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid))); break; case THREAD_DUMP: HyracksClientInterfaceFunctions.ThreadDumpFunction tdf = (HyracksClientInterfaceFunctions.ThreadDumpFunction) fn; - ccs.getWorkQueue().schedule(new GetThreadDumpWork(ccs, tdf.getNode(), - new IPCResponder<String>(handle, mid))); + ccs.getWorkQueue() + .schedule(new GetThreadDumpWork(ccs, tdf.getNode(), new IPCResponder<String>(handle, mid))); break; default: try { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 4ba847d..c6d90a7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -46,8 +46,8 @@ import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue; import org.apache.hyracks.control.cc.scheduler.IJobQueue; import org.apache.hyracks.control.common.controllers.CCConfig; -import org.apache.hyracks.control.common.work.NoOpCallback; import org.apache.hyracks.control.common.work.IResultCallback; +import org.apache.hyracks.control.common.work.NoOpCallback; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -92,7 +92,7 @@ runMapHistory = new LinkedHashMap<JobId, List<Exception>>() { private static final long serialVersionUID = 1L; /** history size + 1 is for the case when history size = 0 */ - private int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1); + private final int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1); @Override protected boolean 
removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) { @@ -277,7 +277,8 @@ @Override public List<Exception> getExceptionHistory(JobId jobId) { - return runMapHistory.get(jobId); + List<Exception> exceptions = runMapHistory.get(jobId); + return exceptions == null ? runMapHistory.containsKey(jobId) ? Collections.emptyList() : null : exceptions; } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java index e5fd66a..5a57b1b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/DistributeJobWork.java @@ -25,6 +25,7 @@ import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobIdFactory; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; @@ -37,12 +38,12 @@ public class DistributeJobWork extends SynchronizableWork { private final ClusterControllerService ccs; private final byte[] acggfBytes; - private final JobId jobId; + private final JobIdFactory jobIdFactory; private final IResultCallback<JobId> callback; - public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, JobId jobId, + public DistributeJobWork(ClusterControllerService ccs, byte[] acggfBytes, JobIdFactory jobIdFactory, IResultCallback<JobId> callback) { - this.jobId = jobId; + this.jobIdFactory = jobIdFactory; this.ccs = ccs; this.acggfBytes = acggfBytes; this.callback = callback; @@ -51,6 
+52,7 @@ @Override protected void doRun() throws Exception { try { + JobId jobId = jobIdFactory.create(); final CCServiceContext ccServiceCtx = ccs.getContext(); ccs.getPreDistributedJobStore().checkForExistingDistributedJobDescriptor(jobId); IActivityClusterGraphGeneratorFactory acggf = diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java index e083d2a..ed82705 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java @@ -25,6 +25,7 @@ import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobIdFactory; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.application.CCServiceContext; import org.apache.hyracks.control.cc.job.IJobManager; @@ -38,19 +39,19 @@ private final byte[] acggfBytes; private final EnumSet<JobFlag> jobFlags; private final DeploymentId deploymentId; - private final JobId jobId; + private final JobId preDistributedJobId; private final IResultCallback<JobId> callback; - private final boolean predestributed; + private final JobIdFactory jobIdFactory; public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes, - EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, boolean predestributed) { + EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback, JobIdFactory jobIdFactory) { this.deploymentId = deploymentId; - this.jobId = jobId; + this.preDistributedJobId = jobId; 
this.ccs = ccs; this.acggfBytes = acggfBytes; this.jobFlags = jobFlags; this.callback = callback; - this.predestributed = predestributed; + this.jobIdFactory = jobIdFactory; } @Override @@ -58,8 +59,10 @@ IJobManager jobManager = ccs.getJobManager(); try { final CCServiceContext ccServiceCtx = ccs.getContext(); + JobId jobId; JobRun run; - if (!predestributed) { + if (preDistributedJobId == null) { + jobId = jobIdFactory.create(); //Need to create the ActivityClusterGraph IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils .deserialize(acggfBytes, deploymentId, ccServiceCtx); @@ -67,6 +70,7 @@ acggf.createActivityClusterGraphGenerator(jobId, ccServiceCtx, jobFlags); run = new JobRun(ccs, deploymentId, jobId, acggf, acgg, jobFlags); } else { + jobId = preDistributedJobId; //ActivityClusterGraph has already been distributed run = new JobRun(ccs, deploymentId, jobId, jobFlags, ccs.getPreDistributedJobStore().getDistributedJobDescriptor(jobId)); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java index 713cf96..f1d9a4d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java @@ -18,12 +18,15 @@ */ package org.apache.hyracks.control.cc.work; +import java.util.Collections; import java.util.List; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.cc.ClusterControllerService; import 
org.apache.hyracks.control.cc.job.IJobManager; -import org.apache.hyracks.control.cc.job.IJobStatusConditionVariable; +import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.common.work.IResultCallback; import org.apache.hyracks.control.common.work.SynchronizableWork; @@ -41,13 +44,13 @@ @Override protected void doRun() throws Exception { IJobManager jobManager = ccs.getJobManager(); - final IJobStatusConditionVariable cRunningVar = jobManager.get(jobId); - if (cRunningVar != null) { + final JobRun jobRun = jobManager.get(jobId); + if (jobRun != null) { ccs.getExecutor().execute(new Runnable() { @Override public void run() { try { - cRunningVar.waitForCompletion(); + jobRun.waitForCompletion(); callback.setValue(null); } catch (Exception e) { callback.setException(e); @@ -55,18 +58,28 @@ } }); } else { - final List<Exception> exceptions = jobManager.getExceptionHistory(jobId); - ccs.getExecutor().execute(new Runnable() { - @Override - public void run() { + // Couldn't find jobRun + List<Exception> exceptionHistory = jobManager.getExceptionHistory(jobId); + List<Exception> exceptions; + if (exceptionHistory == null) { + // couldn't be found + long maxJobId = ccs.getJobIdFactory().maxJobId(); + exceptions = Collections.singletonList(jobId.getId() <= maxJobId + ? 
HyracksDataException.create(ErrorCode.JOB_HAS_BEEN_CLEARED_FROM_HISTORY, jobId) + : HyracksDataException.create(ErrorCode.JOB_HAS_NOT_BEEN_CREATED_YET, jobId)); + + } else { + exceptions = exceptionHistory; + } + ccs.getExecutor().execute(() -> { + if (!exceptions.isEmpty()) { + /** + * only report the first exception because IResultCallback will only throw one exception + * anyway + */ + callback.setException(exceptions.get(0)); + } else { callback.setValue(null); - if (exceptions != null && !exceptions.isEmpty()) { - /** - * only report the first exception because IResultCallback will only throw one exception - * anyway - */ - callback.setException(exceptions.get(0)); - } } }); } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java index 18479e2..21c1e77 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java @@ -62,13 +62,13 @@ public static final String[] ASTERIX_IDS = { "asterix-001", "asterix-002", "asterix-003", "asterix-004", "asterix-005", "asterix-006", "asterix-007" }; - private static ClusterControllerService cc; + protected static ClusterControllerService cc; - private static NodeControllerService[] asterixNCs; + protected static NodeControllerService[] asterixNCs; - private static IHyracksClientConnection hcc; + protected static IHyracksClientConnection hcc; - private final List<File> outputFiles; + protected final List<File> outputFiles; public AbstractMultiNCIntegrationTest() { outputFiles = new ArrayList<>(); 
@@ -82,6 +82,7 @@ ccConfig.setClusterListenAddress("127.0.0.1"); ccConfig.setClusterListenPort(39001); ccConfig.setProfileDumpPeriod(10000); + ccConfig.setJobHistorySize(2); File outDir = new File("target" + File.separator + "ClusterController"); outDir.mkdirs(); File ccRoot = File.createTempFile(AbstractMultiNCIntegrationTest.class.getName(), ".data", outDir); @@ -186,24 +187,30 @@ readSize = reader.read(resultFrame); } } + waitForCompletion(jobId, expectedErrorMessage); + // Waiting a second time should lead to the same behavior + waitForCompletion(jobId, expectedErrorMessage); + dumpOutputFiles(); + } + + protected void waitForCompletion(JobId jobId, String expectedErrorMessage) throws Exception { boolean expectedExceptionThrown = false; try { hcc.waitForCompletion(jobId); - } catch (HyracksDataException hde) { + } catch (Exception e) { if (expectedErrorMessage != null) { - if (hde.toString().contains(expectedErrorMessage)) { + if (e.toString().contains(expectedErrorMessage)) { expectedExceptionThrown = true; } else { - throw hde; + throw e; } } else { - throw hde; + throw e; } } if (expectedErrorMessage != null && !expectedExceptionThrown) { throw new Exception("Expected error (" + expectedErrorMessage + ") was not thrown"); } - dumpOutputFiles(); } private void dumpOutputFiles() { diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java index 13a103e..34b1480 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java @@ -20,6 +20,7 @@ import 
org.apache.hyracks.api.constraints.PartitionConstraintHelper; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; @@ -32,9 +33,27 @@ @Test public void failureOnCreatePushRuntime() throws Exception { - for (int round = 0; round < 100; ++round) { + JobId jobId = new JobId(0); // First job + for (int i = 0; i < 20; i++) { + execTest(); + if (i == 0) { + // passes. read from job archive + waitForCompletion(jobId, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE); + } + } + // passes. read from job history + waitForCompletion(jobId, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE); + for (int i = 0; i < 300; i++) { execTest(); } + // passes. history has been cleared + waitForCompletion(jobId, "has been cleared from job history"); + } + + @Test + public void waitForNonExistingJob() throws Exception { + JobId jobId = new JobId(Long.MAX_VALUE); + waitForCompletion(jobId, "has not been created yet"); } private void execTest() throws Exception { -- To view, visit https://asterix-gerrit.ics.uci.edu/1999 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I9e50f6ce1df9f27517d7ec3a3f8a5d38246f71ff Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>