Steven Jacobs has submitted this change and it was merged.

Change subject: Prepare AsterixDB for Pre-Distributed Jobs
......................................................................


Prepare AsterixDB for Pre-Distributed Jobs

Change-Id: Id809f4b563bbed808c7764d1af664a15919db35b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1366
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamou...@gmail.com>
---
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
10 files changed, 90 insertions(+), 22 deletions(-)

Approvals:
  abdullah alamoudi: Looks good to me, approved
  Jenkins: Verified; Verified

Objections:
  Jenkins: Violations found



diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 7f25896..51ed40d 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -23,9 +23,10 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
+import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
-public abstract class ActiveSourceOperatorNodePushable extends 
AbstractOperatorNodePushable implements IActiveRuntime {
+public abstract class ActiveSourceOperatorNodePushable extends 
AbstractUnaryOutputSourceOperatorNodePushable
+        implements IActiveRuntime {
 
     protected final IHyracksTaskContext ctx;
     protected final ActiveManager activeManager;
@@ -108,10 +109,6 @@
         }
     }
 
-    @Override
-    public final int getInputArity() {
-        return 0;
-    }
 
     @Override
     public final IFrameWriter getInputFrameWriter(int index) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index ef0af26..780e205 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -107,17 +107,25 @@
         private final byte[] acggfBytes;
         private final EnumSet<JobFlag> jobFlags;
         private final DeploymentId deploymentId;
+        private final JobId jobId;
 
-        public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
-            this.acggfBytes = acggfBytes;
-            this.jobFlags = jobFlags;
-            this.deploymentId = null;
-        }
-
-        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, 
EnumSet<JobFlag> jobFlags) {
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, 
EnumSet<JobFlag> jobFlags, JobId jobId) {
             this.acggfBytes = acggfBytes;
             this.jobFlags = jobFlags;
             this.deploymentId = deploymentId;
+            this.jobId = jobId;
+        }
+
+        public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, 
JobId jobId) {
+            this(null, acggfBytes, jobFlags, jobId);
+        }
+
+        public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
+            this(null, acggfBytes, jobFlags, null);
+        }
+
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, 
EnumSet<JobFlag> jobFlags) {
+            this(deploymentId, acggfBytes, jobFlags, null);
         }
 
         @Override
@@ -125,6 +133,10 @@
             return FunctionId.START_JOB;
         }
 
+        public JobId getJobId() {
+            return jobId;
+        }
+
         public byte[] getACGGFBytes() {
             return acggfBytes;
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index f8c8512..c049007 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -69,6 +69,13 @@
     }
 
     @Override
+    public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId 
jobId) throws Exception {
+        HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                new 
HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags, jobId);
+        return (JobId) rpci.call(ipcHandle, sjf);
+    }
+
+    @Override
     public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, 
EnumSet<JobFlag> jobFlags) throws Exception {
         HyracksClientInterfaceFunctions.StartJobFunction sjf = new 
HyracksClientInterfaceFunctions.StartJobFunction(
                 deploymentId, acggfBytes, jobFlags);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 8c0557e..eb92c37 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -102,8 +102,20 @@
         return startJob(jsacggf, jobFlags);
     }
 
+    @Override
+    public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, 
JobId jobId) throws Exception {
+        JobSpecificationActivityClusterGraphGeneratorFactory jsacggf =
+                new 
JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec);
+        return startJob(jsacggf, jobFlags, jobId);
+    }
+
     public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, 
