http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java deleted file mode 100644 index b577ff7..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java +++ /dev/null @@ -1,745 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.cc.scheduler; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Random; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.hyracks.api.comm.NetworkAddress; -import org.apache.hyracks.api.constraints.Constraint; -import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression; -import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression; -import org.apache.hyracks.api.dataflow.ActivityId; -import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; -import org.apache.hyracks.api.dataflow.IConnectorDescriptor; -import org.apache.hyracks.api.dataflow.OperatorDescriptorId; -import org.apache.hyracks.api.dataflow.TaskAttemptId; -import org.apache.hyracks.api.dataflow.TaskId; -import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy; -import org.apache.hyracks.api.deployment.DeploymentId; -import org.apache.hyracks.api.exceptions.HyracksException; -import org.apache.hyracks.api.job.ActivityCluster; -import org.apache.hyracks.api.job.ActivityClusterGraph; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.api.job.JobStatus; -import org.apache.hyracks.api.partitions.PartitionId; -import org.apache.hyracks.api.util.JavaSerializationUtils; -import org.apache.hyracks.control.cc.ClusterControllerService; -import org.apache.hyracks.control.cc.NodeControllerState; -import org.apache.hyracks.control.cc.application.CCApplicationContext; -import org.apache.hyracks.control.cc.job.ActivityClusterPlan; -import org.apache.hyracks.control.cc.job.JobRun; -import org.apache.hyracks.control.cc.job.Task; -import org.apache.hyracks.control.cc.job.TaskAttempt; -import org.apache.hyracks.control.cc.job.TaskCluster; -import org.apache.hyracks.control.cc.job.TaskClusterAttempt; -import org.apache.hyracks.control.cc.partitions.PartitionMatchMaker; -import org.apache.hyracks.control.cc.work.JobCleanupWork; -import org.apache.hyracks.control.common.job.PartitionState; -import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; - -public class JobScheduler { - private static final Logger LOGGER = Logger.getLogger(JobScheduler.class.getName()); - - private final ClusterControllerService ccs; - - private final JobRun jobRun; - - private final PartitionConstraintSolver solver; - - private final Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap; - - private final Set<TaskCluster> inProgressTaskClusters; - - - public JobScheduler(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints) { - this.ccs = ccs; - this.jobRun = jobRun; - solver = new PartitionConstraintSolver(); - partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>(); - inProgressTaskClusters = new HashSet<TaskCluster>(); - solver.addConstraints(constraints); - } - - public JobRun getJobRun() { - return jobRun; - } - - public PartitionConstraintSolver getSolver() { - return solver; - } - - public void startJob() throws HyracksException { - startRunnableActivityClusters(); - ccs.getApplicationContext().notifyJobStart(jobRun.getJobId()); - } - - private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots) - throws HyracksException { - for (ActivityCluster root : roots) { - findRunnableTaskClusterRoots(frontier, root); - } - } - - private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, ActivityCluster candidate) - throws HyracksException { - boolean depsComplete = true; - for (ActivityCluster depAC : candidate.getDependencies()) { - if (!isPlanned(depAC)) { - depsComplete = false; - findRunnableTaskClusterRoots(frontier, depAC); - } else { - boolean tcRootsComplete = true; - for (TaskCluster tc : getActivityClusterPlan(depAC).getTaskClusters()) { - if (tc.getProducedPartitions().isEmpty()) { - TaskClusterAttempt tca = findLastTaskClusterAttempt(tc); - if (tca == null || tca.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) { - tcRootsComplete = false; - break; - } - } - } - if (!tcRootsComplete) { - depsComplete = false; - findRunnableTaskClusterRoots(frontier, depAC); - } - } - } - if (depsComplete) { - if (!isPlanned(candidate)) { - ActivityClusterPlanner acp = new ActivityClusterPlanner(this); - ActivityClusterPlan acPlan = acp.planActivityCluster(candidate); - jobRun.getActivityClusterPlanMap().put(candidate.getId(), acPlan); - partitionProducingTaskClusterMap.putAll(acp.getPartitionProducingTaskClusterMap()); - } - for (TaskCluster tc : getActivityClusterPlan(candidate).getTaskClusters()) { - if (tc.getProducedPartitions().isEmpty()) { - TaskClusterAttempt tca = findLastTaskClusterAttempt(tc); - if (tca == null || tca.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) { - frontier.add(tc); - } - } - } - } - } - - private ActivityClusterPlan getActivityClusterPlan(ActivityCluster ac) { - return jobRun.getActivityClusterPlanMap().get(ac.getId()); - } - - private boolean isPlanned(ActivityCluster ac) { - return jobRun.getActivityClusterPlanMap().get(ac.getId()) != null; - } - - private void startRunnableActivityClusters() throws HyracksException { - Set<TaskCluster> taskClusterRoots = new HashSet<TaskCluster>(); - findRunnableTaskClusterRoots(taskClusterRoots, jobRun.getActivityClusterGraph().getActivityClusterMap() - .values()); - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: " - + inProgressTaskClusters); - } - if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) { - ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null)); - return; - } - startRunnableTaskClusters(taskClusterRoots); - } - - private void startRunnableTaskClusters(Set<TaskCluster> tcRoots) throws HyracksException { - Map<TaskCluster, Runnability> runnabilityMap = new HashMap<TaskCluster, Runnability>(); - for (TaskCluster tc : tcRoots) { - assignRunnabilityRank(tc, runnabilityMap); - } - - PriorityQueue<RankedRunnableTaskCluster> queue = new PriorityQueue<RankedRunnableTaskCluster>(); - for (Map.Entry<TaskCluster, Runnability> e : runnabilityMap.entrySet()) { - TaskCluster tc = e.getKey(); - Runnability runnability = e.getValue(); - if (runnability.getTag() != Runnability.Tag.RUNNABLE) { - continue; - } - int priority = runnability.getPriority(); - if (priority >= 0 && priority < Integer.MAX_VALUE) { - queue.add(new RankedRunnableTaskCluster(priority, tc)); - } - } - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Ranked TCs: " + queue); - } - - Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>(); - for (RankedRunnableTaskCluster rrtc : queue) { - TaskCluster tc = rrtc.getTaskCluster(); - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Found runnable TC: " + tc); - List<TaskClusterAttempt> attempts = tc.getAttempts(); - LOGGER.fine("Attempts so far:" + attempts.size()); - for (TaskClusterAttempt tcAttempt : attempts) { - LOGGER.fine("Status: " + tcAttempt.getStatus()); - } - } - assignTaskLocations(tc, taskAttemptMap); - } - - if (taskAttemptMap.isEmpty()) { - return; - } - - startTasks(taskAttemptMap); - } - - /* - * Runnability rank has the following semantics - * Runnability(Runnable TaskCluster depending on completed TaskClusters) = {RUNNABLE, 0} - * Runnability(Runnable TaskCluster) = max(Rank(Dependent TaskClusters)) + 1 - * Runnability(Non-schedulable TaskCluster) = {NOT_RUNNABLE, _} - */ - private Runnability assignRunnabilityRank(TaskCluster goal, Map<TaskCluster, Runnability> runnabilityMap) { - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Computing runnability: " + goal); - } - if (runnabilityMap.containsKey(goal)) { - return runnabilityMap.get(goal); - } - TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(goal); - if (lastAttempt != null) { - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Last Attempt Status: " + lastAttempt.getStatus()); - } - if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED) { - Runnability runnability = new Runnability(Runnability.Tag.COMPLETED, Integer.MIN_VALUE); - runnabilityMap.put(goal, runnability); - return runnability; - } - if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) { - Runnability runnability = new Runnability(Runnability.Tag.RUNNING, Integer.MIN_VALUE); - runnabilityMap.put(goal, runnability); - return runnability; - } - } - Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = jobRun.getConnectorPolicyMap(); - PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker(); - Runnability aggregateRunnability = new Runnability(Runnability.Tag.RUNNABLE, 0); - for (PartitionId pid : goal.getRequiredPartitions()) { - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Inspecting required partition: " + pid); - } - Runnability runnability; - ConnectorDescriptorId cdId = pid.getConnectorDescriptorId(); - IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId); - PartitionState maxState = pmm.getMaximumAvailableState(pid); - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Policy: " + cPolicy + " maxState: " + maxState); - } - if (PartitionState.COMMITTED.equals(maxState)) { - runnability = new Runnability(Runnability.Tag.RUNNABLE, 0); - } else if (PartitionState.STARTED.equals(maxState) && !cPolicy.consumerWaitsForProducerToFinish()) { - runnability = new Runnability(Runnability.Tag.RUNNABLE, 1); - } else { - runnability = assignRunnabilityRank(partitionProducingTaskClusterMap.get(pid), runnabilityMap); - switch (runnability.getTag()) { - case RUNNABLE: - if (cPolicy.consumerWaitsForProducerToFinish()) { - runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE); - } else { - runnability = new Runnability(Runnability.Tag.RUNNABLE, runnability.getPriority() + 1); - } - break; - - case NOT_RUNNABLE: - break; - - case RUNNING: - if (cPolicy.consumerWaitsForProducerToFinish()) { - runnability = new Runnability(Runnability.Tag.NOT_RUNNABLE, Integer.MAX_VALUE); - } else { - runnability = new Runnability(Runnability.Tag.RUNNABLE, 1); - } - break; - } - } - aggregateRunnability = Runnability.getWorstCase(aggregateRunnability, runnability); - if (aggregateRunnability.getTag() == Runnability.Tag.NOT_RUNNABLE) { - // already not runnable -- cannot get better. bail. - break; - } - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("aggregateRunnability: " + aggregateRunnability); - } - } - runnabilityMap.put(goal, aggregateRunnability); - return aggregateRunnability; - } - - private void assignTaskLocations(TaskCluster tc, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) - throws HyracksException { - ActivityClusterGraph acg = jobRun.getActivityClusterGraph(); - Task[] tasks = tc.getTasks(); - List<TaskClusterAttempt> tcAttempts = tc.getAttempts(); - int attempts = tcAttempts.size(); - TaskClusterAttempt tcAttempt = new TaskClusterAttempt(tc, attempts); - Map<TaskId, TaskAttempt> taskAttempts = new HashMap<TaskId, TaskAttempt>(); - Map<TaskId, LValueConstraintExpression> locationMap = new HashMap<TaskId, LValueConstraintExpression>(); - for (int i = 0; i < tasks.length; ++i) { - Task ts = tasks[i]; - TaskId tid = ts.getTaskId(); - TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new TaskAttemptId(new TaskId(tid.getActivityId(), - tid.getPartition()), attempts), ts); - taskAttempt.setStatus(TaskAttempt.TaskStatus.INITIALIZED, null); - locationMap.put(tid, - new PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), tid.getPartition())); - taskAttempts.put(tid, taskAttempt); - } - tcAttempt.setTaskAttempts(taskAttempts); - solver.solve(locationMap.values()); - for (int i = 0; i < tasks.length; ++i) { - Task ts = tasks[i]; - TaskId tid = ts.getTaskId(); - TaskAttempt taskAttempt = taskAttempts.get(tid); - String nodeId = assignLocation(acg, locationMap, tid, taskAttempt); - taskAttempt.setNodeId(nodeId); - taskAttempt.setStatus(TaskAttempt.TaskStatus.RUNNING, null); - taskAttempt.setStartTime(System.currentTimeMillis()); - List<TaskAttemptDescriptor> tads = taskAttemptMap.get(nodeId); - if (tads == null) { - tads = new ArrayList<TaskAttemptDescriptor>(); - taskAttemptMap.put(nodeId, tads); - } - OperatorDescriptorId opId = tid.getActivityId().getOperatorDescriptorId(); - jobRun.registerOperatorLocation(opId, tid.getPartition(), nodeId); - ActivityPartitionDetails apd = ts.getActivityPlan().getActivityPartitionDetails(); - TaskAttemptDescriptor tad = new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(), - apd.getPartitionCount(), apd.getInputPartitionCounts(), apd.getOutputPartitionCounts()); - tads.add(tad); - } - tcAttempt.initializePendingTaskCounter(); - tcAttempts.add(tcAttempt); - - /** - * Improvement for reducing master/slave message communications, for each TaskAttemptDescriptor, - * we set the NetworkAddress[][] partitionLocations, in which each row is for an incoming connector descriptor - * and each column is for an input channel of the connector. - */ - for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) { - List<TaskAttemptDescriptor> tads = e.getValue(); - for (TaskAttemptDescriptor tad : tads) { - TaskAttemptId taid = tad.getTaskAttemptId(); - int attempt = taid.getAttempt(); - TaskId tid = taid.getTaskId(); - ActivityId aid = tid.getActivityId(); - List<IConnectorDescriptor> inConnectors = acg.getActivityInputs(aid); - int[] inPartitionCounts = tad.getInputPartitionCounts(); - if (inPartitionCounts != null) { - NetworkAddress[][] partitionLocations = new NetworkAddress[inPartitionCounts.length][]; - for (int i = 0; i < inPartitionCounts.length; ++i) { - ConnectorDescriptorId cdId = inConnectors.get(i).getConnectorId(); - IConnectorPolicy policy = jobRun.getConnectorPolicyMap().get(cdId); - /** - * carry sender location information into a task - * when it is not the case that it is an re-attempt and the send-side - * is materialized blocking. - */ - if (!(attempt > 0 && policy.materializeOnSendSide() && policy - .consumerWaitsForProducerToFinish())) { - ActivityId producerAid = acg.getProducerActivity(cdId); - partitionLocations[i] = new NetworkAddress[inPartitionCounts[i]]; - for (int j = 0; j < inPartitionCounts[i]; ++j) { - TaskId producerTaskId = new TaskId(producerAid, j); - String nodeId = findTaskLocation(producerTaskId); - partitionLocations[i][j] = ccs.getNodeMap().get(nodeId).getDataPort(); - } - } - } - tad.setInputPartitionLocations(partitionLocations); - } - } - } - - tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING); - tcAttempt.setStartTime(System.currentTimeMillis()); - inProgressTaskClusters.add(tc); - } - - private String assignLocation(ActivityClusterGraph acg, Map<TaskId, LValueConstraintExpression> locationMap, - TaskId tid, TaskAttempt taskAttempt) throws HyracksException { - ActivityId aid = tid.getActivityId(); - ActivityCluster ac = acg.getActivityMap().get(aid); - Set<ActivityId> blockers = ac.getBlocked2BlockerMap().get(aid); - String nodeId = null; - if (blockers != null) { - for (ActivityId blocker : blockers) { - nodeId = findTaskLocation(new TaskId(blocker, tid.getPartition())); - if (nodeId != null) { - break; - } - } - } - Set<String> liveNodes = ccs.getNodeMap().keySet(); - if (nodeId == null) { - LValueConstraintExpression pLocationExpr = locationMap.get(tid); - Object location = solver.getValue(pLocationExpr); - if (location == null) { - // pick any - nodeId = liveNodes.toArray(new String[liveNodes.size()])[Math.abs(new Random().nextInt()) - % liveNodes.size()]; - } else if (location instanceof String) { - nodeId = (String) location; - } else if (location instanceof String[]) { - for (String choice : (String[]) location) { - if (liveNodes.contains(choice)) { - nodeId = choice; - break; - } - } - if (nodeId == null) { - throw new HyracksException("No satisfiable location found for " + taskAttempt.getTaskAttemptId()); - } - } else { - throw new HyracksException("Unknown type of value for " + pLocationExpr + ": " + location + "(" - + location.getClass() + ")"); - } - } - if (nodeId == null) { - throw new HyracksException("No satisfiable location found for " + taskAttempt.getTaskAttemptId()); - } - if (!liveNodes.contains(nodeId)) { - throw new HyracksException("Node " + nodeId + " not live"); - } - return nodeId; - } - - private String findTaskLocation(TaskId tid) { - ActivityId aid = tid.getActivityId(); - ActivityCluster ac = jobRun.getActivityClusterGraph().getActivityMap().get(aid); - Task[] tasks = getActivityClusterPlan(ac).getActivityPlanMap().get(aid).getTasks(); - List<TaskClusterAttempt> tcAttempts = tasks[tid.getPartition()].getTaskCluster().getAttempts(); - if (tcAttempts == null || tcAttempts.isEmpty()) { - return null; - } - TaskClusterAttempt lastTCA = tcAttempts.get(tcAttempts.size() - 1); - TaskAttempt ta = lastTCA.getTaskAttempts().get(tid); - return ta == null ? null : ta.getNodeId(); - } - - private static TaskClusterAttempt findLastTaskClusterAttempt(TaskCluster tc) { - List<TaskClusterAttempt> attempts = tc.getAttempts(); - if (!attempts.isEmpty()) { - return attempts.get(attempts.size() - 1); - } - return null; - } - - private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException { - final DeploymentId deploymentId = jobRun.getDeploymentId(); - final JobId jobId = jobRun.getJobId(); - final ActivityClusterGraph acg = jobRun.getActivityClusterGraph(); - final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>( - jobRun.getConnectorPolicyMap()); - try { - byte[] acgBytes = JavaSerializationUtils.serialize(acg); - for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) { - String nodeId = entry.getKey(); - final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue(); - final NodeControllerState node = ccs.getNodeMap().get(nodeId); - if (node != null) { - node.getActiveJobIds().add(jobRun.getJobId()); - boolean changed = jobRun.getParticipatingNodeIds().add(nodeId); - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey()); - } - byte[] jagBytes = changed ? acgBytes : null; - node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors, - connectorPolicies, jobRun.getFlags()); - } - } - } catch (Exception e) { - throw new HyracksException(e); - } - } - - private void abortJob(List<Exception> exceptions) { - Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<TaskCluster>(inProgressTaskClusters); - for (TaskCluster tc : inProgressTaskClustersCopy) { - abortTaskCluster(findLastTaskClusterAttempt(tc), TaskClusterAttempt.TaskClusterStatus.ABORTED); - } - assert inProgressTaskClusters.isEmpty(); - ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, jobRun.getJobId(), JobStatus.FAILURE, exceptions)); - } - - private void abortTaskCluster(TaskClusterAttempt tcAttempt, - TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) { - LOGGER.fine("Aborting task cluster: " + tcAttempt.getAttempt()); - Set<TaskAttemptId> abortTaskIds = new HashSet<TaskAttemptId>(); - Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>(); - for (TaskAttempt ta : tcAttempt.getTaskAttempts().values()) { - TaskAttemptId taId = ta.getTaskAttemptId(); - TaskAttempt.TaskStatus status = ta.getStatus(); - abortTaskIds.add(taId); - LOGGER.fine("Checking " + taId + ": " + ta.getStatus()); - if (status == TaskAttempt.TaskStatus.RUNNING || status == TaskAttempt.TaskStatus.COMPLETED) { - ta.setStatus(TaskAttempt.TaskStatus.ABORTED, null); - ta.setEndTime(System.currentTimeMillis()); - List<TaskAttemptId> abortTaskAttempts = abortTaskAttemptMap.get(ta.getNodeId()); - if (status == TaskAttempt.TaskStatus.RUNNING && abortTaskAttempts == null) { - abortTaskAttempts = new ArrayList<TaskAttemptId>(); - abortTaskAttemptMap.put(ta.getNodeId(), abortTaskAttempts); - } - if (status == TaskAttempt.TaskStatus.RUNNING) { - abortTaskAttempts.add(taId); - } - } - } - final JobId jobId = jobRun.getJobId(); - LOGGER.fine("Abort map for job: " + jobId + ": " + abortTaskAttemptMap); - for (Map.Entry<String, List<TaskAttemptId>> entry : abortTaskAttemptMap.entrySet()) { - final NodeControllerState node = ccs.getNodeMap().get(entry.getKey()); - final List<TaskAttemptId> abortTaskAttempts = entry.getValue(); - if (node != null) { - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Aborting: " + abortTaskAttempts + " at " + entry.getKey()); - } - try { - node.getNodeController().abortTasks(jobId, abortTaskAttempts); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - inProgressTaskClusters.remove(tcAttempt.getTaskCluster()); - TaskCluster tc = tcAttempt.getTaskCluster(); - PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker(); - pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds); - pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds); - - tcAttempt.setStatus(failedOrAbortedStatus); - tcAttempt.setEndTime(System.currentTimeMillis()); - } - - private void abortDoomedTaskClusters() throws HyracksException { - Set<TaskCluster> doomedTaskClusters = new HashSet<TaskCluster>(); - for (TaskCluster tc : inProgressTaskClusters) { - // Start search at TCs that produce no outputs (sinks) - if (tc.getProducedPartitions().isEmpty()) { - findDoomedTaskClusters(tc, doomedTaskClusters); - } - } - - for (TaskCluster tc : doomedTaskClusters) { - TaskClusterAttempt tca = findLastTaskClusterAttempt(tc); - if (tca != null) { - abortTaskCluster(tca, TaskClusterAttempt.TaskClusterStatus.ABORTED); - } - } - } - - private boolean findDoomedTaskClusters(TaskCluster tc, Set<TaskCluster> doomedTaskClusters) { - if (doomedTaskClusters.contains(tc)) { - return true; - } - TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc); - if (lastAttempt != null) { - switch (lastAttempt.getStatus()) { - case ABORTED: - case FAILED: - return true; - - case COMPLETED: - return false; - } - } - Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap = jobRun.getConnectorPolicyMap(); - PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker(); - boolean doomed = false; - for (TaskCluster depTC : tc.getDependencyTaskClusters()) { - if (findDoomedTaskClusters(depTC, doomedTaskClusters)) { - doomed = true; - } - } - for (PartitionId pid : tc.getRequiredPartitions()) { - ConnectorDescriptorId cdId = pid.getConnectorDescriptorId(); - IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId); - PartitionState maxState = pmm.getMaximumAvailableState(pid); - if (maxState == null - || (cPolicy.consumerWaitsForProducerToFinish() && maxState != PartitionState.COMMITTED)) { - if (findDoomedTaskClusters(partitionProducingTaskClusterMap.get(pid), doomedTaskClusters)) { - doomed = true; - } - } - } - if (doomed) { - doomedTaskClusters.add(tc); - } - return doomed; - } - - public void notifyTaskComplete(TaskAttempt ta) throws HyracksException { - TaskAttemptId taId = ta.getTaskAttemptId(); - TaskCluster tc = ta.getTask().getTaskCluster(); - TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc); - if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) { - TaskAttempt.TaskStatus taStatus = ta.getStatus(); - if (taStatus == TaskAttempt.TaskStatus.RUNNING) { - ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null); - ta.setEndTime(System.currentTimeMillis()); - if (lastAttempt.decrementPendingTasksCounter() == 0) { - lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED); - lastAttempt.setEndTime(System.currentTimeMillis()); - inProgressTaskClusters.remove(tc); - startRunnableActivityClusters(); - } - } else { - LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus); - } - } else { - LOGGER.warning("Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt); - } - } - - /** - * Indicates that a single task attempt has encountered a failure. - * @param ta Failed Task Attempt - * @param exceptions exeptions thrown during the failure - */ - public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) { - try { - LOGGER.fine("Received failure notification for TaskAttempt " + ta.getTaskAttemptId()); - TaskAttemptId taId = ta.getTaskAttemptId(); - TaskCluster tc = ta.getTask().getTaskCluster(); - TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc); - if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) { - LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed"); - ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions); - abortTaskCluster(lastAttempt, TaskClusterAttempt.TaskClusterStatus.FAILED); - abortDoomedTaskClusters(); - if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts()) { - abortJob(exceptions); - return; - } - startRunnableActivityClusters(); - } else { - LOGGER.warning("Ignoring task failure notification: " + taId + " -- Current last attempt = " - + lastAttempt); - } - } catch (Exception e) { - abortJob(Collections.singletonList(e)); - } - } - - /** - * Indicates that the provided set of nodes have left the cluster. - * - * @param deadNodes - * - Set of failed nodes - */ - public void notifyNodeFailures(Set<String> deadNodes) { - try { - jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes); - jobRun.getParticipatingNodeIds().removeAll(deadNodes); - jobRun.getCleanupPendingNodeIds().removeAll(deadNodes); - if (jobRun.getPendingStatus() != null && jobRun.getCleanupPendingNodeIds().isEmpty()) { - finishJob(jobRun); - return; - } - for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) { - if (isPlanned(ac)) { - TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters(); - if (taskClusters != null) { - for (TaskCluster tc : taskClusters) { - TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc); - if (lastTaskClusterAttempt != null - && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt - .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) { - boolean abort = false; - for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) { - assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING); - if (deadNodes.contains(ta.getNodeId())) { - ta.setStatus( - TaskAttempt.TaskStatus.FAILED, - Collections.singletonList(new Exception("Node " + ta.getNodeId() - + " failed"))); - ta.setEndTime(System.currentTimeMillis()); - abort = true; - } - } - if (abort) { - abortTaskCluster(lastTaskClusterAttempt, - TaskClusterAttempt.TaskClusterStatus.ABORTED); - } - } - } - abortDoomedTaskClusters(); - } - } - } - startRunnableActivityClusters(); - } catch (Exception e) { - abortJob(Collections.singletonList(e)); - } - } - - private void finishJob(final JobRun run) { - JobId jobId = run.getJobId(); - CCApplicationContext appCtx = ccs.getApplicationContext(); - if (appCtx != null) { - try { - appCtx.notifyJobFinish(jobId); - } catch (HyracksException e) { - e.printStackTrace(); - } - } - run.setStatus(run.getPendingStatus(), run.getPendingExceptions()); - run.setEndTime(System.currentTimeMillis()); - ccs.getActiveRunMap().remove(jobId); - ccs.getRunMapArchive().put(jobId, run); - ccs.getRunHistory().put(jobId, run.getExceptions()); - - if (run.getActivityClusterGraph().isReportTaskDetails()) { - /** - * log job details when task-profiling is enabled - */ - try { - ccs.getJobLogFile().log(createJobLogObject(run)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - - private ObjectNode createJobLogObject(final JobRun run) { - ObjectMapper om = new ObjectMapper(); - ObjectNode jobLogObject = om.createObjectNode(); - ActivityClusterGraph acg = run.getActivityClusterGraph(); - jobLogObject.set("activity-cluster-graph", acg.toJSON()); - jobLogObject.set("job-run", run.toJSON()); - return jobLogObject; - } -}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/PartitionConstraintSolver.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/PartitionConstraintSolver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/PartitionConstraintSolver.java deleted file mode 100644 index 6a41d01..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/PartitionConstraintSolver.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.cc.scheduler; - -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.hyracks.api.constraints.Constraint; -import org.apache.hyracks.api.constraints.expressions.ConstantExpression; -import org.apache.hyracks.api.constraints.expressions.ConstraintExpression; -import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression; - -public class PartitionConstraintSolver { - private final Map<LValueConstraintExpression, Set<ConstraintExpression>> constraints; - - public PartitionConstraintSolver() { - constraints = new HashMap<LValueConstraintExpression, Set<ConstraintExpression>>(); - } - - public void addConstraints(Collection<Constraint> constraintCollection) { - for (Constraint c : constraintCollection) { - addConstraint(c); - } - } - - public void addConstraint(Constraint c) { - Set<ConstraintExpression> rValues = constraints.get(c.getLValue()); - if (rValues == null) { - rValues = new HashSet<ConstraintExpression>(); - constraints.put(c.getLValue(), rValues); - } - rValues.add(c.getRValue()); - } - - public void solve(Collection<LValueConstraintExpression> targetSet) { - Set<LValueConstraintExpression> inProcess = new HashSet<LValueConstraintExpression>(); - for (LValueConstraintExpression lv : targetSet) { - solveLValue(lv, inProcess); - } - } - - private Solution solve(ConstraintExpression ce, Set<LValueConstraintExpression> inProcess) { - switch (ce.getTag()) { - case CONSTANT: - return new Solution(((ConstantExpression) ce).getValue(), Solution.Status.FOUND); - - case PARTITION_COUNT: - case PARTITION_LOCATION: - return solveLValue((LValueConstraintExpression) ce, inProcess); - } - return null; - } - - private Solution solveLValue(LValueConstraintExpression lv, Set<LValueConstraintExpression> inProcess) { - if (inProcess.contains(lv)) { - return new Solution(null, Solution.Status.CYCLE); - } - Solution result = null; - inProcess.add(lv); - Set<ConstraintExpression> rValues = constraints.get(lv); - if (rValues == null) { - return new Solution(null, Solution.Status.NOT_BOUND); - } - for (ConstraintExpression ce : rValues) { - Solution solution = solve(ce, inProcess); - if (solution != null && solution.status == Solution.Status.FOUND) { - result = solution; - break; - } - } - if (result != null) { - rValues.clear(); - rValues.add(new ConstantExpression(result.value)); - } - inProcess.remove(lv); - return result; - } - - public Object getValue(LValueConstraintExpression lValue) { - Set<ConstraintExpression> rValues = constraints.get(lValue); - if (rValues == null) { - return null; - } - if (rValues.size() != 1) { - return null; - } - for (ConstraintExpression ce : rValues) { - if (ce.getTag() == ConstraintExpression.ExpressionTag.CONSTANT) { - return ((ConstantExpression) ce).getValue(); - } - } - return null; - } - - private static class Solution { - enum Status { - FOUND, - CYCLE, - NOT_BOUND, - } - - final Object value; - final Status status; - - public Solution(Object value, Status status) { - this.value = value; - this.status = status; - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java deleted file mode 100644 index a79bf50..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.cc.scheduler; - -import org.apache.hyracks.control.cc.job.TaskCluster; - -public class RankedRunnableTaskCluster implements Comparable<RankedRunnableTaskCluster> { - private final int rank; - - private final TaskCluster taskCluster; - - public RankedRunnableTaskCluster(int rank, TaskCluster taskCluster) { - this.rank = rank; - this.taskCluster = taskCluster; - } - - public int getRank() { - return rank; - } - - public TaskCluster getTaskCluster() { - return taskCluster; - } - - @Override - public String toString() { - return "[" + rank + ":" + taskCluster + "]"; - } - - @Override - public int compareTo(RankedRunnableTaskCluster o) { - int cmp = rank - o.rank; - return cmp < 0 ? -1 : (cmp > 0 ? 1 : 0); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ResourceManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ResourceManager.java new file mode 100644 index 0000000..6168dce --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/ResourceManager.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.control.cc.scheduler; + +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.resource.ClusterCapacity; +import org.apache.hyracks.api.job.resource.IClusterCapacity; +import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity; +import org.apache.hyracks.api.job.resource.NodeCapacity; + +public class ResourceManager implements IResourceManager { + + // The maximum capacity, assuming that there is no running job that occupies capacity. + // It is unchanged unless any node is added, removed or updated. + private IClusterCapacity maxCapacity = new ClusterCapacity(); + + // The current capacity, which is dynamically changing. + private IClusterCapacity currentCapacity = new ClusterCapacity(); + + @Override + public IReadOnlyClusterCapacity getMaximumCapacity() { + return maxCapacity; + } + + @Override + public IClusterCapacity getCurrentCapacity() { + return currentCapacity; + } + + @Override + public void update(String nodeId, NodeCapacity capacity) throws HyracksException { + maxCapacity.update(nodeId, capacity); + currentCapacity.update(nodeId, capacity); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/Runnability.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/Runnability.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/Runnability.java deleted file mode 100644 index 70d3f16..0000000 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/Runnability.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.control.cc.scheduler; - -public final class Runnability { - private final Tag tag; - - private final int priority; - - public Runnability(Tag tag, int priority) { - this.tag = tag; - this.priority = priority; - } - - public Tag getTag() { - return tag; - } - - public int getPriority() { - return priority; - } - - public enum Tag { - COMPLETED, - NOT_RUNNABLE, - RUNNABLE, - RUNNING, - } - - public static Runnability getWorstCase(Runnability r1, Runnability r2) { - switch (r1.tag) { - case COMPLETED: - switch (r2.tag) { - case COMPLETED: - case NOT_RUNNABLE: - case RUNNABLE: - case RUNNING: - return r2; - } - break; - - case NOT_RUNNABLE: - switch (r2.tag) { - case COMPLETED: - case NOT_RUNNABLE: - case RUNNABLE: - case RUNNING: - return r1; - } - break; - - case RUNNABLE: - switch (r2.tag) { - case COMPLETED: - return r1; - - case RUNNING: - return r1.priority > 0 ? r1 : new Runnability(Tag.RUNNABLE, 1); - - case NOT_RUNNABLE: - return r2; - - case RUNNABLE: - return r1.priority > r2.priority ? r1 : r2; - } - break; - - case RUNNING: - switch (r2.tag) { - case COMPLETED: - case RUNNING: - return r1; - - case NOT_RUNNABLE: - return r2; - - case RUNNABLE: - return r2.priority > 0 ? r2 : new Runnability(Tag.RUNNABLE, 1); - } - break; - } - throw new IllegalArgumentException("Could not aggregate: " + r1 + " and " + r2); - } - - @Override - public String toString() { - return "{" + tag + ", " + priority + "}"; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/JobsRESTAPIFunction.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/JobsRESTAPIFunction.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/JobsRESTAPIFunction.java index e69884a..892807b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/JobsRESTAPIFunction.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/JobsRESTAPIFunction.java @@ -45,7 +45,7 @@ public class JobsRESTAPIFunction implements IJSONOutputFunction { break; } case 0: { - GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs); + GetJobSummariesJSONWork gjse = new GetJobSummariesJSONWork(ccs.getJobManager()); ccs.getWorkQueue().scheduleAndSync(gjse); result.set("result", gjse.getSummaries()); break; @@ -59,7 +59,7 @@ public class JobsRESTAPIFunction implements IJSONOutputFunction { ccs.getWorkQueue().scheduleAndSync(gjage); result.set("result", gjage.getJSON()); } else if ("job-run".equalsIgnoreCase(arguments[1])) { - GetJobRunJSONWork gjre = new GetJobRunJSONWork(ccs, jobId); + GetJobRunJSONWork gjre = new GetJobRunJSONWork(ccs.getJobManager(), jobId); ccs.getWorkQueue().scheduleAndSync(gjre); result.set("result", gjre.getJSON()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/NodesRESTAPIFunction.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/NodesRESTAPIFunction.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/NodesRESTAPIFunction.java index 8994895..3b4918c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/NodesRESTAPIFunction.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/NodesRESTAPIFunction.java @@ -18,14 +18,14 @@ */ package org.apache.hyracks.control.cc.web; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.web.util.IJSONOutputFunction; import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork; import org.apache.hyracks.control.cc.work.GetNodeSummariesJSONWork; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + public class NodesRESTAPIFunction implements IJSONOutputFunction { private ClusterControllerService ccs; @@ -40,12 +40,13 @@ public class NodesRESTAPIFunction implements IJSONOutputFunction { switch (arguments.length) { case 1: { if ("".equals(arguments[0])) { - GetNodeSummariesJSONWork gnse = new GetNodeSummariesJSONWork(ccs); + GetNodeSummariesJSONWork gnse = new GetNodeSummariesJSONWork(ccs.getNodeManager()); ccs.getWorkQueue().scheduleAndSync(gnse); result.set("result", gnse.getSummaries()); } else { String nodeId = arguments[0]; - GetNodeDetailsJSONWork gnde = new GetNodeDetailsJSONWork(ccs, nodeId, true, true); + GetNodeDetailsJSONWork gnde = new GetNodeDetailsJSONWork(ccs.getNodeManager(), ccs.getCCConfig(), + nodeId, true, true); ccs.getWorkQueue().scheduleAndSync(gnde); result.set("result", gnde.getDetail()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/WebServer.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/WebServer.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/WebServer.java index b55e65d..8fe68c1 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/WebServer.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/WebServer.java @@ -19,7 +19,6 @@ package org.apache.hyracks.control.cc.web; import java.util.EnumSet; -import java.util.logging.Logger; import javax.servlet.DispatcherType; @@ -43,8 +42,6 @@ import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; public class WebServer { - private final static Logger LOGGER = Logger.getLogger(WebServer.class.getName()); - private final ClusterControllerService ccs; private final Server server; private final ServerConnector connector; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java index ad333fe..fa5dcd0 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java @@ -19,10 +19,9 @@ package org.apache.hyracks.control.cc.work; -import java.util.Map; - import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.cc.cluster.INodeManager; import org.apache.hyracks.control.common.heartbeat.HeartbeatData; import org.apache.hyracks.control.common.work.SynchronizableWork; @@ -40,8 +39,8 @@ public abstract class AbstractHeartbeatWork extends SynchronizableWork { @Override public void doRun() { - Map<String, NodeControllerState> nodeMap = ccs.getNodeMap(); - NodeControllerState state = nodeMap.get(nodeId); + INodeManager nodeManager = ccs.getNodeManager(); + NodeControllerState state = nodeManager.getNodeControllerState(nodeId); if (state != null) { state.notifyHeartbeat(hbData); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java index 9134a91..3babf00 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java @@ -28,6 +28,7 @@ import org.apache.hyracks.api.job.ActivityCluster; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.job.ActivityPlan; +import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.cc.job.Task; import org.apache.hyracks.control.cc.job.TaskAttempt; @@ -50,7 +51,8 @@ public abstract class AbstractTaskLifecycleWork extends AbstractHeartbeatWork { @Override public final void runWork() { - JobRun run = ccs.getActiveRunMap().get(jobId); + IJobManager jobManager = ccs.getJobManager(); + JobRun run = jobManager.get(jobId); if (run != null) { TaskId tid = taId.getTaskId(); Map<ActivityId, ActivityCluster> activityClusterMap = run.getActivityClusterGraph().getActivityMap(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java index 741c641..6480674 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java @@ -20,15 +20,14 @@ package org.apache.hyracks.control.cc.work; import java.net.URL; +import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; import java.util.UUID; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.cc.cluster.INodeManager; import org.apache.hyracks.control.common.deployment.DeploymentRun; import org.apache.hyracks.control.common.deployment.DeploymentUtils; import org.apache.hyracks.control.common.work.IPCResponder; @@ -72,12 +71,8 @@ public class CliDeployBinaryWork extends SynchronizableWork { /** * Deploy for the node controllers */ - Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap(); - - Set<String> nodeIds = new TreeSet<String>(); - for (String nc : nodeControllerStateMap.keySet()) { - nodeIds.add(nc); - } + INodeManager nodeManager = ccs.getNodeManager(); + Collection<String> nodeIds = nodeManager.getAllNodeIds(); final DeploymentRun dRun = new DeploymentRun(nodeIds); /** The following call prevents a user to deploy with the same deployment id simultaneously. */ @@ -86,7 +81,7 @@ public class CliDeployBinaryWork extends SynchronizableWork { /*** * deploy binaries to each node controller */ - for (NodeControllerState ncs : nodeControllerStateMap.values()) { + for (NodeControllerState ncs : nodeManager.getAllNodeControllerStates()) { ncs.getNodeController().deployBinary(deploymentId, binaryURLs); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java index 5f97ce2..de28c32 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliUnDeployBinaryWork.java @@ -19,14 +19,14 @@ package org.apache.hyracks.control.cc.work; -import java.util.Map; +import java.util.Collection; import java.util.Set; -import java.util.TreeSet; import java.util.UUID; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.cc.cluster.INodeManager; import org.apache.hyracks.control.common.deployment.DeploymentRun; import org.apache.hyracks.control.common.deployment.DeploymentUtils; import org.apache.hyracks.control.common.work.IPCResponder; @@ -68,12 +68,8 @@ public class CliUnDeployBinaryWork extends SynchronizableWork { /** * Deploy for the node controllers */ - Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap(); - - Set<String> nodeIds = new TreeSet<String>(); - for (String nc : nodeControllerStateMap.keySet()) { - nodeIds.add(nc); - } + INodeManager nodeManager = ccs.getNodeManager(); + Collection<String> nodeIds = nodeManager.getAllNodeIds(); final DeploymentRun dRun = new DeploymentRun(nodeIds); /** The following call prevents a user to undeploy with the same deployment id simultaneously. */ @@ -82,7 +78,7 @@ public class CliUnDeployBinaryWork extends SynchronizableWork { /*** * deploy binaries to each node controller */ - for (NodeControllerState ncs : nodeControllerStateMap.values()) { + for (NodeControllerState ncs : nodeManager.getAllNodeControllerStates()) { ncs.getNodeController().undeployBinary(deploymentId); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java index e05dfbc..0b89f55 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java @@ -19,14 +19,13 @@ package org.apache.hyracks.control.cc.work; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; +import java.util.Collection; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.cc.cluster.INodeManager; import org.apache.hyracks.control.common.shutdown.ShutdownRun; import org.apache.hyracks.control.common.work.IResultCallback; import org.apache.hyracks.control.common.work.SynchronizableWork; @@ -52,9 +51,8 @@ public class ClusterShutdownWork extends SynchronizableWork { if (ccs.getShutdownRun() != null) { throw new IPCException("Shutdown already in progress"); } - Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap(); - Set<String> nodeIds = new TreeSet<>(); - nodeIds.addAll(nodeControllerStateMap.keySet()); + INodeManager nodeManager = ccs.getNodeManager(); + Collection<String> nodeIds = nodeManager.getAllNodeIds(); /** * set up our listener for the node ACKs */ @@ -64,7 +62,7 @@ public class ClusterShutdownWork extends SynchronizableWork { /** * Shutdown all the nodes... */ - nodeControllerStateMap.forEach(this::shutdownNode); + nodeManager.apply(this::shutdownNode); ccs.getExecutor().execute(new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java index 91a5906..7709a2b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java @@ -16,16 +16,17 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.hyracks.control.cc.work; +import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.UUID; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.cc.cluster.INodeManager; import org.apache.hyracks.control.common.work.SynchronizableWork; public class GatherStateDumpsWork extends SynchronizableWork { @@ -41,8 +42,9 @@ public class GatherStateDumpsWork extends SynchronizableWork { @Override public void doRun() throws Exception { ccs.addStateDumpRun(sdr.stateDumpId, sdr); - sdr.setNCs(new HashSet<>(ccs.getNodeMap().keySet())); - for (NodeControllerState ncs : ccs.getNodeMap().values()) { + INodeManager nodeManager = ccs.getNodeManager(); + sdr.setNCs(nodeManager.getAllNodeIds()); + for (NodeControllerState ncs : nodeManager.getAllNodeControllerStates()) { ncs.getNodeController().dumpState(sdr.stateDumpId); } } @@ -59,7 +61,7 @@ public class GatherStateDumpsWork extends SynchronizableWork { private final Map<String, String> ncStates; - private Set<String> ncIds; + private Collection<String> ncIds; private boolean complete; @@ -70,7 +72,7 @@ public class GatherStateDumpsWork extends SynchronizableWork { complete = false; } - public void setNCs(Set<String> ncIds) { + public void setNCs(Collection<String> ncIds) { this.ncIds = ncIds; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java index 294ae97..6a4a1d6 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java @@ -18,14 +18,15 @@ */ package org.apache.hyracks.control.cc.work; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.common.work.SynchronizableWork; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + public class GetActivityClusterGraphJSONWork extends SynchronizableWork { private final ClusterControllerService ccs; private final JobId jobId; @@ -38,15 +39,12 @@ public class GetActivityClusterGraphJSONWork extends SynchronizableWork { @Override protected void doRun() throws Exception { - + IJobManager jobManager = ccs.getJobManager(); ObjectMapper om = new ObjectMapper(); - JobRun run = ccs.getActiveRunMap().get(jobId); + JobRun run = jobManager.get(jobId); if (run == null) { - run = ccs.getRunMapArchive().get(jobId); - if (run == null) { - json = om.createObjectNode(); - return; - } + json = om.createObjectNode(); + return; } json = run.getActivityClusterGraph().toJSON(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java index 31a829c..872fb9c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java @@ -23,19 +23,20 @@ import java.util.Map; import java.util.Set; import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.cluster.INodeManager; import org.apache.hyracks.control.common.work.SynchronizableWork; public class GetIpAddressNodeNameMapWork extends SynchronizableWork { - private final ClusterControllerService ccs; + private final INodeManager nodeManager; private Map<InetAddress, Set<String>> map; - public GetIpAddressNodeNameMapWork(ClusterControllerService ccs, Map<InetAddress, Set<String>> map) { - this.ccs = ccs; + public GetIpAddressNodeNameMapWork(INodeManager nodeManager, Map<InetAddress, Set<String>> map) { + this.nodeManager = nodeManager; this.map = map; } @Override protected void doRun() throws Exception { - map.putAll(ccs.getIpAddressNodeNameMap()); + map.putAll(nodeManager.getIpAddressNodeNameMap()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java index e072c21..8fe6470 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobInfoWork.java @@ -20,18 +20,18 @@ package org.apache.hyracks.control.cc.work; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobInfo; -import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.common.work.IResultCallback; import org.apache.hyracks.control.common.work.SynchronizableWork; public class GetJobInfoWork extends SynchronizableWork { - private final ClusterControllerService ccs; + private final IJobManager jobManager; private final JobId jobId; private final IResultCallback<JobInfo> callback; - public GetJobInfoWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobInfo> callback) { - this.ccs = ccs; + public GetJobInfoWork(IJobManager jobManager, JobId jobId, IResultCallback<JobInfo> callback) { + this.jobManager = jobManager; this.jobId = jobId; this.callback = callback; } @@ -39,10 +39,7 @@ public class GetJobInfoWork extends SynchronizableWork { @Override protected void doRun() throws Exception { try { - JobRun run = ccs.getActiveRunMap().get(jobId); - if (run == null) { - run = ccs.getRunMapArchive().get(jobId); - } + JobRun run = jobManager.get(jobId); JobInfo info = (run != null) ? new JobInfo(run.getJobId(), run.getStatus(), run.getOperatorLocations()) : null; callback.setValue(info); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java index aad6edf..3a7c449 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobRunJSONWork.java @@ -18,34 +18,31 @@ */ package org.apache.hyracks.control.cc.work; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.common.work.SynchronizableWork; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + public class GetJobRunJSONWork extends SynchronizableWork { - private final ClusterControllerService ccs; + private final IJobManager jobManager; private final JobId jobId; private ObjectNode json; - public GetJobRunJSONWork(ClusterControllerService ccs, JobId jobId) { - this.ccs = ccs; + public GetJobRunJSONWork(IJobManager jobManager, JobId jobId) { + this.jobManager = jobManager; this.jobId = jobId; } @Override protected void doRun() throws Exception { ObjectMapper om = new ObjectMapper(); - JobRun run = ccs.getActiveRunMap().get(jobId); + JobRun run = jobManager.get(jobId); if (run == null) { - run = ccs.getRunMapArchive().get(jobId); - if (run == null) { - json = om.createObjectNode(); - return; - } + json = om.createObjectNode(); + return; } json = run.toJSON(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java index d45a9cc..b5bf8b2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobStatusWork.java @@ -16,22 +16,23 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.hyracks.control.cc.work; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobStatus; -import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.common.work.IResultCallback; import org.apache.hyracks.control.common.work.SynchronizableWork; public class GetJobStatusWork extends SynchronizableWork { - private final ClusterControllerService ccs; + private final IJobManager jobManager; private final JobId jobId; private final IResultCallback<JobStatus> callback; - public GetJobStatusWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobStatus> callback) { - this.ccs = ccs; + public GetJobStatusWork(IJobManager jobManager, JobId jobId, IResultCallback<JobStatus> callback) { + this.jobManager = jobManager; this.jobId = jobId; this.callback = callback; } @@ -39,10 +40,7 @@ public class GetJobStatusWork extends SynchronizableWork { @Override protected void doRun() throws Exception { try { - JobRun run = ccs.getActiveRunMap().get(jobId); - if (run == null) { - run = ccs.getRunMapArchive().get(jobId); - } + JobRun run = jobManager.get(jobId); JobStatus status = run == null ? null : run.getStatus(); callback.setValue(status); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java index 1e5a3a5..9c680c3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetJobSummariesJSONWork.java @@ -16,31 +16,34 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.hyracks.control.cc.work; import java.util.Collection; +import org.apache.hyracks.control.cc.job.IJobManager; +import org.apache.hyracks.control.cc.job.JobRun; +import org.apache.hyracks.control.common.work.SynchronizableWork; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.hyracks.control.cc.ClusterControllerService; -import org.apache.hyracks.control.cc.job.JobRun; -import org.apache.hyracks.control.common.work.SynchronizableWork; public class GetJobSummariesJSONWork extends SynchronizableWork { - private final ClusterControllerService ccs; + private final IJobManager jobManager; private ArrayNode summaries; - public GetJobSummariesJSONWork(ClusterControllerService ccs) { - this.ccs = ccs; + public GetJobSummariesJSONWork(IJobManager jobManager) { + this.jobManager = jobManager; } @Override protected void doRun() throws Exception { ObjectMapper om = new ObjectMapper(); summaries = om.createArrayNode(); - populateJSON(ccs.getActiveRunMap().values()); - populateJSON(ccs.getRunMapArchive().values()); + populateJSON(jobManager.getRunningJobs()); + populateJSON(jobManager.getPendingJobs()); + populateJSON(jobManager.getArchivedJobs()); } private void populateJSON(Collection<JobRun> jobRuns) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e0c232d2/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java index a0150f2..c36b887 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java @@ -18,35 +18,25 @@ */ package org.apache.hyracks.control.cc.work; -import java.util.LinkedHashMap; import java.util.Map; import org.apache.hyracks.api.client.NodeControllerInfo; -import org.apache.hyracks.api.client.NodeStatus; -import org.apache.hyracks.control.cc.ClusterControllerService; -import org.apache.hyracks.control.cc.NodeControllerState; +import org.apache.hyracks.control.cc.cluster.INodeManager; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.common.work.IResultCallback; public class GetNodeControllersInfoWork extends AbstractWork { - private final ClusterControllerService ccs; + private final INodeManager nodeManager; private IResultCallback<Map<String, NodeControllerInfo>> callback; - public GetNodeControllersInfoWork(ClusterControllerService ccs, + public GetNodeControllersInfoWork(INodeManager nodeManager, IResultCallback<Map<String, NodeControllerInfo>> callback) { - this.ccs = ccs; + this.nodeManager = nodeManager; this.callback = callback; } @Override public void run() { - Map<String, NodeControllerInfo> result = new LinkedHashMap<>(); - Map<String, NodeControllerState> nodeMap = ccs.getNodeMap(); - for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) { - NodeControllerState ncState = e.getValue(); - result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, ncState.getDataPort(), - ncState.getDatasetPort(), ncState.getMessagingPort(), ncState.getNumCores())); - } - callback.setValue(result); + callback.setValue(nodeManager.getNodeControllerInfoMap()); } }