This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 406725a226 [ASTERIXDB-3460][*DB][HYR] Do not run jobs while cluster is not ACTIVE
406725a226 is described below

commit 406725a2267d31f23a88aa9d0d51abbc9a22c71c
Author: Ali Alsuliman <ali.al.solai...@gmail.com>
AuthorDate: Wed Jul 17 22:26:07 2024 +0300

    [ASTERIXDB-3460][*DB][HYR] Do not run jobs while cluster is not ACTIVE
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    Do not run jobs while cluster is not ACTIVE.
    
    - Allow certain jobs to run regardless of cluster state.
    
    Ext-ref: MB-62635
    Change-Id: I9a027e46a9067e18e2fd5de57112f0d15addc702
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18488
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
    Tested-by: Ali Alsuliman <ali.al.solai...@gmail.com>
---
 .../asterix/optimizer/base/AnalysisUtil.java       |   2 +-
 .../asterix/app/active/FeedEventsListener.java     |   2 +-
 .../asterix/app/translator/QueryTranslator.java    |  10 +-
 .../asterix/hyracks/bootstrap/CCApplication.java   |  17 ++-
 .../hyracks/bootstrap/GlobalRecoveryManager.java   |   8 --
 .../org/apache/asterix/utils/FlushDatasetUtil.java |   2 +-
 .../org/apache/asterix/utils/RebalanceUtil.java    |  10 +-
 .../org/apache/asterix/common/utils/JobUtils.java  |  26 ++++-
 .../apache/asterix/metadata/entities/Dataset.java  |   2 +-
 .../job/resource/JobCapacityController.java        |  16 ++-
 .../job/resource/JobCapacityControllerTest.java    |  35 +++++--
 .../hyracks/api/application/ICCApplication.java    |   8 ++
 .../api/client/IHyracksClientConnection.java       | 116 ++++++++++-----------
 .../api/client/IHyracksClientInterface.java        |   4 +-
 .../apache/hyracks/api/exceptions/ErrorCode.java   |   1 +
 .../java/org/apache/hyracks/api/job/JobFlag.java   |   3 +-
 .../job/resource/DefaultJobCapacityController.java |   6 +-
 .../api/job/resource/IJobCapacityController.java   |  15 ++-
 .../src/main/resources/errormsg/en.properties      |   1 +
 .../hyracks/control/cc/BaseCCApplication.java      |   7 ++
 .../apache/hyracks/control/cc/job/JobManager.java  |   2 +-
 .../hyracks/control/cc/scheduler/FIFOJobQueue.java |   3 +-
 .../hyracks/control/cc/job/JobManagerTest.java     |  27 +++--
 .../AbstractMultiNCIntegrationTest.java            |   4 +-
 24 files changed, 210 insertions(+), 117 deletions(-)

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index bb428fa645..d8775c8741 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -282,7 +282,7 @@ public class AnalysisUtil {
 
             JobSpecification jobSpec = compiler.createJob(appCtx, new 
JobEventListenerFactory(newTxnId, false));
 
-            JobId jobId = JobUtils.runJob(appCtx.getHcc(), jobSpec, true);
+            JobId jobId = JobUtils.runJobIfActive(appCtx.getHcc(), jobSpec, 
true);
 
             IResultSetReader resultSetReader = 
appCtx.getResultSet().createReader(jobId, resultSetId);
             FrameManager frameManager = new 
FrameManager(queryOptCtx.getPhysicalOptimizationConfig().getFrameSize());
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 3a81c099d6..4674f7ee4c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -107,7 +107,7 @@ public class FeedEventsListener extends 
ActiveEntityEventsListener {
             // TODO(Yingyi): currently we do not check IFrameWriter protocol 
violations for Feed jobs.
             // We will need to design general exception handling mechanism for 
feeds.
             setLocations(jobInfo.getRight());
-            return JobUtils.runJob(hcc, feedJob, false);
+            return JobUtils.runJobIfActive(hcc, feedJob, false);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index ad74d6c4fb..0c736a245b 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -302,7 +302,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
     protected final APIFramework apiFramework;
     protected final IRewriterFactory rewriterFactory;
     protected final ExecutorService executorService;
-    protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class);
+    protected final EnumSet<JobFlag> jobFlags = 
EnumSet.of(JobFlag.ENSURE_RUNNABLE);
     protected final IMetadataLockManager lockManager;
     protected final IMetadataLockUtil lockUtil;
     protected final IResponsePrinter responsePrinter;
@@ -2435,7 +2435,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                                 requestParameters.isForceDropDataset());
                     }
                     for (JobSpecification jobSpec : jobsToExecute) {
-                        JobUtils.runJob(hcc, jobSpec, true);
+                        JobUtils.runJobIfActive(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
                     // do no throw exception since still the metadata needs to 
be compensated.
@@ -4298,7 +4298,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
     private static JobId runTrackJob(IHyracksClientConnection hcc, 
JobSpecification jobSpec, EnumSet<JobFlag> jobFlags,
             String reqId, String clientCtxId, ClientRequest clientRequest) 
throws Exception {
         jobSpec.setRequestId(reqId);
-        JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+        JobId jobId = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, false);
         LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId, 
reqId, clientCtxId);
         clientRequest.setJobId(jobId);
         return jobId;
@@ -5407,12 +5407,12 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
     private static void runJob(IHyracksClientConnection hcc, JobSpecification 
jobSpec, EnumSet<JobFlag> jobFlags)
             throws Exception {
-        JobUtils.runJob(hcc, jobSpec, jobFlags, true);
+        JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, true);
     }
 
     private static List<IOperatorStats> runJob(IHyracksClientConnection hcc, 
JobSpecification jobSpec,
             EnumSet<JobFlag> jobFlags, List<String> statOperatorNames) throws 
