Steven Jacobs has submitted this change and it was merged. Change subject: Prepare AsterixDB for Pre-Distributed Jobs ......................................................................
Prepare AsterixDB for Pre-Distributed Jobs Change-Id: Id809f4b563bbed808c7764d1af664a15919db35b Reviewed-on: https://asterix-gerrit.ics.uci.edu/1366 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: abdullah alamoudi <bamou...@gmail.com> --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java 10 files changed, 90 insertions(+), 22 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java index 7f25896..51ed40d 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java @@ -23,9 +23,10 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable; +import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; -public abstract class ActiveSourceOperatorNodePushable extends AbstractOperatorNodePushable implements IActiveRuntime { +public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable + implements IActiveRuntime { protected final IHyracksTaskContext ctx; protected final ActiveManager activeManager; @@ -108,10 +109,6 @@ } } - @Override - public final int getInputArity() { - return 0; - } @Override public final IFrameWriter getInputFrameWriter(int index) { diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java index ef0af26..780e205 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java @@ -107,17 +107,25 @@ private final byte[] acggfBytes; private final EnumSet<JobFlag> jobFlags; private final DeploymentId deploymentId; + private final JobId jobId; - public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) { - this.acggfBytes = acggfBytes; - this.jobFlags = jobFlags; - this.deploymentId = null; - } - - public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) { + public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) { this.acggfBytes = acggfBytes; this.jobFlags = jobFlags; this.deploymentId = deploymentId; + this.jobId = jobId; + } + + public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) { + this(null, acggfBytes, jobFlags, jobId); + } + + public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) { + this(null, acggfBytes, jobFlags, null); + } + + public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) { + this(deploymentId, acggfBytes, jobFlags, null); } @Override @@ -125,6 +133,10 @@ return FunctionId.START_JOB; } + public JobId getJobId() { + return jobId; + } + public byte[] getACGGFBytes() { return acggfBytes; } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java index f8c8512..c049007 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java @@ -69,6 +69,13 @@ } @Override + public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception { + HyracksClientInterfaceFunctions.StartJobFunction sjf = + new HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags, jobId); + return (JobId) rpci.call(ipcHandle, sjf); + } + + @Override public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception { HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction( deploymentId, acggfBytes, jobFlags); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index 8c0557e..eb92c37 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -102,8 +102,20 @@ return startJob(jsacggf, jobFlags); } + @Override + public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception { + JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = + new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec); + return startJob(jsacggf, jobFlags, jobId); + } + public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception { return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags); + } + + public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags, JobId jobId) + throws Exception { + return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags, jobId); } public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception { @@ -179,7 +191,8 @@ @Override public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, - EnumSet<JobFlag> jobFlags) throws Exception { + EnumSet<JobFlag> jobFlags) + throws Exception { return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java index fd4d21b..031896e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java @@ -87,6 +87,21 @@ * * @param appName * Name of the application + * @param jobSpec + * Job Specification + * @param jobFlags + * Flags + * @param jobId + * Used to run a pre-distributed job by id (the same value will be returned) + * @throws Exception + */ + public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception; + + /** + * Start the specified Job. + * + * @param appName + * Name of the application * @param acggf * Activity Cluster Graph Generator Factory * @param jobFlags diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java index 3f3c120..39063c6 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java @@ -38,6 +38,8 @@ public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception; + public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId) throws Exception; + public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception; public void waitForCompletion(JobId jobId) throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java index a33c6c9..1656c51 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java @@ -19,5 +19,6 @@ package org.apache.hyracks.api.job; public enum JobFlag { - PROFILE_RUNTIME + PROFILE_RUNTIME, + STORE_JOB } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java index 01c3bf5..26beb63 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java @@ -84,9 +84,15 @@ case START_JOB: HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn; - JobId jobId = jobIdFactory.create(); - ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), - sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid))); + JobId jobId = sjf.getJobId(); + byte[] acggfBytes = null; + if (jobId == null) { + jobId = jobIdFactory.create(); + } + //TODO: only send these when the jobId is null + acggfBytes = sjf.getACGGFBytes(); + ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(), + jobId, new IPCResponder<JobId>(handle, mid))); break; case GET_DATASET_DIRECTORY_SERIVICE_INFO: ccs.getWorkQueue().schedule(new GetDatasetDirectoryServiceInfoWork(ccs, diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index f00303f..bdb13e4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -47,6 +47,7 @@ import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.io.IODeviceHandle; +import org.apache.hyracks.api.job.ActivityClusterGraph; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; @@ -114,6 +115,8 @@ private final Map<JobId, Joblet> jobletMap; + private final Map<JobId, ActivityClusterGraph> activityClusterGraphMap; + private ExecutorService executor; private NodeParameters nodeParameters; @@ -167,6 +170,7 @@ lccm = new LifeCycleComponentManager(); workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. jobletMap = new Hashtable<>(); + activityClusterGraphMap = new Hashtable<>(); timer = new Timer(true); serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(NodeControllerService.class.getName()), id)); @@ -356,6 +360,10 @@ return jobletMap; } + public Map<JobId, ActivityClusterGraph> getActivityClusterGraphMap() { + return activityClusterGraphMap; + } + public NetworkManager getNetworkManager() { return netManager; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java index ad9481d..d27caf2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java @@ -186,11 +186,18 @@ Map<JobId, Joblet> jobletMap = ncs.getJobletMap(); Joblet ji = jobletMap.get(jobId); if (ji == null) { - if (acgBytes == null) { - throw new HyracksException("Joblet was not found. This job was most likely aborted."); + Map<JobId, ActivityClusterGraph> acgMap = ncs.getActivityClusterGraphMap(); + ActivityClusterGraph acg = acgMap.get(jobId); + if (acg == null) { + if (acgBytes == null) { + throw new HyracksException("Joblet was not found. This job was most likely aborted."); + } + acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx); + if (flags.contains(JobFlag.STORE_JOB)) { + //TODO: Right now the map is append-only + acgMap.put(jobId, acg); + } } - ActivityClusterGraph acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, - appCtx); ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg); jobletMap.put(jobId, ji); } -- To view, visit https://asterix-gerrit.ics.uci.edu/1366 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Id809f4b563bbed808c7764d1af664a15919db35b Gerrit-PatchSet: 6 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Steven Jacobs <sjaco...@ucr.edu> Gerrit-Reviewer: Till Westmann <ti...@apache.org> Gerrit-Reviewer: Yingyi Bu <buyin...@gmail.com> Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>