EnumSet<JobFlag> jobFlags) throws Exception {
         return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
+    }
+
+    public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, 
EnumSet<JobFlag> jobFlags, JobId jobId)
+            throws Exception {
+        return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags, 
jobId);
     }
 
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
@@ -179,7 +191,8 @@
 
     @Override
     public JobId startJob(DeploymentId deploymentId, 
IActivityClusterGraphGeneratorFactory acggf,
-            EnumSet<JobFlag> jobFlags) throws Exception {
+            EnumSet<JobFlag> jobFlags)
+            throws Exception {
         return hci.startJob(deploymentId, 
JavaSerializationUtils.serialize(acggf), jobFlags);
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index fd4d21b..031896e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -87,6 +87,21 @@
      *
      * @param appName
      *            Name of the application
+     * @param jobSpec
+     *            Job Specification
+     * @param jobFlags
+     *            Flags
+     * @param jobId
+     *            Used to run a pre-distributed job by id (the same value will 
be returned)
+     * @throws Exception
+     */
+    public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, 
JobId jobId) throws Exception;
+
+    /**
+     * Start the specified Job.
+     *
+     * @param appName
+     *            Name of the application
      * @param acggf
      *            Activity Cluster Graph Generator Factory
      * @param jobFlags
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 3f3c120..39063c6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -38,6 +38,8 @@
 
     public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws 
Exception;
 
+    public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId 
jobId) throws Exception;
+
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
 
     public void waitForCompletion(JobId jobId) throws Exception;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
index a33c6c9..1656c51 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobFlag.java
@@ -19,5 +19,6 @@
 package org.apache.hyracks.api.job;
 
 public enum JobFlag {
-    PROFILE_RUNTIME
+    PROFILE_RUNTIME,
+    STORE_JOB
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 01c3bf5..26beb63 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -84,9 +84,15 @@
             case START_JOB:
                 HyracksClientInterfaceFunctions.StartJobFunction sjf =
                         (HyracksClientInterfaceFunctions.StartJobFunction) fn;
-                JobId jobId = jobIdFactory.create();
-                ccs.getWorkQueue().schedule(new JobStartWork(ccs, 
sjf.getDeploymentId(),
-                        sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new 
IPCResponder<JobId>(handle, mid)));
+                JobId jobId = sjf.getJobId();
+                byte[] acggfBytes = null;
+                if (jobId == null) {
+                    jobId = jobIdFactory.create();
+                }
+                //TODO: only send these when the jobId is null
+                acggfBytes = sjf.getACGGFBytes();
+                ccs.getWorkQueue().schedule(new JobStartWork(ccs, 
sjf.getDeploymentId(), acggfBytes, sjf.getJobFlags(),
+                        jobId, new IPCResponder<JobId>(handle, mid)));
                 break;
             case GET_DATASET_DIRECTORY_SERIVICE_INFO:
                 ccs.getWorkQueue().schedule(new 
GetDatasetDirectoryServiceInfoWork(ccs,
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index f00303f..bdb13e4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -47,6 +47,7 @@
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
@@ -114,6 +115,8 @@
 
     private final Map<JobId, Joblet> jobletMap;
 
+    private final Map<JobId, ActivityClusterGraph> activityClusterGraphMap;
+
     private ExecutorService executor;
 
     private NodeParameters nodeParameters;
@@ -167,6 +170,7 @@
         lccm = new LifeCycleComponentManager();
         workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves 
MAX_PRIORITY of the heartbeat thread.
         jobletMap = new Hashtable<>();
+        activityClusterGraphMap = new Hashtable<>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
                 new File(new File(NodeControllerService.class.getName()), id));
@@ -356,6 +360,10 @@
         return jobletMap;
     }
 
+    public Map<JobId, ActivityClusterGraph> getActivityClusterGraphMap() {
+        return activityClusterGraphMap;
+    }
+
     public NetworkManager getNetworkManager() {
         return netManager;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index ad9481d..d27caf2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -186,11 +186,18 @@
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji == null) {
-            if (acgBytes == null) {
-                throw new HyracksException("Joblet was not found. This job was 
most likely aborted.");
+            Map<JobId, ActivityClusterGraph> acgMap = 
ncs.getActivityClusterGraphMap();
+            ActivityClusterGraph acg = acgMap.get(jobId);
+            if (acg == null) {
+                if (acgBytes == null) {
+                    throw new HyracksException("Joblet was not found. This job 
was most likely aborted.");
+                }
+                acg = (ActivityClusterGraph) 
DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
+                if (flags.contains(JobFlag.STORE_JOB)) {
+                    //TODO: Right now the map is append-only
+                    acgMap.put(jobId, acg);
+                }
             }
-            ActivityClusterGraph acg = (ActivityClusterGraph) 
DeploymentUtils.deserialize(acgBytes, deploymentId,
-                    appCtx);
             ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
             jobletMap.put(jobId, ji);
         }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1366
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Id809f4b563bbed808c7764d1af664a15919db35b
Gerrit-PatchSet: 6
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sjaco...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <buyin...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>

Reply via email to