Exception {
-        Pair<JobId, List<IOperatorStats>> p = JobUtils.runJob(hcc, jobSpec, 
jobFlags, true, statOperatorNames);
+        Pair<JobId, List<IOperatorStats>> p = JobUtils.runJobIfActive(hcc, 
jobSpec, jobFlags, true, statOperatorNames);
         return p.second;
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 972417443c..0b7e7d0a2c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -22,6 +22,8 @@ package org.apache.asterix.hyracks.bootstrap;
 import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP;
 import static 
org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
 import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static 
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.ACTIVE;
+import static 
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.REBALANCE_REQUIRED;
 import static 
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.SHUTTING_DOWN;
 import static 
org.apache.hyracks.control.common.controllers.ControllerConfig.Option.CLOUD_DEPLOYMENT;
 
@@ -35,6 +37,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.asterix.api.http.IQueryWebServerRegistrant;
@@ -69,6 +72,7 @@ import org.apache.asterix.common.api.INamespacePathResolver;
 import org.apache.asterix.common.api.INamespaceResolver;
 import org.apache.asterix.common.api.INodeJobTracker;
 import org.apache.asterix.common.api.IReceptionistFactory;
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.cluster.IGlobalTxManager;
 import org.apache.asterix.common.config.AsterixExtension;
@@ -111,6 +115,7 @@ import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.control.IGatekeeper;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.result.IJobResultCallback;
@@ -212,7 +217,7 @@ public class CCApplication extends BaseCCApplication {
         ccServiceCtx.addClusterLifecycleListener(nodeJobTracker);
         ccServiceCtx.addJobLifecycleListener(globalTxManager);
 
-        jobCapacityController = new 
JobCapacityController(controllerService.getResourceManager());
+        jobCapacityController = new 
JobCapacityController(controllerService.getResourceManager(), this);
     }
 
     protected INamespaceResolver createNamespaceResolver(boolean 
useDatabaseResolution) {
@@ -441,4 +446,14 @@ public class CCApplication extends BaseCCApplication {
     public IJobResultCallback getJobResultCallback() {
         return new JobResultCallback(appCtx);
     }
+
+    @Override
+    public boolean acceptingJobs(Set<JobFlag> flags) {
+        // flags == null should not be needed since currently it's not null 
(but not enforced)
+        if (flags == null || !flags.contains(JobFlag.ENSURE_RUNNABLE)) {
+            return true;
+        }
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+        return csm.getState() == ACTIVE || csm.getState() == 
REBALANCE_REQUIRED;
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index ed9d0c6f4c..08972d450d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -40,8 +40,6 @@ import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -75,12 +73,6 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryManager {
         return Collections.emptySet();
     }
 
-    private void executeHyracksJob(JobSpecification spec) throws Exception {
-        spec.setMaxReattempts(0);
-        JobId jobId = hcc.startJob(spec);
-        hcc.waitForCompletion(jobId);
-    }
-
     @Override
     public void startGlobalRecovery(ICcApplicationContext appCtx) {
         if (!recoveryCompleted && !recovering) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index f38c18b705..1c17704ff6 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -75,7 +75,7 @@ public class FlushDatasetUtil {
 
         JobEventListenerFactory jobEventListenerFactory = new 
JobEventListenerFactory(txnId, true);
         spec.setJobletEventListenerFactory(jobEventListenerFactory);
-        JobUtils.runJob(hcc, spec, true);
+        JobUtils.runJobIfActive(hcc, spec, true);
     }
 
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 180b6ccd45..cc9f1ada72 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -305,7 +305,7 @@ public class RebalanceUtil {
     private static void createRebalanceTarget(Dataset target, MetadataProvider 
metadataProvider,
             IHyracksClientConnection hcc) throws Exception {
         JobSpecification spec = DatasetUtil.createDatasetJobSpec(target, 
metadataProvider);
-        JobUtils.runJob(hcc, spec, true);
+        JobUtils.forceRunJob(hcc, spec, true);
     }
 
     // Populates the data from the source dataset to the rebalance target 
dataset.
@@ -348,7 +348,7 @@ public class RebalanceUtil {
         spec.connect(new OneToOneConnectorDescriptor(spec), upsertOp, 0, 
commitOp, 0);
 
         // Executes the job.
-        JobUtils.runJob(hcc, spec, true);
+        JobUtils.forceRunJob(hcc, spec, true);
     }
 
     private static ITupleProjectorFactory createTupleProjectorFactory(Dataset 
source, MetadataProvider metadataProvider)
@@ -403,7 +403,7 @@ public class RebalanceUtil {
                     EnumSet.of(DropOption.IF_EXISTS, 
DropOption.WAIT_ON_IN_USE), null));
         }
         for (JobSpecification jobSpec : jobs) {
-            JobUtils.runJob(hcc, jobSpec, true);
+            JobUtils.forceRunJob(hcc, jobSpec, true);
         }
     }
 
@@ -427,12 +427,12 @@ public class RebalanceUtil {
             // Creates the secondary index.
             JobSpecification indexCreationJobSpec =
                     IndexUtil.buildSecondaryIndexCreationJobSpec(target, 
index, metadataProvider, null);
-            JobUtils.runJob(hcc, indexCreationJobSpec, true);
+            JobUtils.forceRunJob(hcc, indexCreationJobSpec, true);
 
             // Loads the secondary index.
             JobSpecification indexLoadingJobSpec =
                     IndexUtil.buildSecondaryIndexLoadingJobSpec(target, index, 
metadataProvider, null);
-            JobUtils.runJob(hcc, indexLoadingJobSpec, true);
+            JobUtils.forceRunJob(hcc, indexLoadingJobSpec, true);
         }
     }
 
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
index c1c6f18d1f..e3f57f3b4a 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/JobUtils.java
@@ -36,12 +36,28 @@ public class JobUtils {
         ADDED_PENDINGOP_RECORD_TO_METADATA
     }
 
-    public static JobId runJob(IHyracksClientConnection hcc, JobSpecification 
spec, boolean waitForCompletion)
+    public static JobId forceRunJob(IHyracksClientConnection hcc, 
JobSpecification spec, boolean waitForCompletion)
             throws Exception {
         return runJob(hcc, spec, EnumSet.noneOf(JobFlag.class), 
waitForCompletion);
     }
 
-    public static JobId runJob(IHyracksClientConnection hcc, JobSpecification 
spec, EnumSet<JobFlag> jobFlags,
+    public static JobId runJobIfActive(IHyracksClientConnection hcc, 
JobSpecification spec, boolean waitForCompletion)
+            throws Exception {
+        return runJob(hcc, spec, EnumSet.of(JobFlag.ENSURE_RUNNABLE), 
waitForCompletion);
+    }
+
+    public static JobId runJobIfActive(IHyracksClientConnection hcc, 
JobSpecification spec, EnumSet<JobFlag> jobFlags,
+            boolean waitForCompletion) throws Exception {
+        if (jobFlags.contains(JobFlag.ENSURE_RUNNABLE)) {
+            return runJob(hcc, spec, jobFlags, waitForCompletion);
+        } else {
+            EnumSet<JobFlag> flags = EnumSet.copyOf(jobFlags);
+            flags.add(JobFlag.ENSURE_RUNNABLE);
+            return runJob(hcc, spec, flags, waitForCompletion);
+        }
+    }
+
+    private static JobId runJob(IHyracksClientConnection hcc, JobSpecification 
spec, EnumSet<JobFlag> jobFlags,
             boolean waitForCompletion) throws Exception {
         spec.setMaxReattempts(0);
         final JobId jobId = hcc.startJob(spec, jobFlags);
@@ -57,8 +73,12 @@ public class JobUtils {
         return jobId;
     }
 
-    public static Pair<JobId, List<IOperatorStats>> 
runJob(IHyracksClientConnection hcc, JobSpecification spec,
+    public static Pair<JobId, List<IOperatorStats>> 
runJobIfActive(IHyracksClientConnection hcc, JobSpecification spec,
             EnumSet<JobFlag> jobFlags, boolean waitForCompletion, List<String> 
statOperatorNames) throws Exception {
+        if (!jobFlags.contains(JobFlag.ENSURE_RUNNABLE)) {
+            jobFlags = EnumSet.copyOf(jobFlags);
+            jobFlags.add(JobFlag.ENSURE_RUNNABLE);
+        }
         spec.setMaxReattempts(0);
         final JobId jobId = hcc.startJob(spec, jobFlags);
         List<IOperatorStats> opStats = null;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 9aa5d84bba..132efbcded 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -403,7 +403,7 @@ public class Dataset implements IMetadataEntity<Dataset>, 
IDataset {
 
             // #. run the jobs
             for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
+                JobUtils.runJobIfActive(hcc, jobSpec, true);
             }
 
             mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index ae903d1f71..236056ca02 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -19,8 +19,14 @@
 
 package org.apache.asterix.runtime.job.resource;
 
+import java.util.Set;
+
+import org.apache.hyracks.api.application.ICCApplication;
 import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
@@ -36,13 +42,19 @@ public class JobCapacityController implements 
IJobCapacityController {
 
     private static final Logger LOGGER = LogManager.getLogger();
     private final IResourceManager resourceManager;
+    private final ICCApplication ccApp;
 
-    public JobCapacityController(IResourceManager resourceManager) {
+    public JobCapacityController(IResourceManager resourceManager, 
ICCApplication ccApp) {
         this.resourceManager = resourceManager;
+        this.ccApp = ccApp;
     }
 
     @Override
-    public JobSubmissionStatus allocate(JobSpecification job) throws 
HyracksException {
+    public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, 
Set<JobFlag> jobFlags)
+            throws HyracksException {
+        if (!ccApp.acceptingJobs(jobFlags)) {
+            throw HyracksDataException.create(ErrorCode.JOB_REJECTED, job);
+        }
         IClusterCapacity requiredCapacity = job.getRequiredClusterCapacity();
         long reqAggregatedMemoryByteSize = 
requiredCapacity.getAggregatedMemoryByteSize();
         int reqAggregatedNumCores = requiredCapacity.getAggregatedCores();
diff --git 
a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
 
b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
index 48c61b480e..e70b306919 100644
--- 
a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
+++ 
b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/resource/JobCapacityControllerTest.java
@@ -22,8 +22,13 @@ package org.apache.asterix.runtime.job.resource;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.EnumSet;
+
+import org.apache.hyracks.api.application.ICCApplication;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.resource.ClusterCapacity;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
@@ -36,31 +41,34 @@ import org.junit.Test;
 
 public class JobCapacityControllerTest {
 
+    private static final EnumSet<JobFlag> none = EnumSet.noneOf(JobFlag.class);
+
     @Test
     public void test() throws HyracksException {
+        JobId jobId = new JobId(0);
         IResourceManager resourceManager = 
makeResourceManagerWithCapacity(4294967296L, 33);
-        JobCapacityController capacityController = new 
JobCapacityController(resourceManager);
+        JobCapacityController capacityController = new 
JobCapacityController(resourceManager, makeCCApp());
 
         // Verifies the correctness of the allocate method.
-        Assert.assertTrue(capacityController.allocate(
-                makeJobWithRequiredCapacity(4294967296L, 16)) == 
IJobCapacityController.JobSubmissionStatus.EXECUTE);
-        Assert.assertTrue(capacityController.allocate(
-                makeJobWithRequiredCapacity(2147483648L, 16)) == 
IJobCapacityController.JobSubmissionStatus.QUEUE);
-        Assert.assertTrue(capacityController.allocate(
-                makeJobWithRequiredCapacity(2147483648L, 32)) == 
IJobCapacityController.JobSubmissionStatus.QUEUE);
+        
Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(4294967296L,
 16), jobId,
+                none) == IJobCapacityController.JobSubmissionStatus.EXECUTE);
+        
Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(2147483648L,
 16), jobId,
+                none) == IJobCapacityController.JobSubmissionStatus.QUEUE);
+        
Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(2147483648L,
 32), jobId,
+                none) == IJobCapacityController.JobSubmissionStatus.QUEUE);
 
         boolean exceedCapacity = false;
         try {
-            
capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 64));
+            
capacityController.allocate(makeJobWithRequiredCapacity(2147483648L, 64), 
jobId, none);
         } catch (HyracksException e) {
             exceedCapacity = 
e.matches(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY);
         }
         Assert.assertTrue(exceedCapacity);
-        Assert.assertTrue(capacityController.allocate(
-                makeJobWithRequiredCapacity(4294967296L, 32)) == 
IJobCapacityController.JobSubmissionStatus.QUEUE);
+        
Assert.assertTrue(capacityController.allocate(makeJobWithRequiredCapacity(4294967296L,
 32), jobId,
+                none) == IJobCapacityController.JobSubmissionStatus.QUEUE);
         exceedCapacity = false;
         try {
-            
capacityController.allocate(makeJobWithRequiredCapacity(4294967297L, 33));
+            
capacityController.allocate(makeJobWithRequiredCapacity(4294967297L, 33), 
jobId, none);
         } catch (HyracksException e) {
             exceedCapacity = 
e.matches(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY);
         }
@@ -95,4 +103,9 @@ public class JobCapacityControllerTest {
         return clusterCapacity;
     }
 
+    private ICCApplication makeCCApp() {
+        ICCApplication ccApp = mock(ICCApplication.class);
+        when(ccApp.acceptingJobs(none)).thenReturn(true);
+        return ccApp;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
index 5cc8f69085..6c2238640b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplication.java
@@ -18,8 +18,11 @@
  */
 package org.apache.hyracks.api.application;
 
+import java.util.Set;
+
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.control.IGatekeeper;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.result.IJobResultCallback;
 
@@ -37,4 +40,9 @@ public interface ICCApplication extends IApplication {
      * @return the job result callback
      */
     IJobResultCallback getJobResultCallback();
+
+    /**
+     * @return true if the application is accepting jobs. False, otherwise.
+     */
+    boolean acceptingJobs(Set<JobFlag> jobFlags);
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index be965ebc2f..66f4fab779 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -89,55 +89,93 @@ public interface IHyracksClientConnection extends 
IClusterInfoCollector {
     JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws 
Exception;
 
     /**
-     * Distribute the specified Job.
+     * Used to run a deployed Job Spec by id
      *
+     * @param deployedJobSpecId
+     *            The id of the deployed job spec
+     * @param jobParameters
+     *            The serialized job parameters
+     * @throws Exception
+     */
+    JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> 
jobParameters) throws Exception;
+
+    /**
+     * Start the specified Job.
+     *
+     * @param acggf
+     *            Activity Cluster Graph Generator Factory
+     * @param jobFlags
+     *            Flags
+     * @throws Exception
+     */
+    JobId startJob(IActivityClusterGraphGeneratorFactory acggf, 
EnumSet<JobFlag> jobFlags) throws Exception;
+
+    /**
+     * Start the specified Job.
+     *
+     * @param deploymentId
+     *            the id of the specific deployment
      * @param jobSpec
      *            Job Specification
      * @throws Exception
      */
-    DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception;
+    JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws 
Exception;
 
     /**
-     * Update the JobSpec for a deployed job.
+     * Start the specified Job.
      *
-     * @param deployedJobSpecId
-     *            The id of the deployed job spec
+     * @param deploymentId
+     *            the id of the specific deployment
      * @param jobSpec
      *            Job Specification
+     * @param jobFlags
+     *            Flags
      * @throws Exception
      */
-    void redeployJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification 
jobSpec) throws Exception;
+    JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, 
EnumSet<JobFlag> jobFlags) throws Exception;
 
     /**
-     * Remove the deployed Job Spec
+     * Start the specified Job.
      *
-     * @param deployedJobSpecId
-     *            The id of the deployed job spec
+     * @param deploymentId
+     *            the id of the specific deployment
+     * @param acggf
+     *            Activity Cluster Graph Generator Factory
+     * @param jobFlags
+     *            Flags
      * @throws Exception
      */
-    void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
+    JobId startJob(DeploymentId deploymentId, 
IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+            throws Exception;
 
     /**
-     * Used to run a deployed Job Spec by id
+     * Distribute the specified Job.
+     *
+     * @param jobSpec
+     *            Job Specification
+     * @throws Exception
+     */
+    DeployedJobSpecId deployJobSpec(JobSpecification jobSpec) throws Exception;
+
+    /**
+     * Update the JobSpec for a deployed job.
      *
      * @param deployedJobSpecId
      *            The id of the deployed job spec
-     * @param jobParameters
-     *            The serialized job parameters
+     * @param jobSpec
+     *            Job Specification
      * @throws Exception
      */
-    JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> 
jobParameters) throws Exception;
+    void redeployJobSpec(DeployedJobSpecId deployedJobSpecId, JobSpecification 
jobSpec) throws Exception;
 
     /**
-     * Start the specified Job.
+     * Remove the deployed Job Spec
      *
-     * @param acggf
-     *            Activity Cluster Graph Generator Factory
-     * @param jobFlags
-     *            Flags
+     * @param deployedJobSpecId
+     *            The id of the deployed job spec
      * @throws Exception
      */
-    JobId startJob(IActivityClusterGraphGeneratorFactory acggf, 
EnumSet<JobFlag> jobFlags) throws Exception;
+    void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
 
     /**
      * Gets the IP Address and port for the ResultDirectoryService wrapped in NetworkAddress
@@ -195,44 +233,6 @@ public interface IHyracksClientConnection extends IClusterInfoCollector {
      */
     void unDeployBinary(DeploymentId deploymentId) throws Exception;
 
-    /**
-     * Start the specified Job.
-     *
-     * @param deploymentId
-     *            the id of the specific deployment
-     * @param jobSpec
-     *            Job Specification
-     * @throws Exception
-     */
-    JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception;
-
-    /**
-     * Start the specified Job.
-     *
-     * @param deploymentId
-     *            the id of the specific deployment
-     * @param jobSpec
-     *            Job Specification
-     * @param jobFlags
-     *            Flags
-     * @throws Exception
-     */
-    JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
-
-    /**
-     * Start the specified Job.
-     *
-     * @param deploymentId
-     *            the id of the specific deployment
-     * @param acggf
-     *            Activity Cluster Graph Generator Factory
-     * @param jobFlags
-     *            Flags
-     * @throws Exception
-     */
-    JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
-            throws Exception;
-
     /**
      * Shuts down all NCs and then the CC.
      *
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index be470146f6..f63e0ea129 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -42,6 +42,8 @@ public interface IHyracksClientInterface {
 
     public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception;
 
+    public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
+
     public void cancelJob(JobId jobId) throws Exception;
 
     public DeployedJobSpecId deployJobSpec(byte[] acggfBytes) throws Exception;
@@ -64,8 +66,6 @@ public interface IHyracksClientInterface {
 
     public void unDeployBinary(DeploymentId deploymentId) throws Exception;
 
-    public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
-
     public JobInfo getJobInfo(JobId jobId) throws Exception;
 
     public void stopCluster(boolean terminateNCService) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index e46c0ef490..766fb26e57 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -156,6 +156,7 @@ public enum ErrorCode implements IError {
     ILLEGAL_STATE(126),
     INVALID_STRING_UNICODE(127),
     UNSUPPORTED_WRITE_SPEC(128),
+    JOB_REJECTED(129),
 
     // Compilation error codes.
     RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index 7225cd4964..93848a1ff3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -20,5 +20,6 @@ package org.apache.hyracks.api.job;
 
 public enum JobFlag {
     PROFILE_RUNTIME,
-    ENFORCE_CONTRACT
+    ENFORCE_CONTRACT,
+    ENSURE_RUNNABLE
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
index b18bcb10ee..0bfdb370ca 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
@@ -19,6 +19,10 @@
 
 package org.apache.hyracks.api.job.resource;
 
+import java.util.Set;
+
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
 public class DefaultJobCapacityController implements IJobCapacityController {
@@ -34,7 +38,7 @@ public class DefaultJobCapacityController implements IJobCapacityController {
     }
 
     @Override
-    public JobSubmissionStatus allocate(JobSpecification job) {
+    public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags) {
         return JobSubmissionStatus.EXECUTE;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
index f88baa2ee9..f2c03f6167 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
@@ -19,7 +19,11 @@
 
 package org.apache.hyracks.api.job.resource;
 
+import java.util.Set;
+
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
 /**
@@ -41,13 +45,14 @@ public interface IJobCapacityController {
      * Allocates required cluster capacity for a job.
      *
      * @param job,
-     *            the job specification.
-     * @return EXECUTE, if the job can be executed immediately;
-     *         QUEUE, if the job cannot be executed
+     *         the job specification.
+     * @param jobId
+     *         the job id.
+     * @return EXECUTE, if the job can be executed immediately; QUEUE, if the job cannot be executed
      * @throws HyracksException
-     *             if the job's capacity requirement exceeds the maximum capacity of the cluster.
+     *         if the job's capacity requirement exceeds the maximum capacity of the cluster.
      */
-    JobSubmissionStatus allocate(JobSpecification job) throws HyracksException;
+    JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags) throws HyracksException;
 
     /**
      * Releases cluster capacity for a job when it completes.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index b3c2d7b02d..fa52bc605e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -146,6 +146,7 @@
 126 = Illegal state. %1$s
 127 = Decoding error - %1$s
 128 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s
+129 = Job %1$s not run. Cluster is not accepting jobs
 
 10000 = The given rule collection %1$s is not an instance of the List class.
 10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index efd42c9c59..22e4bbafa9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.control.cc;
 
 import java.util.Arrays;
+import java.util.Set;
 
 import org.apache.hyracks.api.application.ICCApplication;
 import org.apache.hyracks.api.application.ICCServiceContext;
@@ -26,6 +27,7 @@ import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.config.Section;
 import org.apache.hyracks.api.control.IGatekeeper;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.result.IJobResultCallback;
@@ -125,4 +127,9 @@ public class BaseCCApplication implements ICCApplication {
             // no op
         };
     }
+
+    @Override
+    public boolean acceptingJobs(Set<JobFlag> flags) {
+        return true;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index be3daae894..7c7111f490 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -123,7 +123,7 @@ public class JobManager implements IJobManager {
         JobSpecification job = jobRun.getJobSpecification();
         IJobCapacityController.JobSubmissionStatus status;
         try {
-            status = jobCapacityController.allocate(job);
+            status = jobCapacityController.allocate(job, jobRun.getJobId(), jobRun.getFlags());
             CCServiceContext serviceCtx = ccs.getContext();
             serviceCtx.notifyJobCreation(jobRun.getJobId(), job, status);
             switch (status) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index 38277c2af7..d0038535bb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -90,7 +90,8 @@ public class FIFOJobQueue implements IJobQueue {
             // Cluster maximum capacity can change over time, thus we have to re-check if the job should be rejected
             // or not.
             try {
-                IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
+                IJobCapacityController.JobSubmissionStatus status =
+                        jobCapacityController.allocate(job, run.getJobId(), run.getFlags());
                 // Checks if the job can be executed immediately.
                 if (status == IJobCapacityController.JobSubmissionStatus.EXECUTE) {
                     jobRuns.add(run);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 19340d0928..3e87bb3cf1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -30,12 +30,14 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
@@ -57,6 +59,8 @@ import org.mockito.Mockito;
 
 public class JobManagerTest {
 
+    private static final EnumSet<JobFlag> none = EnumSet.noneOf(JobFlag.class);
+
     private CCConfig ccConfig;
 
     @Before
@@ -77,7 +81,8 @@ public class JobManagerTest {
             JobRun run = mockJobRun(id);
             JobSpecification job = mock(JobSpecification.class);
             when(run.getJobSpecification()).thenReturn(job);
-            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+            when(jobCapacityController.allocate(job, run.getJobId(), none))
+                    .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
 
             // Submits the job.
             acceptedRuns.add(run);
@@ -93,7 +98,8 @@ public class JobManagerTest {
             JobRun run = mockJobRun(id);
             JobSpecification job = mock(JobSpecification.class);
             when(run.getJobSpecification()).thenReturn(job);
-            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+            when(jobCapacityController.allocate(job, run.getJobId(), none))
+                    .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
                     .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
 
             // Submits the job.
@@ -109,7 +115,8 @@ public class JobManagerTest {
             JobRun run = mockJobRun(8193);
             JobSpecification job = mock(JobSpecification.class);
             when(run.getJobSpecification()).thenReturn(job);
-            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+            when(jobCapacityController.allocate(job, run.getJobId(), none))
+                    .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
                     .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
             jobManager.add(run);
         } catch (HyracksException e) {
@@ -149,7 +156,7 @@ public class JobManagerTest {
             JobRun run = mockJobRun(1);
             JobSpecification job = mock(JobSpecification.class);
             when(run.getJobSpecification()).thenReturn(job);
-            when(jobCapacityController.allocate(job))
+            when(jobCapacityController.allocate(job, run.getJobId(), none))
                     .thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0"));
             jobManager.add(run);
         } catch (HyracksException e) {
@@ -172,14 +179,16 @@ public class JobManagerTest {
         JobRun run1 = mockJobRun(1);
         JobSpecification job1 = mock(JobSpecification.class);
         when(run1.getJobSpecification()).thenReturn(job1);
-        when(jobCapacityController.allocate(job1)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+        when(jobCapacityController.allocate(job1, run1.getJobId(), none))
+                .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
         jobManager.add(run1);
 
         // A failure run.
         JobRun run2 = mockJobRun(2);
         JobSpecification job2 = mock(JobSpecification.class);
         when(run2.getJobSpecification()).thenReturn(job2);
-        when(jobCapacityController.allocate(job2)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+        when(jobCapacityController.allocate(job2, run2.getJobId(), none))
+                .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
                 .thenThrow(HyracksException.create(ErrorCode.JOB_REQUIREMENTS_EXCEED_CAPACITY, "1", "0"));
         jobManager.add(run2);
 
@@ -220,7 +229,8 @@ public class JobManagerTest {
             JobRun run = mockJobRun(id);
             JobSpecification job = mock(JobSpecification.class);
             when(run.getJobSpecification()).thenReturn(job);
-            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
+            when(jobCapacityController.allocate(job, run.getJobId(), none))
+                    .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
 
             // Submits the job.
             acceptedRuns.add(run);
@@ -236,7 +246,8 @@ public class JobManagerTest {
             JobRun run = mockJobRun(id);
             JobSpecification job = mock(JobSpecification.class);
             when(run.getJobSpecification()).thenReturn(job);
-            when(jobCapacityController.allocate(job)).thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
+            when(jobCapacityController.allocate(job, run.getJobId(), none))
+                    .thenReturn(IJobCapacityController.JobSubmissionStatus.QUEUE)
                     .thenReturn(IJobCapacityController.JobSubmissionStatus.EXECUTE);
 
             // Submits the job.
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 7a75a0fcca..97e15337b9 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -247,7 +248,8 @@ public abstract class AbstractMultiNCIntegrationTest {
                 private long maxRAM = Runtime.getRuntime().maxMemory();
 
                 @Override
-                public JobSubmissionStatus allocate(JobSpecification job) throws HyracksException {
+                public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags)
+                        throws HyracksException {
                     return maxRAM > job.getRequiredClusterCapacity().getAggregatedMemoryByteSize()
                             ? JobSubmissionStatus.EXECUTE : JobSubmissionStatus.QUEUE;
                 }

Reply via email to