This is an automated email from the ASF dual-hosted git repository. alsuliman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 406725a226 [ASTERIXDB-3460][*DB][HYR] Do not run jobs while cluster is not ACTIVE 406725a226 is described below commit 406725a2267d31f23a88aa9d0d51abbc9a22c71c Author: Ali Alsuliman <ali.al.solai...@gmail.com> AuthorDate: Wed Jul 17 22:26:07 2024 +0300 [ASTERIXDB-3460][*DB][HYR] Do not run jobs while cluster is not ACTIVE - user model changes: no - storage format changes: no - interface changes: yes Details: Do not run jobs while cluster is not ACTIVE. - Allow certain jobs to run regardless of cluster state. Ext-ref: MB-62635 Change-Id: I9a027e46a9067e18e2fd5de57112f0d15addc702 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18488 Reviewed-by: Murtadha Hubail <mhub...@apache.org> Tested-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../asterix/optimizer/base/AnalysisUtil.java | 2 +- .../asterix/app/active/FeedEventsListener.java | 2 +- .../asterix/app/translator/QueryTranslator.java | 10 +- .../asterix/hyracks/bootstrap/CCApplication.java | 17 ++- .../hyracks/bootstrap/GlobalRecoveryManager.java | 8 -- .../org/apache/asterix/utils/FlushDatasetUtil.java | 2 +- .../org/apache/asterix/utils/RebalanceUtil.java | 10 +- .../org/apache/asterix/common/utils/JobUtils.java | 26 ++++- .../apache/asterix/metadata/entities/Dataset.java | 2 +- .../job/resource/JobCapacityController.java | 16 ++- .../job/resource/JobCapacityControllerTest.java | 35 +++++-- .../hyracks/api/application/ICCApplication.java | 8 ++ .../api/client/IHyracksClientConnection.java | 116 ++++++++++----------- .../api/client/IHyracksClientInterface.java | 4 +- .../apache/hyracks/api/exceptions/ErrorCode.java | 1 + .../java/org/apache/hyracks/api/job/JobFlag.java | 3 +- .../job/resource/DefaultJobCapacityController.java | 6 +- .../api/job/resource/IJobCapacityController.java | 15 ++- .../src/main/resources/errormsg/en.properties | 1 + .../hyracks/control/cc/BaseCCApplication.java | 7 ++ .../apache/hyracks/control/cc/job/JobManager.java | 2 +- .../hyracks/control/cc/scheduler/FIFOJobQueue.java | 3 +- .../hyracks/control/cc/job/JobManagerTest.java | 27 +++-- .../AbstractMultiNCIntegrationTest.java | 4 +- 24 files changed, 210 insertions(+), 117 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java index bb428fa645..d8775c8741 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java @@ -282,7 +282,7 @@ public class AnalysisUtil { JobSpecification jobSpec = compiler.createJob(appCtx, new JobEventListenerFactory(newTxnId, false)); - JobId jobId = JobUtils.runJob(appCtx.getHcc(), jobSpec, true); + JobId jobId = JobUtils.runJobIfActive(appCtx.getHcc(), jobSpec, true); IResultSetReader resultSetReader = appCtx.getResultSet().createReader(jobId, resultSetId); FrameManager frameManager = new FrameManager(queryOptCtx.getPhysicalOptimizationConfig().getFrameSize()); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java index 3a81c099d6..4674f7ee4c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java @@ -107,7 +107,7 @@ public class FeedEventsListener extends ActiveEntityEventsListener { // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs. // We will need to design general exception handling mechanism for feeds. setLocations(jobInfo.getRight()); - return JobUtils.runJob(hcc, feedJob, false); + return JobUtils.runJobIfActive(hcc, feedJob, false); } catch (Exception e) { throw HyracksDataException.create(e); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index ad74d6c4fb..0c736a245b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -302,7 +302,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen protected final APIFramework apiFramework; protected final IRewriterFactory rewriterFactory; protected final ExecutorService executorService; - protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class); + protected final EnumSet<JobFlag> jobFlags = EnumSet.of(JobFlag.ENSURE_RUNNABLE); protected final IMetadataLockManager lockManager; protected final IMetadataLockUtil lockUtil; protected final IResponsePrinter responsePrinter; @@ -2435,7 +2435,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen requestParameters.isForceDropDataset()); } for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + JobUtils.runJobIfActive(hcc, jobSpec, true); } } catch (Exception e2) { // do no throw exception since still the metadata needs to be compensated. @@ -4298,7 +4298,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen private static JobId runTrackJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, String reqId, String clientCtxId, ClientRequest clientRequest) throws Exception { jobSpec.setRequestId(reqId); - JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); + JobId jobId = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, false); LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId, reqId, clientCtxId); clientRequest.setJobId(jobId); return jobId; @@ -5407,12 +5407,12 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen private static void runJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception { - JobUtils.runJob(hcc, jobSpec, jobFlags, true); + JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, true); } private static List<IOperatorStats> runJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, List<String> statOperatorNames) throws Exception { - Pair<JobId, List<IOperatorStats>> p = JobUtils.runJob(hcc, jobSpec, jobFlags, true, statOperatorNames); + Pair<JobId, List<IOperatorStats>> p = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, true, statOperatorNames); return p.second; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 972417443c..0b7e7d0a2c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -22,6 +22,8 @@ package org.apache.asterix.hyracks.bootstrap; import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP; import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.ACTIVE; +import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.REBALANCE_REQUIRED; import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.SHUTTING_DOWN; import static org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT; @@ -35,6 +37,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.ServiceLoader; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import org.apache.asterix.api.http.IQueryWebServerRegistrant; @@ -69,6 +72,7 @@ import org.apache.asterix.common.api.INamespacePathResolver; import org.apache.asterix.common.api.INamespaceResolver; import org.apache.asterix.common.api.INodeJobTracker; import org.apache.asterix.common.api.IReceptionistFactory; +import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.cluster.IGlobalTxManager; import org.apache.asterix.common.config.AsterixExtension; @@ -111,6 +115,7 @@ import org.apache.hyracks.api.config.IConfigManager; import org.apache.hyracks.api.control.IGatekeeper; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IODeviceHandle; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; import org.apache.hyracks.api.result.IJobResultCallback; @@ -212,7 +217,7 @@ public class CCApplication extends BaseCCApplication { ccServiceCtx.addClusterLifecycleListener(nodeJobTracker); ccServiceCtx.addJobLifecycleListener(globalTxManager); - jobCapacityController = new JobCapacityController(controllerService.getResourceManager()); + jobCapacityController = new JobCapacityController(controllerService.getResourceManager(), this); } protected INamespaceResolver createNamespaceResolver(boolean useDatabaseResolution) { @@ -441,4 +446,14 @@ public class CCApplication extends BaseCCApplication { public IJobResultCallback getJobResultCallback() { return new JobResultCallback(appCtx); } + + @Override + public boolean acceptingJobs(Set<JobFlag> flags) { + // flags == null should not be needed since currently it's not null (but not enforced) + if (flags == null || !flags.contains(JobFlag.ENSURE_RUNNABLE)) { + return true; + } + IClusterStateManager csm = appCtx.getClusterStateManager(); + return csm.getState() == ACTIVE || csm.getState() == REBALANCE_REQUIRED; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java index ed9d0c6f4c..08972d450d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java @@ -40,8 +40,6 @@ import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.metadata.utils.DatasetUtil; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.util.ExitUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -75,12 +73,6 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager { return Collections.emptySet(); } - private void executeHyracksJob(JobSpecification spec) throws Exception { - spec.setMaxReattempts(0); - JobId jobId = hcc.startJob(spec); - hcc.waitForCompletion(jobId); - } - @Override public void startGlobalRecovery(ICcApplicationContext appCtx) { if (!recoveryCompleted && !recovering) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java index f38c18b705..1c17704ff6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java @@ -75,7 +75,7 @@ public class FlushDatasetUtil { JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, true); spec.setJobletEventListenerFactory(jobEventListenerFactory); - JobUtils.runJob(hcc, spec, true); + JobUtils.runJobIfActive(hcc, spec, true); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java index 180b6ccd45..cc9f1ada72 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java @@ -305,7 +305,7 @@ public class RebalanceUtil { private static void createRebalanceTarget(Dataset target, MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception { JobSpecification spec = DatasetUtil.createDatasetJobSpec(target, metadataProvider); - JobUtils.runJob(hcc, spec, true); + JobUtils.forceRunJob(hcc, spec, true); } // Populates the data from the source dataset to the rebalance target dataset. @@ -348,7 +348,7 @@ public class RebalanceUtil { spec.connect(new OneToOneConnectorDescriptor(spec), upsertOp, 0, commitOp, 0); // Executes the job. - JobUtils.runJob(hcc, spec, true); + JobUtils.forceRunJob(hcc, spec, true); } private static ITupleProjectorFactory createTupleProjectorFactory(Dataset source, MetadataProvider metadataProvider) @@ -403,7 +403,7 @@ public class RebalanceUtil { EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), null)); } for (JobSpecification jobSpec : jobs) { - JobUtils.runJob(hcc, jobSpec, true); + JobUtils.forceRunJob(hcc, jobSpec, true); } } @@ -427,12 +427,12 @@ public class RebalanceUtil { // Creates the secondary index. JobSpecification indexCreationJobSpec = IndexUtil.buildSecondaryIndexCreationJobSpec(target, index, metadataProvider, null); - JobUtils.runJob(hcc, indexCreationJobSpec, true); + JobUtils.forceRunJob(hcc, indexCreationJobSpec, true); // Loads the secondary index. JobSpecification indexLoadingJobSpec = IndexUtil.buildSecondaryIndexLoadingJobSpec(target, index, metadataProvider, null); - JobUtils.runJob(hcc, indexLoadingJobSpec, true); + JobUtils.forceRunJob(hcc, indexLoadingJobSpec, true); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java index c1c6f18d1f..e3f57f3b4a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java @@ -36,12 +36,28 @@ public class JobUtils { ADDED_PENDINGOP_RECORD_TO_METADATA } - public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion) + public static JobId forceRunJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion) throws Exception { return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class), waitForCompletion); } - public static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags, + public static JobId runJobIfActive(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion) + throws Exception { + return runJob(hcc, spec, EnumSet.of(JobFlag.ENSURE_RUNNABLE), waitForCompletion); + } + + public static JobId runJobIfActive(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags, + boolean waitForCompletion) throws Exception { + if (jobFlags.contains(JobFlag.ENSURE_RUNNABLE)) { + return runJob(hcc, spec, jobFlags, waitForCompletion); + } else { + EnumSet<JobFlag> flags = EnumSet.copyOf(jobFlags); + flags.add(JobFlag.ENSURE_RUNNABLE); + return runJob(hcc, spec, flags, waitForCompletion); + } + } + + private static JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags, boolean waitForCompletion) throws Exception { spec.setMaxReattempts(0); final JobId jobId = hcc.startJob(spec, jobFlags); @@ -57,8 +73,12 @@ public class JobUtils { return jobId; } - public static Pair<JobId, List<IOperatorStats>> runJob(IHyracksClientConnection hcc, JobSpecification spec, + public static Pair<JobId, List<IOperatorStats>> runJobIfActive(IHyracksClientConnection hcc, JobSpecification spec, EnumSet<JobFlag> jobFlags, boolean waitForCompletion, List<String> statOperatorNames) throws Exception { + if (!jobFlags.contains(JobFlag.ENSURE_RUNNABLE)) { + jobFlags = EnumSet.copyOf(jobFlags); + jobFlags.add(JobFlag.ENSURE_RUNNABLE); + } spec.setMaxReattempts(0); final JobId jobId = hcc.startJob(spec, jobFlags); List<IOperatorStats> opStats = null; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index 9aa5d84bba..132efbcded 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -403,7 +403,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset { // #. run the jobs for (JobSpecification jobSpec : jobsToExecute) { - JobUtils.runJob(hcc, jobSpec, true); + JobUtils.runJobIfActive(hcc, jobSpec, true); } mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction()); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java index ae903d1f71..236056ca02 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java @@ -19,8 +19,14 @@ package org.apache.asterix.runtime.job.resource; +import java.util.Set; + +import org.apache.hyracks.api.application.ICCApplication; import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.resource.IClusterCapacity; import org.apache.hyracks.api.job.resource.IJobCapacityController; @@ -36,13 +42,19 @@ public class JobCapacityController implements IJobCapacityController { private static final Logger LOGGER = LogManager.getLogger(); private final IResourceManager resourceManager; + private final ICCApplication ccApp; - public JobCapacityController(IResourceManager resourceManager) { + public JobCapacityController(IResourceManager resourceManager, ICCApplication ccApp) { this.resourceManager = resourceManager; + this.ccApp = ccApp; } @Override - public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException { + public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags) + throws HyracksException { + if (!ccApp.acceptingJobs(jobFlags)) { + throw HyracksDataException.create(ErrorCode.JOB_REJECTED, job); + } IClusterCapacity requiredCapacity = job.getRequiredClusterCapacity(); long reqAggregatedMemoryByteSize = requiredCapacity.getAggregatedMemoryByteSize(); int reqAggregatedNumCores = requiredCapacity.getAggregatedCores(); diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java index 48c61b480e..e70b306919 100644 --- a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java +++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java @@ -22,8 +22,13 @@ package org.apache.asterix.runtime.job.resource; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.EnumSet; + +import org.apache.hyracks.api.application.ICCApplication; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.resource.ClusterCapacity; import org.apache.hyracks.api.job.resource.IClusterCapacity; @@ -36,31 +41,34 @@ import org.junit.Test; public class JobCapacityControllerTest { + private static final EnumSet<JobFlag> none = EnumSet.noneOf(JobFlag.class); + @Test public void test() throws HyracksException { + JobId jobId = new JobId(0); IResourceManager resourceManager = makeResourceManagerWithCapacity(4294967296L, 33); - JobCapacityController capacityController = new JobCapacityController(resourceManager); + JobCapacityController capacityController = new JobCapacityController(resourceManager, makeCCApp()); // Verifies the correctness of the allocate method. - Assert.assertTrue(capacityController.allocate( - makeJobWithRequiredCapacity(4294967296L, 16)) == IJobCapacityController.JobSubmissionStatus.EXECUTE); - Assert.assertTrue(capacityController.allocate( - makeJobWithRequiredCapacity(2147483648L, 16)) == IJobCapacityController.JobSubmissionStatus.QUEUE); - Assert.assertTrue(capacityController.allocate( - makeJobWithRequiredCapacity(2147483648L, 32)) == IJobCapacityController.JobSubmissionStatus.QUEUE); + Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(4294967296L, 16), jobId, + none) == IJobCapacityController.JobSubmissionStatus.EXECUTE); + Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 16), jobId, + none) == IJobCapacityController.JobSubmissionStatus.QUEUE); + Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 32), jobId, + none) == IJobCapacityController.JobSubmissionStatus.QUEUE); boolean exceedCapacity = false; try { - capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 64)); + capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 64), jobId, none); } catch (HyracksException e) { exceedCapacity = e.matches(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY); } Assert.assertTrue(exceedCapacity); - Assert.assertTrue(capacityController.allocate( - makeJobWithRequiredCapacity(4294967296L, 32)) == IJobCapacityController.JobSubmissionStatus.QUEUE); + Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(4294967296L, 32), jobId, + none) == IJobCapacityController.JobSubmissionStatus.QUEUE); exceedCapacity = false; try { - capacityController.allocate(makeJobWithRequiredCapacity(4294967297L, 33)); + capacityController.allocate(makeJobWithRequiredCapacity(4294967297L, 33), jobId, none); } catch (HyracksException e) { exceedCapacity = e.matches(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY); } @@ -95,4 +103,9 @@ public class JobCapacityControllerTest { return clusterCapacity; } + private ICCApplication makeCCApp() { + ICCApplication ccApp = mock(ICCApplication.class); + when(ccApp.acceptingJobs(none)).thenReturn(true); + return ccApp; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java index 5cc8f69085..6c2238640b 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java @@ -18,8 +18,11 @@ */ package org.apache.hyracks.api.application; +import java.util.Set; + import org.apache.hyracks.api.config.IConfigManager; import org.apache.hyracks.api.control.IGatekeeper; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.result.IJobResultCallback; @@ -37,4 +40,9 @@ public interface ICCApplication extends IApplication { * @return the job result callback */ IJobResultCallback getJobResultCallback(); + + /** + * @return true if the application is accepting jobs. False, otherwise. + */ + boolean acceptingJobs(Set<JobFlag> jobFlags); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java index be965ebc2f..66f4fab779 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java @@ -89,55 +89,93 @@ public interface IHyracksClientConnection extends IClusterInfoCollector { JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception; /** - * Distribute the specified Job. + * Used to run a deployed Job Spec by id * + * @param deployedJobSpecId + * The id of the deployed job spec + * @param jobParameters + * The serialized job parameters + * @throws Exception + */ + JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception; + + /** + * Start the specified Job. + * + * @param acggf + * Activity Cluster Graph Generator Factory + * @param jobFlags + * Flags + * @throws Exception + */ + JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception; + + /** + * Start the specified Job. + * + * @param deploymentId + * the id of the specific deployment * @param jobSpec * Job Specification * @throws Exception */ - DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception; + JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception; /** - * Update the JobSpec for a deployed job. + * Start the specified Job. * - * @param deployedJobSpecId - * The id of the deployed job spec + * @param deploymentId + * the id of the specific deployment * @param jobSpec * Job Specification + * @param jobFlags + * Flags * @throws Exception */ - void redeployJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification jobSpec) throws Exception; + JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception; /** - * Remove the deployed Job Spec + * Start the specified Job. * - * @param deployedJobSpecId - * The id of the deployed job spec + * @param deploymentId + * the id of the specific deployment + * @param acggf + * Activity Cluster Graph Generator Factory + * @param jobFlags + * Flags * @throws Exception */ - void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception; + JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) + throws Exception; /** - * Used to run a deployed Job Spec by id + * Distribute the specified Job. + * + * @param jobSpec + * Job Specification + * @throws Exception + */ + DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception; + + /** + * Update the JobSpec for a deployed job. * * @param deployedJobSpecId * The id of the deployed job spec - * @param jobParameters - * The serialized job parameters + * @param jobSpec + * Job Specification * @throws Exception */ - JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception; + void redeployJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification jobSpec) throws Exception; /** - * Start the specified Job. + * Remove the deployed Job Spec * - * @param acggf - * Activity Cluster Graph Generator Factory - * @param jobFlags - * Flags + * @param deployedJobSpecId + * The id of the deployed job spec * @throws Exception */ - JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception; + void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception; /** * Gets the IP Address and port for the ResultDirectoryService wrapped in NetworkAddress @@ -195,44 +233,6 @@ public interface IHyracksClientConnection extends IClusterInfoCollector { */ void unDeployBinary(DeploymentId deploymentId) throws Exception; - /** - * Start the specified Job. - * - * @param deploymentId - * the id of the specific deployment - * @param jobSpec - * Job Specification - * @throws Exception - */ - JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception; - - /** - * Start the specified Job. - * - * @param deploymentId - * the id of the specific deployment - * @param jobSpec - * Job Specification - * @param jobFlags - * Flags - * @throws Exception - */ - JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception; - - /** - * Start the specified Job. - * - * @param deploymentId - * the id of the specific deployment - * @param acggf - * Activity Cluster Graph Generator Factory - * @param jobFlags - * Flags - * @throws Exception - */ - JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) - throws Exception; - /** * Shuts down all NCs and then the CC. * diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java index be470146f6..f63e0ea129 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java @@ -42,6 +42,8 @@ public interface IHyracksClientInterface { public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception; + public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception; + public void cancelJob(JobId jobId) throws Exception; public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception; @@ -64,8 +66,6 @@ public interface IHyracksClientInterface { public void unDeployBinary(DeploymentId deploymentId) throws Exception; - public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception; - public JobInfo getJobInfo(JobId jobId) throws Exception; public void stopCluster(boolean terminateNCService) throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index e46c0ef490..766fb26e57 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -156,6 +156,7 @@ public enum ErrorCode implements IError { ILLEGAL_STATE(126), INVALID_STRING_UNICODE(127), UNSUPPORTED_WRITE_SPEC(128), + JOB_REJECTED(129), // Compilation error codes. RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000), diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java index 7225cd4964..93848a1ff3 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java @@ -20,5 +20,6 @@ package org.apache.hyracks.api.job; public enum JobFlag { PROFILE_RUNTIME, - ENFORCE_CONTRACT + ENFORCE_CONTRACT, + ENSURE_RUNNABLE } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java index b18bcb10ee..0bfdb370ca 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java @@ -19,6 +19,10 @@ package org.apache.hyracks.api.job.resource; +import java.util.Set; + +import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; public class DefaultJobCapacityController implements IJobCapacityController { @@ -34,7 +38,7 @@ public class DefaultJobCapacityController implements IJobCapacityController { } @Override - public JobSubmissionStatus allocate(JobSpecification job) { + public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags) { return JobSubmissionStatus.EXECUTE; } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java index f88baa2ee9..f2c03f6167 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java @@ -19,7 +19,11 @@ package org.apache.hyracks.api.job.resource; +import java.util.Set; + import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; /** @@ -41,13 +45,14 @@ public interface IJobCapacityController { * Allocates required cluster capacity for a job. * * @param job, - * the job specification. - * @return EXECUTE, if the job can be executed immediately; - * QUEUE, if the job cannot be executed + * the job specification. + * @param jobId + * the job id. + * @return EXECUTE, if the job can be executed immediately; QUEUE, if the job cannot be executed * @throws HyracksException - * if the job's capacity requirement exceeds the maximum capacity of the cluster. + * if the job's capacity requirement exceeds the maximum capacity of the cluster. */ - JobSubmissionStatus allocate(JobSpecification job) throws HyracksException; + JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags) throws HyracksException; /** * Releases cluster capacity for a job when it completes. diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index b3c2d7b02d..fa52bc605e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -146,6 +146,7 @@ 126 = Illegal state. %1$s 127 = Decoding error - %1$s 128 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s +129 = Job %1$s not run. Cluster is not accepting jobs 10000 = The given rule collection %1$s is not an instance of the List class. 10001 = Cannot compose partition constraint %1$s with %2$s diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java index efd42c9c59..22e4bbafa9 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java @@ -19,6 +19,7 @@ package org.apache.hyracks.control.cc; import java.util.Arrays; +import java.util.Set; import org.apache.hyracks.api.application.ICCApplication; import org.apache.hyracks.api.application.ICCServiceContext; @@ -26,6 +27,7 @@ import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.config.IConfigManager; import org.apache.hyracks.api.config.Section; import org.apache.hyracks.api.control.IGatekeeper; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.resource.DefaultJobCapacityController; import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.result.IJobResultCallback; @@ -125,4 +127,9 @@ public class BaseCCApplication implements ICCApplication { // no op }; } + + @Override + public boolean acceptingJobs(Set<JobFlag> flags) { + return true; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index be3daae894..7c7111f490 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -123,7 +123,7 @@ public class JobManager implements IJobManager { JobSpecification job = jobRun.getJobSpecification(); IJobCapacityController.JobSubmissionStatus status; try { - status = jobCapacityController.allocate(job); + status = jobCapacityController.allocate(job, jobRun.getJobId(), jobRun.getFlags()); CCServiceContext serviceCtx = ccs.getContext(); serviceCtx.notifyJobCreation(jobRun.getJobId(), job, status); switch (status) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java index 38277c2af7..d0038535bb 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java @@ -90,7 +90,8 @@ public class FIFOJobQueue implements IJobQueue { // Cluster maximum capacity can change over time, thus we have to re-check if the job should be rejected // or not. try { - IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job); + IJobCapacityController.JobSubmissionStatus status = + jobCapacityController.allocate(job, run.getJobId(), run.getFlags()); // Checks if the job can be executed immediately. if (status == IJobCapacityController.JobSubmissionStatus.EXECUTE) { jobRuns.add(run); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java index 19340d0928..3e87bb3cf1 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java @@ -30,12 +30,14 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Set; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; @@ -57,6 +59,8 @@ import org.mockito.Mockito; public class JobManagerTest { + private static final EnumSet<JobFlag> none = EnumSet.noneOf(JobFlag.class); + private CCConfig ccConfig; @Before @@ -77,7 +81,8 @@ public class JobManagerTest { JobRun run = mockJobRun(id); JobSpecification job = mock(JobSpecification.class); when(run.getJobSpecification()).thenReturn(job); - when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); + when(jobCapacityController.allocate(job, run.getJobId(), none)) + .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); // Submits the job. acceptedRuns.add(run); @@ -93,7 +98,8 @@ public class JobManagerTest { JobRun run = mockJobRun(id); JobSpecification job = mock(JobSpecification.class); when(run.getJobSpecification()).thenReturn(job); - when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE) + when(jobCapacityController.allocate(job, run.getJobId(), none)) + .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE) .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); // Submits the job. @@ -109,7 +115,8 @@ public class JobManagerTest { JobRun run = mockJobRun(8193); JobSpecification job = mock(JobSpecification.class); when(run.getJobSpecification()).thenReturn(job); - when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE) + when(jobCapacityController.allocate(job, run.getJobId(), none)) + .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE) .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); jobManager.add(run); } catch (HyracksException e) { @@ -149,7 +156,7 @@ public class JobManagerTest { JobRun run = mockJobRun(1); JobSpecification job = mock(JobSpecification.class); when(run.getJobSpecification()).thenReturn(job); - when(jobCapacityController.allocate(job)) + when(jobCapacityController.allocate(job, run.getJobId(), none)) .thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0")); jobManager.add(run); } catch (HyracksException e) { @@ -172,14 +179,16 @@ public class JobManagerTest { JobRun run1 = mockJobRun(1); JobSpecification job1 = mock(JobSpecification.class); when(run1.getJobSpecification()).thenReturn(job1); - when(jobCapacityController.allocate(job1)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); + when(jobCapacityController.allocate(job1, run1.getJobId(), none)) + .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); jobManager.add(run1); // A failure run. JobRun run2 = mockJobRun(2); JobSpecification job2 = mock(JobSpecification.class); when(run2.getJobSpecification()).thenReturn(job2); - when(jobCapacityController.allocate(job2)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE) + when(jobCapacityController.allocate(job2, run2.getJobId(), none)) + .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE) .thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0")); jobManager.add(run2); @@ -220,7 +229,8 @@ public class JobManagerTest { JobRun run = mockJobRun(id); JobSpecification job = mock(JobSpecification.class); when(run.getJobSpecification()).thenReturn(job); - when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); + when(jobCapacityController.allocate(job, run.getJobId(), none)) + .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); // Submits the job. acceptedRuns.add(run); @@ -236,7 +246,8 @@ public class JobManagerTest { JobRun run = mockJobRun(id); JobSpecification job = mock(JobSpecification.class); when(run.getJobSpecification()).thenReturn(job); - when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE) + when(jobCapacityController.allocate(job, run.getJobId(), none)) + .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE) .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE); // Submits the job. diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java index 7a75a0fcca..97e15337b9 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import java.util.Set; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.client.IHyracksClientConnection; @@ -247,7 +248,8 @@ public abstract class AbstractMultiNCIntegrationTest { private long maxRAM = Runtime.getRuntime().maxMemory(); @Override - public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException { + public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags) + throws HyracksException { return maxRAM > job.getRequiredClusterCapacity().getAggregatedMemoryByteSize() ? JobSubmissionStatus.EXECUTE : JobSubmissionStatus.QUEUE; }