http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java deleted file mode 100644 index 687b491..0000000 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/Cluster.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.ebay.myriad.state; - -import com.google.gson.Gson; - -import java.util.Collection; -import java.util.HashSet; -import java.util.UUID; - -/** - * Model which represents the configuration of a cluster - */ -public class Cluster { - private String clusterId; - private String clusterName; - private Collection<NodeTask> nodes; - private String resourceManagerHost; - private String resourceManagerPort; - private double minQuota; - - public Cluster() { - this.clusterId = UUID.randomUUID().toString(); - this.nodes = new HashSet<>(); - } - - public String getClusterId() { - return clusterId; - } - - public String getClusterName() { - return clusterName; - } - - public void setClusterName(String clusterName) { - this.clusterName = clusterName; - } - - public Collection<NodeTask> getNodes() { - return nodes; - } - - public void addNode(NodeTask node) { - this.nodes.add(node); - } - - public void addNodes(Collection<NodeTask> nodes) { - this.nodes.addAll(nodes); - } - - public void removeNode(NodeTask task) { - this.nodes.remove(task); - } - - public String getResourceManagerHost() { - return resourceManagerHost; - } - - public void setResourceManagerHost(String resourceManagerHost) { - this.resourceManagerHost = resourceManagerHost; - } - - public String getResourceManagerPort() { - return resourceManagerPort; - } - - public void setResourceManagerPort(String resourceManagerPort) { - this.resourceManagerPort = resourceManagerPort; - } - - public double getMinQuota() { - return minQuota; - } - - public void setMinQuota(double minQuota) { - this.minQuota = minQuota; - } - - public String toString() { - Gson gson = new Gson(); - return gson.toJson(this); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java deleted file mode 100644 index 5e868f9..0000000 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadState.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.ebay.myriad.state; - -import com.google.protobuf.InvalidProtocolBufferException; - -import org.apache.mesos.Protos; -import org.apache.mesos.state.State; -import org.apache.mesos.state.Variable; - -import java.util.concurrent.ExecutionException; - -/** - * Model that represents the state of Myriad - */ -public class MyriadState { - public static final String KEY_FRAMEWORK_ID = "frameworkId"; - - private State stateStore; - - public MyriadState(State stateStore) { - this.stateStore = stateStore; - } - - public Protos.FrameworkID getFrameworkID() throws InterruptedException, ExecutionException, InvalidProtocolBufferException { - byte[] frameworkId = stateStore.fetch(KEY_FRAMEWORK_ID).get().value(); - - if (frameworkId.length > 0) { - return Protos.FrameworkID.parseFrom(frameworkId); - } else { - return null; - } - } - - public void setFrameworkId(Protos.FrameworkID newFrameworkId) throws InterruptedException, ExecutionException { - Variable frameworkId = stateStore.fetch(KEY_FRAMEWORK_ID).get(); - frameworkId = frameworkId.mutate(newFrameworkId.toByteArray()); - stateStore.store(frameworkId).get(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java deleted file mode 100644 index 99ab327..0000000 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.ebay.myriad.state; - -import com.ebay.myriad.state.utils.StoreContext; - -/** - * Interface implemented by all Myriad State Store implementations - */ -public interface MyriadStateStore { - - StoreContext loadMyriadState() throws Exception; - - void storeMyriadState(StoreContext storeContext) throws Exception; - -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java deleted file mode 100644 index d784092..0000000 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.ebay.myriad.state; - -import com.ebay.myriad.scheduler.constraints.Constraint; -import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.List; -import com.ebay.myriad.scheduler.ServiceResourceProfile; -import com.ebay.myriad.scheduler.TaskUtils; -import com.google.inject.Inject; - -import org.apache.mesos.Protos; -import org.apache.mesos.Protos.Attribute; - -/** - * Represents a task to be launched by the executor - */ -public class NodeTask { - @JsonProperty - private String hostname; - @JsonProperty - private Protos.SlaveID slaveId; - @JsonProperty - private Protos.TaskStatus taskStatus; - @JsonProperty - private String taskPrefix; - @JsonProperty - private ServiceResourceProfile serviceresourceProfile; - - @Inject - TaskUtils taskUtils; - /** - * Mesos executor for this node. - */ - private Protos.ExecutorInfo executorInfo; - - private Constraint constraint; - private List<Attribute> slaveAttributes; - - public NodeTask(ServiceResourceProfile profile, Constraint constraint) { - this.serviceresourceProfile = profile; - this.hostname = ""; - this.constraint = constraint; - } - - public Protos.SlaveID getSlaveId() { - return slaveId; - } - - public void setSlaveId(Protos.SlaveID slaveId) { - this.slaveId = slaveId; - } - - public Constraint getConstraint() { - return constraint; - } - - public String getHostname() { - return this.hostname; - } - - public void setHostname(String hostname) { - this.hostname = hostname; - } - - public Protos.TaskStatus getTaskStatus() { - return taskStatus; - } - - public void setTaskStatus(Protos.TaskStatus taskStatus) { - this.taskStatus = taskStatus; - } - - public Protos.ExecutorInfo getExecutorInfo() { - return executorInfo; - } - - public void setExecutorInfo(Protos.ExecutorInfo executorInfo) { - this.executorInfo = executorInfo; - } - - public void setSlaveAttributes(List<Attribute> slaveAttributes) { - this.slaveAttributes = slaveAttributes; - } - - public List<Attribute> getSlaveAttributes() { - return slaveAttributes; - } - - public String getTaskPrefix() { - return taskPrefix; - } - - public void setTaskPrefix(String taskPrefix) { - this.taskPrefix = taskPrefix; - } - - public ServiceResourceProfile getProfile() { - return serviceresourceProfile; - } - - public void setProfile(ServiceResourceProfile serviceresourceProfile) { - this.serviceresourceProfile = serviceresourceProfile; - } -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java deleted file mode 100644 index 99a7506..0000000 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java +++ /dev/null @@ -1,549 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.ebay.myriad.state; - -import com.ebay.myriad.scheduler.ServiceResourceProfile; -import com.ebay.myriad.state.utils.StoreContext; -import com.google.common.collect.Sets; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.regex.Pattern; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.mesos.Protos; -import org.apache.mesos.Protos.SlaveID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Represents the state of the Myriad scheduler - */ -public class SchedulerState { - private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerState.class); - - private static Pattern taskIdPattern = Pattern.compile("\\."); - - private Map<Protos.TaskID, NodeTask> tasks; - private Protos.FrameworkID frameworkId; - private MyriadStateStore stateStore; - private Map<String, SchedulerStateForType> statesForTaskType; - - public SchedulerState(MyriadStateStore stateStore) { - this.tasks = new ConcurrentHashMap<>(); - this.stateStore = stateStore; - this.statesForTaskType = new ConcurrentHashMap<>(); - loadStateStore(); - } - - /** - * Making method synchronized, so if someone tries flexup/down at the same time - * addNodes and removeTask will not put data into an inconsistent state - * - * @param nodes - */ - public synchronized void addNodes(Collection<NodeTask> nodes) { - if (CollectionUtils.isEmpty(nodes)) { - LOGGER.info("No nodes to add"); - return; - } - for (NodeTask node : nodes) { - Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(String.format("%s.%s.%s", node.getTaskPrefix(), node.getProfile().getName(), UUID.randomUUID())).build(); - addTask(taskId, node); - SchedulerStateForType taskState = this.statesForTaskType.get(node.getTaskPrefix()); - LOGGER.info("Marked taskId {} pending, size of pending queue for {} is: {}", taskId.getValue(), node.getTaskPrefix(), (taskState == null ? 0 : taskState.getPendingTaskIds().size())); - makeTaskPending(taskId); - } - - } - - // TODO (sdaingade) Clone NodeTask - public synchronized void addTask(Protos.TaskID taskId, NodeTask node) { - this.tasks.put(taskId, node); - updateStateStore(); - } - - public synchronized void updateTask(Protos.TaskStatus taskStatus) { - Objects.requireNonNull(taskStatus, "TaskStatus object shouldn't be null"); - Protos.TaskID taskId = taskStatus.getTaskId(); - if (this.tasks.containsKey(taskId)) { - this.tasks.get(taskId).setTaskStatus(taskStatus); - } - updateStateStore(); - } - - public synchronized void makeTaskPending(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, "taskId cannot be empty or null"); - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); - if (taskTypeState == null) { - taskTypeState = new SchedulerStateForType(taskPrefix); - statesForTaskType.put(taskPrefix, taskTypeState); - } - taskTypeState.makeTaskPending(taskId); - updateStateStore(); - } - - public synchronized void makeTaskStaging(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, "taskId cannot be empty or null"); - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); - if (taskTypeState == null) { - taskTypeState = new SchedulerStateForType(taskPrefix); - statesForTaskType.put(taskPrefix, taskTypeState); - } - taskTypeState.makeTaskStaging(taskId); - updateStateStore(); - } - - public synchronized void makeTaskActive(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, "taskId cannot be empty or null"); - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); - if (taskTypeState == null) { - taskTypeState = new SchedulerStateForType(taskPrefix); - statesForTaskType.put(taskPrefix, taskTypeState); - } - taskTypeState.makeTaskActive(taskId); - updateStateStore(); - } - - public synchronized void makeTaskLost(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, "taskId cannot be empty or null"); - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); - if (taskTypeState == null) { - taskTypeState = new SchedulerStateForType(taskPrefix); - statesForTaskType.put(taskPrefix, taskTypeState); - } - taskTypeState.makeTaskLost(taskId); - updateStateStore(); - } - - public synchronized void makeTaskKillable(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, "taskId cannot be empty or null"); - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); - if (taskTypeState == null) { - taskTypeState = new SchedulerStateForType(taskPrefix); - statesForTaskType.put(taskPrefix, taskTypeState); - } - taskTypeState.makeTaskKillable(taskId); - updateStateStore(); - } - - // TODO (sdaingade) Clone NodeTask - public synchronized NodeTask getTask(Protos.TaskID taskId) { - return this.tasks.get(taskId); - } - - public synchronized Set<Protos.TaskID> getKillableTasks() { - Set<Protos.TaskID> returnSet = new HashSet<>(); - for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { - returnSet.addAll(entry.getValue().getKillableTasks()); - } - return returnSet; - } - - public synchronized Set<Protos.TaskID> getKillableTasks(String taskPrefix) { - SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); - return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getKillableTasks()); - } - - public synchronized void removeTask(Protos.TaskID taskId) { - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); - if (taskTypeState != null) { - taskTypeState.removeTask(taskId); - } - this.tasks.remove(taskId); - updateStateStore(); - } - - public synchronized Set<Protos.TaskID> getPendingTaskIds() { - Set<Protos.TaskID> returnSet = new HashSet<>(); - for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { - returnSet.addAll(entry.getValue().getPendingTaskIds()); - } - return returnSet; - } - - public synchronized Collection<Protos.TaskID> getPendingTaskIDsForProfile(ServiceResourceProfile serviceProfile) { - List<Protos.TaskID> pendingTaskIds = new ArrayList<>(); - Set<Protos.TaskID> pendingTasks = getPendingTaskIds(); - for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { - NodeTask nodeTask = entry.getValue(); - if (pendingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) { - pendingTaskIds.add(entry.getKey()); - } - } - return Collections.unmodifiableCollection(pendingTaskIds); - } - - public synchronized Set<Protos.TaskID> getPendingTaskIds(String taskPrefix) { - SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); - return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getPendingTaskIds()); - } - - public synchronized Set<Protos.TaskID> getActiveTaskIds() { - Set<Protos.TaskID> returnSet = new HashSet<>(); - for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { - returnSet.addAll(entry.getValue().getActiveTaskIds()); - } - return returnSet; - } - - public synchronized Set<Protos.TaskID> getActiveTaskIds(String taskPrefix) { - SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); - return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getActiveTaskIds()); - } - - public synchronized Set<NodeTask> getActiveTasks() { - return getTasks(getActiveTaskIds()); - } - - public Set<NodeTask> getActiveTasksByType(String taskPrefix) { - return getTasks(getActiveTaskIds(taskPrefix)); - } - - public Set<NodeTask> getStagingTasks() { - return getTasks(getStagingTaskIds()); - } - - public Set<NodeTask> getStagingTasksByType(String taskPrefix) { - return getTasks(getStagingTaskIds(taskPrefix)); - } - - public Set<NodeTask> getPendingTasksByType(String taskPrefix) { - return getTasks(getPendingTaskIds(taskPrefix)); - } - - public synchronized Set<NodeTask> getTasks(Set<Protos.TaskID> taskIds) { - Set<NodeTask> nodeTasks = new HashSet<>(); - if (CollectionUtils.isNotEmpty(taskIds) && CollectionUtils.isNotEmpty(tasks.values())) { - for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { - if (taskIds.contains(entry.getKey())) { - nodeTasks.add(entry.getValue()); - } - } - } - return Collections.unmodifiableSet(nodeTasks); - } - - public synchronized Collection<Protos.TaskID> getActiveTaskIDsForProfile(ServiceResourceProfile serviceProfile) { - List<Protos.TaskID> activeTaskIDs = new ArrayList<>(); - Set<Protos.TaskID> activeTaskIds = getActiveTaskIds(); - if (CollectionUtils.isNotEmpty(activeTaskIds) && CollectionUtils.isNotEmpty(tasks.values())) { - for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { - NodeTask nodeTask = entry.getValue(); - if (activeTaskIds.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) { - activeTaskIDs.add(entry.getKey()); - } - } - } - return Collections.unmodifiableCollection(activeTaskIDs); - } - - // TODO (sdaingade) Clone NodeTask - public synchronized NodeTask getNodeTask(SlaveID slaveId, String taskPrefix) { - if (taskPrefix == null) { - return null; - } - for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { - final NodeTask task = entry.getValue(); - if (task.getSlaveId() != null && - task.getSlaveId().equals(slaveId) && - taskPrefix.equals(task.getTaskPrefix())) { - return entry.getValue(); - } - } - return null; - } - - public synchronized Set<NodeTask> getNodeTasks(SlaveID slaveId) { - Set<NodeTask> nodeTasks = Sets.newHashSet(); - for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { - final NodeTask task = entry.getValue(); - if (task.getSlaveId() != null && task.getSlaveId().equals(slaveId)) { - nodeTasks.add(entry.getValue()); - } - } - return nodeTasks; - } - - public Set<Protos.TaskID> getStagingTaskIds() { - Set<Protos.TaskID> returnSet = new HashSet<>(); - for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { - returnSet.addAll(entry.getValue().getStagingTaskIds()); - } - return returnSet; - } - - public synchronized Collection<Protos.TaskID> getStagingTaskIDsForProfile(ServiceResourceProfile serviceProfile) { - List<Protos.TaskID> stagingTaskIDs = new ArrayList<>(); - - Set<Protos.TaskID> stagingTasks = getStagingTaskIds(); - for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) { - NodeTask nodeTask = entry.getValue(); - if (stagingTasks.contains(entry.getKey()) && nodeTask.getProfile().getName().equals(serviceProfile.getName())) { - stagingTaskIDs.add(entry.getKey()); - } - } - return Collections.unmodifiableCollection(stagingTaskIDs); - } - - public Set<Protos.TaskID> getStagingTaskIds(String taskPrefix) { - SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); - return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getStagingTaskIds()); - } - - public Set<Protos.TaskID> getLostTaskIds() { - Set<Protos.TaskID> returnSet = new HashSet<>(); - for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { - returnSet.addAll(entry.getValue().getLostTaskIds()); - } - return returnSet; - } - - public Set<Protos.TaskID> getLostTaskIds(String taskPrefix) { - SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); - return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getLostTaskIds()); - } - - // TODO (sdaingade) Currently cannot return unmodifiableCollection - // as this will break ReconcileService code - public synchronized Collection<Protos.TaskStatus> getTaskStatuses() { - Collection<Protos.TaskStatus> taskStatuses = new ArrayList<>(this.tasks.size()); - Collection<NodeTask> tasks = this.tasks.values(); - for (NodeTask task : tasks) { - Protos.TaskStatus taskStatus = task.getTaskStatus(); - if (taskStatus != null) { - taskStatuses.add(taskStatus); - } - } - - return taskStatuses; - } - - public synchronized boolean hasTask(Protos.TaskID taskID) { - return this.tasks.containsKey(taskID); - } - - public synchronized Protos.FrameworkID getFrameworkID() { - return this.frameworkId; - } - - public synchronized void setFrameworkId(Protos.FrameworkID newFrameworkId) { - this.frameworkId = newFrameworkId; - updateStateStore(); - } - - private synchronized void updateStateStore() { - if (this.stateStore == null) { - LOGGER.debug("Could not update state to state store as HA is disabled"); - return; - } - - try { - StoreContext sc = new StoreContext(frameworkId, tasks, getPendingTaskIds(), getStagingTaskIds(), getActiveTaskIds(), getLostTaskIds(), getKillableTasks()); - stateStore.storeMyriadState(sc); - } catch (Exception e) { - LOGGER.error("Failed to update scheduler state to state store", e); - } - } - - private synchronized void loadStateStore() { - if (this.stateStore == null) { - LOGGER.debug("Could not load state from state store as HA is disabled"); - return; - } - - try { - StoreContext sc = stateStore.loadMyriadState(); - if (sc != null) { - this.frameworkId = sc.getFrameworkId(); - this.tasks.putAll(sc.getTasks()); - convertToThis(TaskState.PENDING, sc.getPendingTasks()); - convertToThis(TaskState.STAGING, sc.getStagingTasks()); - convertToThis(TaskState.ACTIVE, sc.getActiveTasks()); - convertToThis(TaskState.LOST, sc.getLostTasks()); - convertToThis(TaskState.KILLABLE, sc.getKillableTasks()); - LOGGER.info("Loaded Myriad state from state store successfully."); - LOGGER.debug("State Store state includes " + - "frameworkId: {}, pending tasks count: {}, staging tasks count: {} " + - "active tasks count: {}, lost tasks count: {}, " + - "and killable tasks count: {}", frameworkId.getValue(), this.getPendingTaskIds().size(), this.getStagingTaskIds().size(), this.getActiveTaskIds().size(), this.getLostTaskIds().size(), this.getKillableTasks().size()); - } - } catch (Exception e) { - LOGGER.error("Failed to read scheduler state from state store", e); - } - } - - private void convertToThis(TaskState taskType, Set<Protos.TaskID> taskIds) { - for (Protos.TaskID taskId : taskIds) { - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - SchedulerStateForType taskTypeState = statesForTaskType.get(taskPrefix); - if (taskTypeState == null) { - taskTypeState = new SchedulerStateForType(taskPrefix); - statesForTaskType.put(taskPrefix, taskTypeState); - } - switch (taskType) { - case PENDING: - taskTypeState.makeTaskPending(taskId); - break; - case STAGING: - taskTypeState.makeTaskStaging(taskId); - break; - case ACTIVE: - taskTypeState.makeTaskActive(taskId); - break; - case KILLABLE: - taskTypeState.makeTaskKillable(taskId); - break; - case LOST: - taskTypeState.makeTaskLost(taskId); - break; - } - } - } - - /** - * Class to keep all the tasks states for a particular taskPrefix together - */ - private static class SchedulerStateForType { - - private final String taskPrefix; - private Set<Protos.TaskID> pendingTasks; - private Set<Protos.TaskID> stagingTasks; - private Set<Protos.TaskID> activeTasks; - private Set<Protos.TaskID> lostTasks; - private Set<Protos.TaskID> killableTasks; - - public SchedulerStateForType(String taskPrefix) { - this.taskPrefix = taskPrefix; - // Since Sets.newConcurrentHashSet is available only starting form Guava version 15 - // and so far (Hadoop 2.7) uses guava 13 we can not easily use it - this.pendingTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>()); - this.stagingTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>()); - this.activeTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>()); - this.lostTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>()); - this.killableTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>()); - - } - - @SuppressWarnings("unused") - public String getTaskPrefix() { - return taskPrefix; - } - - public synchronized void makeTaskPending(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, "taskId cannot be empty or null"); - - pendingTasks.add(taskId); - stagingTasks.remove(taskId); - activeTasks.remove(taskId); - lostTasks.remove(taskId); - killableTasks.remove(taskId); - } - - public synchronized void makeTaskStaging(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, "taskId cannot be empty or null"); - pendingTasks.remove(taskId); - stagingTasks.add(taskId); - activeTasks.remove(taskId); - lostTasks.remove(taskId); - killableTasks.remove(taskId); - } - - public synchronized void makeTaskActive(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, "taskId cannot be empty or null"); - pendingTasks.remove(taskId); - stagingTasks.remove(taskId); - activeTasks.add(taskId); - lostTasks.remove(taskId); - killableTasks.remove(taskId); - } - - public synchronized void makeTaskLost(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, "taskId cannot be empty or null"); - pendingTasks.remove(taskId); - stagingTasks.remove(taskId); - activeTasks.remove(taskId); - lostTasks.add(taskId); - killableTasks.remove(taskId); - } - - public synchronized void makeTaskKillable(Protos.TaskID taskId) { - Objects.requireNonNull(taskId, "taskId cannot be empty or null"); - pendingTasks.remove(taskId); - stagingTasks.remove(taskId); - activeTasks.remove(taskId); - lostTasks.remove(taskId); - killableTasks.add(taskId); - } - - public synchronized void removeTask(Protos.TaskID taskId) { - this.pendingTasks.remove(taskId); - this.stagingTasks.remove(taskId); - this.activeTasks.remove(taskId); - this.lostTasks.remove(taskId); - this.killableTasks.remove(taskId); - } - - public synchronized Set<Protos.TaskID> getPendingTaskIds() { - return Collections.unmodifiableSet(this.pendingTasks); - } - - public Set<Protos.TaskID> getActiveTaskIds() { - return Collections.unmodifiableSet(this.activeTasks); - } - - public synchronized Set<Protos.TaskID> getStagingTaskIds() { - return Collections.unmodifiableSet(this.stagingTasks); - } - - public synchronized Set<Protos.TaskID> getLostTaskIds() { - return Collections.unmodifiableSet(this.lostTasks); - } - - public synchronized Set<Protos.TaskID> getKillableTasks() { - return Collections.unmodifiableSet(this.killableTasks); - } - - } - - /** - * TaskState type - */ - public enum TaskState { - PENDING, - STAGING, - ACTIVE, - KILLABLE, - LOST - } -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java deleted file mode 100644 index fcf2cb8..0000000 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java +++ /dev/null @@ -1,368 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.ebay.myriad.state.utils; - -import com.ebay.myriad.scheduler.constraints.Constraint; -import com.ebay.myriad.scheduler.constraints.Constraint.Type; -import com.ebay.myriad.scheduler.constraints.LikeConstraint; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang3.StringUtils; -import org.apache.mesos.Protos; - -import com.ebay.myriad.scheduler.ServiceResourceProfile; -import com.ebay.myriad.state.NodeTask; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.protobuf.GeneratedMessage; - -/** - * ByteBuffer support for the Serialization of the StoreContext - */ -public class ByteBufferSupport { - - public static final int INT_SIZE = Integer.SIZE / Byte.SIZE; - public static final String UTF8 = "UTF-8"; - public static final byte[] ZERO_BYTES = new byte[0]; - private static Gson gson = new Gson(); - private static Gson gsonCustom = new GsonBuilder().registerTypeAdapter(ServiceResourceProfile.class, new ServiceResourceProfile.CustomDeserializer()).create(); - - public static void addByteBuffers(List<ByteBuffer> list, ByteArrayOutputStream bytes) throws IOException { - // If list, add the list size, then the size of each buffer followed by the buffer. - if (list != null) { - bytes.write(toIntBytes(list.size())); - for (ByteBuffer bb : list) { - addByteBuffer(bb, bytes); - } - } else { - bytes.write(toIntBytes(0)); - } - } - - public static void addByteBuffer(ByteBuffer bb, ByteArrayOutputStream bytes) throws IOException { - if (bb != null && bytes != null) { - bytes.write(toIntBytes(bb.array().length)); - bytes.write(bb.array()); - } - } - - public static ByteBuffer toByteBuffer(Protos.TaskID taskId) { - return toBuffer(taskId); - } - - public static ByteBuffer toByteBuffer(Protos.FrameworkID frameworkId) { - return toBuffer(frameworkId); - } - - /* - * Common method to convert Protobuf object to ByteBuffer - */ - public static ByteBuffer toBuffer(GeneratedMessage message) { - byte dst[]; - int size; - if (message != null) { - size = message.getSerializedSize() + INT_SIZE; - dst = message.toByteArray(); - } else { - size = INT_SIZE; - dst = ZERO_BYTES; - } - ByteBuffer bb = createBuffer(size); - putBytes(bb, dst); - bb.rewind(); - return bb; - } - - public static byte[] toIntBytes(int src) { - ByteBuffer bb = createBuffer(INT_SIZE); - bb.putInt(src); - return bb.array(); - } - - - public static ByteBuffer toByteBuffer(NodeTask nt) { - // Determine the size of ByteBuffer to allocate - // The ServiceResourceProfile toString() returns Json, if this ever changes then this - // will fail. Json is expected. - byte[] profile = toBytes(nt.getProfile().toString()); - int size = profile.length + INT_SIZE; - - Constraint constraint = nt.getConstraint(); - Constraint.Type type = constraint == null ? Type.NULL : constraint.getType(); - size += INT_SIZE; - - byte[] constraintBytes = ZERO_BYTES; - if (constraint != null) { - constraintBytes = toBytes(constraint.toString()); - size += constraintBytes.length + INT_SIZE; - } else { - size += INT_SIZE; - } - - byte[] hostname = toBytes(nt.getHostname()); - size += hostname.length + INT_SIZE; - - if (nt.getSlaveId() != null) { - size += nt.getSlaveId().getSerializedSize() + INT_SIZE; - } else { - size += INT_SIZE; - } - - if (nt.getTaskStatus() != null) { - size += nt.getTaskStatus().getSerializedSize() + INT_SIZE; - } else { - size += INT_SIZE; - } - - if (nt.getExecutorInfo() != null) { - size += nt.getExecutorInfo().getSerializedSize() + INT_SIZE; - } else { - size += INT_SIZE; - } - - byte[] taskPrefixBytes = ZERO_BYTES; - if (nt.getTaskPrefix() != null) { - taskPrefixBytes = toBytes(nt.getTaskPrefix()); - size += taskPrefixBytes.length + INT_SIZE; - } - - // Allocate and populate the buffer. - ByteBuffer bb = createBuffer(size); - putBytes(bb, profile); - bb.putInt(type.ordinal()); - putBytes(bb, constraintBytes); - putBytes(bb, hostname); - putBytes(bb, getSlaveBytes(nt)); - putBytes(bb, getTaskBytes(nt)); - putBytes(bb, getExecutorInfoBytes(nt)); - putBytes(bb, taskPrefixBytes); - // Make sure the buffer is at the beginning - bb.rewind(); - return bb; - } - - /** - * Assumes the entire ByteBuffer is a TaskID. - * - * @param bb - * @return Protos.TaskID - */ - public static Protos.TaskID toTaskId(ByteBuffer bb) { - try { - return Protos.TaskID.parseFrom(getBytes(bb, bb.getInt())); - } catch (Exception e) { - throw new RuntimeException("Failed to parse Task ID", e); - } - } - - /** - * Assumes the entire ByteBuffer is a FrameworkID. - * - * @param bb - * @return Protos.FrameworkID - */ - public static Protos.FrameworkID toFrameworkID(ByteBuffer bb) { - try { - return Protos.FrameworkID.parseFrom(getBytes(bb, bb.getInt())); - } catch (Exception e) { - throw new RuntimeException("Failed to parse Framework ID", e); - } - } - - /** - * ByteBuffer is expected to have a NodeTask at its next position. - * - * @param bb - * @return NodeTask or null if buffer is empty. Can throw a RuntimeException - * if the buffer is not formatted correctly. - */ - public static NodeTask toNodeTask(ByteBuffer bb) { - NodeTask nt = null; - if (bb != null && bb.array().length > 0) { - nt = new NodeTask(getServiceResourceProfile(bb), getConstraint(bb)); - nt.setHostname(toString(bb)); - nt.setSlaveId(toSlaveId(bb)); - nt.setTaskStatus(toTaskStatus(bb)); - nt.setExecutorInfo(toExecutorInfo(bb)); - } - return nt; - } - - public static byte[] getTaskBytes(NodeTask nt) { - if (nt.getTaskStatus() != null) { - return nt.getTaskStatus().toByteArray(); - } else { - return ZERO_BYTES; - } - } - - public static byte[] getExecutorInfoBytes(NodeTask nt) { - if (nt.getExecutorInfo() != null) { - return nt.getExecutorInfo().toByteArray(); - } else { - return ZERO_BYTES; - } - } - - public static byte[] getSlaveBytes(NodeTask nt) { - if (nt.getSlaveId() != null) { - return nt.getSlaveId().toByteArray(); - } else { - return ZERO_BYTES; - } - } - - public static void putBytes(ByteBuffer bb, byte bytes[]) { - if (bytes != null && bytes.length > 0) { - bb.putInt(bytes.length); - bb.put(bytes); - } else { - bb.putInt(0); - } - } - - public static byte[] getBytes(ByteBuffer bb, int size) { - byte bytes[] = new byte[size]; - bb.get(bytes); - return bytes; - } - - /** - * This assumes the next position is the size as an int, and the following is a string - * iff the size is not zero. - * - * @param bb ByteBuffer to extract string from - * @return string from the next position, or "" if the size is zero - */ - public static String toString(ByteBuffer bb) { - byte[] bytes = new byte[bb.getInt()]; - String s = ""; - try { - if (bytes.length > 0) { - bb.get(bytes); - s = new String(bytes, UTF8); - } - } catch (Exception e) { - throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse string bytes", e); - } - return s; - } - - public static byte[] toBytes(String s) { - try { - return s.getBytes(UTF8); - } catch (Exception e) { - return ZERO_BYTES; - } - } - - public static ServiceResourceProfile getServiceResourceProfile(ByteBuffer bb) { - String p = toString(bb); - if (!StringUtils.isEmpty(p)) { - return gsonCustom.fromJson(p, ServiceResourceProfile.class); - } else { - return null; - } - } - - public static Constraint getConstraint(ByteBuffer bb) { - Constraint.Type type = Constraint.Type.values()[bb.getInt()]; - String p = toString(bb); - switch (type) { - case NULL: - return null; - - case LIKE: - - if (!StringUtils.isEmpty(p)) { - return gson.fromJson(p, LikeConstraint.class); - } - } - return null; - } - - public static Protos.SlaveID toSlaveId(ByteBuffer bb) { - int size = bb.getInt(); - if (size > 0) { - try { - return Protos.SlaveID.parseFrom(getBytes(bb, size)); - } catch (Exception e) { - throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse SlaveId bytes", e); - } - } else { - return null; - } - } - - public static Protos.TaskStatus toTaskStatus(ByteBuffer bb) { - int size = bb.getInt(); - if (size > 0) { - try { - return Protos.TaskStatus.parseFrom(getBytes(bb, size)); - } catch (Exception e) { - throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse TaskStatus bytes", e); - } - } else { - return null; - } - } - - public static Protos.ExecutorInfo toExecutorInfo(ByteBuffer bb) { - int size = bb.getInt(); - if (size > 0) { - try { - return Protos.ExecutorInfo.parseFrom(getBytes(bb, size)); - } catch (Exception e) { - throw new RuntimeException("ByteBuffer not in expected format," + " failed to parse ExecutorInfo bytes", e); - } - } else { - return null; - } - } - - public static ByteBuffer fillBuffer(byte src[]) { - ByteBuffer bb = createBuffer(src.length); - bb.put(src); - bb.rewind(); - return bb; - } - - public static List<ByteBuffer> createBufferList(ByteBuffer bb, int size) { - List<ByteBuffer> list = new ArrayList<ByteBuffer>(size); - for (int i = 0; i < size; i++) { - list.add(fillBuffer(getBytes(bb, bb.getInt()))); - } - return list; - } - - private static ByteBuffer createBuffer(int size) { - return ByteBuffer.allocate(size).order(ByteOrder.LITTLE_ENDIAN); - } - - public static ByteBuffer createBuffer(ByteBuffer bb) { - return fillBuffer(getBytes(bb, bb.getInt())); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java deleted file mode 100644 index 2ffcaa7..0000000 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java +++ /dev/null @@ -1,278 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.ebay.myriad.state.utils; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.regex.Pattern; - -import org.apache.mesos.Protos; -import org.apache.mesos.Protos.TaskID; - -import com.ebay.myriad.state.NodeTask; - -/** - * The purpose of this container/utility is to create a mechanism to serialize the SchedulerState - * to RMStateStore and back. Json did not seem to handle the Protos fields very well so this was an - * alternative approach. - */ -public final class StoreContext { - private static Pattern taskIdPattern = Pattern.compile("\\."); - private ByteBuffer frameworkId; - private List<ByteBuffer> taskIds; - private List<ByteBuffer> taskNodes; - private List<ByteBuffer> pendingTasks; - private List<ByteBuffer> stagingTasks; - private List<ByteBuffer> activeTasks; - private List<ByteBuffer> lostTasks; - private List<ByteBuffer> killableTasks; - - public StoreContext() { - } - - /** - * Accept all the SchedulerState maps and flatten them into lists of ByteBuffers - * - * @param tasks - * @param pendingTasks - * @param stagingTasks - * @param activeTasks - * @param lostTasks - * @param killableTasks - */ - public StoreContext(Protos.FrameworkID frameworkId, Map<Protos.TaskID, NodeTask> tasks, Set<Protos.TaskID> pendingTasks, Set<Protos.TaskID> stagingTasks, Set<Protos.TaskID> activeTasks, Set<Protos.TaskID> lostTasks, Set<Protos.TaskID> - killableTasks) { - setFrameworkId(frameworkId); - setTasks(tasks); - setPendingTasks(pendingTasks); - setStagingTasks(stagingTasks); - setActiveTasks(activeTasks); - setLostTasks(lostTasks); - setKillableTasks(killableTasks); - } - - /** - * Accept list of ByteBuffers and re-create the SchedulerState maps. - * - * @param framwrorkId - * @param taskIds - * @param taskNodes - * @param pendingTasks - * @param stagingTasks - * @param activeTasks - * @param lostTasks - * @param killableTasks - */ - public StoreContext(ByteBuffer frameworkId, List<ByteBuffer> taskIds, List<ByteBuffer> taskNodes, List<ByteBuffer> pendingTasks, List<ByteBuffer> stagingTasks, List<ByteBuffer> activeTasks, List<ByteBuffer> lostTasks, List<ByteBuffer> - killableTasks) { - this.frameworkId = frameworkId; - this.taskIds = taskIds; - this.taskNodes = taskNodes; - this.pendingTasks = pendingTasks; - this.stagingTasks = stagingTasks; - this.activeTasks = activeTasks; - this.lostTasks = lostTasks; - this.killableTasks = killableTasks; - } - - /** - * Use this to gather bytes to push to the state store - * - * @return byte stream of the state store context. - * @throws IOException - */ - public ByteArrayOutputStream toSerializedContext() throws IOException { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - ByteBufferSupport.addByteBuffer(frameworkId, bytes); - ByteBufferSupport.addByteBuffers(taskIds, bytes); - ByteBufferSupport.addByteBuffers(taskNodes, bytes); - ByteBufferSupport.addByteBuffers(pendingTasks, bytes); - ByteBufferSupport.addByteBuffers(stagingTasks, bytes); - ByteBufferSupport.addByteBuffers(activeTasks, bytes); - ByteBufferSupport.addByteBuffers(lostTasks, bytes); - ByteBufferSupport.addByteBuffers(killableTasks, bytes); - return bytes; - } - - /** - * When the bytes come back from the store, use this method to create a new context. - * - * @param bytes from state store - * @return initialized StoreContext to use to initialize a SchedulerState - */ - @SuppressWarnings("unchecked") - public static StoreContext fromSerializedBytes(byte bytes[]) { - StoreContext ctx; - if (bytes != null && bytes.length > 0) { - ByteBuffer bb = ByteBufferSupport.fillBuffer(bytes); - ByteBuffer frameworkId = ByteBufferSupport.createBuffer(bb); - List<ByteBuffer> taskIds = ByteBufferSupport.createBufferList(bb, bb.getInt()); - List<ByteBuffer> taskNodes = ByteBufferSupport.createBufferList(bb, bb.getInt()); - List<ByteBuffer> pendingTasks = ByteBufferSupport.createBufferList(bb, bb.getInt()); - List<ByteBuffer> stagingTasks = ByteBufferSupport.createBufferList(bb, bb.getInt()); - List<ByteBuffer> activeTasks = ByteBufferSupport.createBufferList(bb, bb.getInt()); - List<ByteBuffer> lostTasks = ByteBufferSupport.createBufferList(bb, bb.getInt()); - List<ByteBuffer> killableTasks = ByteBufferSupport.createBufferList(bb, bb.getInt()); - ctx = new StoreContext(frameworkId, taskIds, taskNodes, pendingTasks, stagingTasks, activeTasks, lostTasks, killableTasks); - } else { - ctx = new StoreContext(); - } - return ctx; - } - - /** - * Serialize tasks into internal ByteBuffers, removing the map. - * - * @param tasks - */ - public void setTasks(Map<Protos.TaskID, NodeTask> tasks) { - taskIds = new ArrayList<ByteBuffer>(tasks.size()); - taskNodes = new ArrayList<ByteBuffer>(tasks.size()); - for (Entry<TaskID, NodeTask> entry : tasks.entrySet()) { - taskIds.add(ByteBufferSupport.toByteBuffer(entry.getKey())); - taskNodes.add(ByteBufferSupport.toByteBuffer(entry.getValue())); - } - } - - /** - * De-serialize the internal ByteBuffer back into a Protos.FrameworkID. - * - * @return - */ - public Protos.FrameworkID getFrameworkId() { - return ByteBufferSupport.toFrameworkID(frameworkId); - } - - /** - * Serialize the Protos.FrameworkID into a ByteBuffer. - */ - public void setFrameworkId(Protos.FrameworkID frameworkId) { - if (frameworkId != null) { - this.frameworkId = ByteBufferSupport.toByteBuffer(frameworkId); - } - } - - /** - * De-serialize the internal ByteBuffers back into a Task map. - * - * @return - */ - public Map<Protos.TaskID, NodeTask> getTasks() { - Map<Protos.TaskID, NodeTask> map = null; - if (taskIds != null) { - map = new HashMap<Protos.TaskID, NodeTask>(taskIds.size()); - int idx = 0; - for (ByteBuffer bb : taskIds) { - final Protos.TaskID taskId = ByteBufferSupport.toTaskId(bb); - final NodeTask task = ByteBufferSupport.toNodeTask(taskNodes.get(idx++)); - if (task.getTaskPrefix() == null && taskId != null) { - String taskPrefix = taskIdPattern.split(taskId.getValue())[0]; - task.setTaskPrefix(taskPrefix); - } - map.put(taskId, task); - } - } else { - map = new HashMap<Protos.TaskID, NodeTask>(0); - } - return map; - } - - public void setPendingTasks(Set<Protos.TaskID> tasks) { - if (tasks != null) { - pendingTasks = new ArrayList<ByteBuffer>(tasks.size()); - toTaskBuffer(tasks, pendingTasks); - } - } - - public Set<Protos.TaskID> getPendingTasks() { - return toTaskSet(pendingTasks); - } - - public void setStagingTasks(Set<Protos.TaskID> tasks) { - if (tasks != null) { - stagingTasks = new ArrayList<ByteBuffer>(tasks.size()); - toTaskBuffer(tasks, stagingTasks); - } - } - - public Set<Protos.TaskID> getStagingTasks() { - return toTaskSet(stagingTasks); - } - - public void setActiveTasks(Set<Protos.TaskID> tasks) { - if (tasks != null) { - activeTasks = new ArrayList<ByteBuffer>(tasks.size()); - toTaskBuffer(tasks, activeTasks); - } - } - - public Set<Protos.TaskID> getActiveTasks() { - return toTaskSet(activeTasks); - } - - public void setLostTasks(Set<Protos.TaskID> tasks) { - if (tasks != null) { - lostTasks = new ArrayList<ByteBuffer>(tasks.size()); - toTaskBuffer(tasks, lostTasks); - } - } - - public Set<Protos.TaskID> getLostTasks() { - return toTaskSet(lostTasks); - } - - public void setKillableTasks(Set<Protos.TaskID> tasks) { - if (tasks != null) { - killableTasks = new ArrayList<ByteBuffer>(tasks.size()); - toTaskBuffer(tasks, killableTasks); - } - } - - public Set<Protos.TaskID> getKillableTasks() { - return toTaskSet(killableTasks); - } - - private void toTaskBuffer(Set<Protos.TaskID> src, List<ByteBuffer> tgt) { - for (Protos.TaskID id : src) { - tgt.add(ByteBufferSupport.toByteBuffer(id)); - } - } - - private Set<Protos.TaskID> toTaskSet(List<ByteBuffer> src) { - Set<Protos.TaskID> tasks; - if (src != null) { - tasks = new HashSet<Protos.TaskID>(src.size()); - for (int i = 0; i < src.size(); i++) { - tasks.add(ByteBufferSupport.toTaskId(src.get(i))); - } - } else { - tasks = new HashSet<Protos.TaskID>(0); - } - return tasks; - } -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java deleted file mode 100644 index a52b310..0000000 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/HttpConnectorProvider.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.ebay.myriad.webapp; - -import com.ebay.myriad.configuration.MyriadConfiguration; -import com.google.inject.Provider; -import org.mortbay.jetty.Connector; -import org.mortbay.jetty.nio.SelectChannelConnector; - -import javax.inject.Inject; - -/** - * The factory for creating the http connector for the myriad scheduler - */ -public class HttpConnectorProvider implements Provider<Connector> { - - private MyriadConfiguration myriadConf; - - @Inject - public HttpConnectorProvider(MyriadConfiguration myriadConf) { - this.myriadConf = myriadConf; - } - - @Override - public Connector get() { - SelectChannelConnector ret = new SelectChannelConnector(); - ret.setName("Myriad"); - ret.setHost("0.0.0.0"); - ret.setPort(myriadConf.getRestApiPort()); - - return ret; - } -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java deleted file mode 100644 index cea22d1..0000000 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadServletModule.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.ebay.myriad.webapp; - -import com.ebay.myriad.api.ClustersResource; -import com.ebay.myriad.api.ConfigurationResource; -import com.ebay.myriad.api.SchedulerStateResource; -import com.google.inject.Scopes; -import com.google.inject.servlet.ServletModule; -import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; -import org.codehaus.jackson.jaxrs.JacksonJaxbJsonProvider; - -/** - * The guice module for configuring the myriad dashboard - */ -public class MyriadServletModule extends ServletModule { - - @Override - protected void configureServlets() { - bind(ClustersResource.class); - bind(ConfigurationResource.class); - bind(SchedulerStateResource.class); - - bind(GuiceContainer.class); - bind(JacksonJaxbJsonProvider.class).in(Scopes.SINGLETON); - - serve("/api/*").with(GuiceContainer.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadWebServer.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadWebServer.java b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadWebServer.java deleted file mode 100644 index 2b54d4c..0000000 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/MyriadWebServer.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.ebay.myriad.webapp; - -import com.google.inject.servlet.GuiceFilter; -import org.mortbay.jetty.Connector; -import org.mortbay.jetty.Handler; -import org.mortbay.jetty.Server; -import org.mortbay.jetty.servlet.*; - -import javax.inject.Inject; - -/** - * The myriad web server configuration for jetty - */ -public class MyriadWebServer { - private final Server jetty; - private final Connector connector; - private final GuiceFilter filter; - - @Inject - public MyriadWebServer(Server jetty, Connector connector, GuiceFilter filter) { - this.jetty = jetty; - this.connector = connector; - this.filter = filter; - } - - public void start() throws Exception { - this.jetty.addConnector(connector); - - ServletHandler servletHandler = new ServletHandler(); - - String filterName = "MyriadGuiceFilter"; - FilterHolder holder = new FilterHolder(filter); - holder.setName(filterName); - - FilterMapping filterMapping = new FilterMapping(); - filterMapping.setPathSpec("/*"); - filterMapping.setDispatches(Handler.ALL); - filterMapping.setFilterName(filterName); - - servletHandler.addFilter(holder, filterMapping); - - Context context = new Context(); - context.setServletHandler(servletHandler); - context.addServlet(DefaultServlet.class, "/"); - - String staticDir = this.getClass().getClassLoader().getResource("webapp/public").toExternalForm(); - context.setResourceBase(staticDir); - - this.jetty.addHandler(context); - this.jetty.start(); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/WebAppGuiceModule.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/WebAppGuiceModule.java b/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/WebAppGuiceModule.java deleted file mode 100644 index 861dc00..0000000 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/webapp/WebAppGuiceModule.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.ebay.myriad.webapp; - -import com.google.inject.AbstractModule; -import org.mortbay.jetty.Connector; - -/** - * The guice web application configuration - */ -public class WebAppGuiceModule extends AbstractModule { - - @Override - protected void configure() { - bind(Connector.class).toProvider(HttpConnectorProvider.class); - install(new MyriadServletModule()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java index 227229c..4991df2 100644 --- a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java +++ b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java @@ -28,8 +28,8 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.ebay.myriad.state.MyriadStateStore; -import com.ebay.myriad.state.utils.StoreContext; +import org.apache.myriad.state.MyriadStateStore; +import org.apache.myriad.state.utils.StoreContext; /** * StateStore that stores Myriad state in addition to RM state to DFS. http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/DisruptorManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/DisruptorManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/DisruptorManager.java new file mode 100644 index 0000000..698f615 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/DisruptorManager.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad; + +import org.apache.myriad.scheduler.event.handlers.DisconnectedEventHandler; +import org.apache.myriad.scheduler.event.handlers.ErrorEventHandler; +import org.apache.myriad.scheduler.event.handlers.ExecutorLostEventHandler; +import org.apache.myriad.scheduler.event.handlers.FrameworkMessageEventHandler; +import org.apache.myriad.scheduler.event.handlers.OfferRescindedEventHandler; +import org.apache.myriad.scheduler.event.handlers.ReRegisteredEventHandler; +import org.apache.myriad.scheduler.event.handlers.RegisteredEventHandler; +import org.apache.myriad.scheduler.event.handlers.ResourceOffersEventHandler; +import org.apache.myriad.scheduler.event.handlers.SlaveLostEventHandler; +import org.apache.myriad.scheduler.event.handlers.StatusUpdateEventHandler; +import com.google.inject.Injector; +import com.lmax.disruptor.dsl.Disruptor; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Disruptor class is an event bus used in high speed financial systems. http://martinfowler.com/articles/lmax.html + * Here it is used to abstract incoming events. + */ +public class DisruptorManager { + private ExecutorService disruptorExecutors; + + private static final int DEFAULT_SMALL_RINGBUFFER_SIZE = 64; + private static final int DEFAULT_LARGE_RINGBUFFER_SIZE = 1024; + + private Disruptor<org.apache.myriad.scheduler.event.RegisteredEvent> registeredEventDisruptor; + private Disruptor<org.apache.myriad.scheduler.event.ReRegisteredEvent> reRegisteredEventDisruptor; + private Disruptor<org.apache.myriad.scheduler.event.ResourceOffersEvent> resourceOffersEventDisruptor; + private Disruptor<org.apache.myriad.scheduler.event.OfferRescindedEvent> offerRescindedEventDisruptor; + private Disruptor<org.apache.myriad.scheduler.event.StatusUpdateEvent> statusUpdateEventDisruptor; + private Disruptor<org.apache.myriad.scheduler.event.FrameworkMessageEvent> frameworkMessageEventDisruptor; + private Disruptor<org.apache.myriad.scheduler.event.DisconnectedEvent> disconnectedEventDisruptor; + private Disruptor<org.apache.myriad.scheduler.event.SlaveLostEvent> slaveLostEventDisruptor; + private Disruptor<org.apache.myriad.scheduler.event.ExecutorLostEvent> executorLostEventDisruptor; + private Disruptor<org.apache.myriad.scheduler.event.ErrorEvent> errorEventDisruptor; + + @SuppressWarnings("unchecked") + public void init(Injector injector) { + this.disruptorExecutors = Executors.newCachedThreadPool(); + + // todo: (kensipe) need to make ringsize configurable (overriding the defaults) + + + this.registeredEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.RegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE, disruptorExecutors); + this.registeredEventDisruptor.handleEventsWith(injector.getInstance(RegisteredEventHandler.class)); + this.registeredEventDisruptor.start(); + + this.reRegisteredEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.ReRegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE, disruptorExecutors); + this.reRegisteredEventDisruptor.handleEventsWith(injector.getInstance(ReRegisteredEventHandler.class)); + this.reRegisteredEventDisruptor.start(); + + + this.resourceOffersEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.ResourceOffersEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.resourceOffersEventDisruptor.handleEventsWith(injector.getInstance(ResourceOffersEventHandler.class)); + this.resourceOffersEventDisruptor.start(); + + this.offerRescindedEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.OfferRescindedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.offerRescindedEventDisruptor.handleEventsWith(injector.getInstance(OfferRescindedEventHandler.class)); + this.offerRescindedEventDisruptor.start(); + + this.statusUpdateEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.StatusUpdateEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.statusUpdateEventDisruptor.handleEventsWith(injector.getInstance(StatusUpdateEventHandler.class)); + this.statusUpdateEventDisruptor.start(); + + this.frameworkMessageEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.FrameworkMessageEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.frameworkMessageEventDisruptor.handleEventsWith(injector.getInstance(FrameworkMessageEventHandler.class)); + this.frameworkMessageEventDisruptor.start(); + + this.disconnectedEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.DisconnectedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.disconnectedEventDisruptor.handleEventsWith(injector.getInstance(DisconnectedEventHandler.class)); + this.disconnectedEventDisruptor.start(); + + this.slaveLostEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.SlaveLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.slaveLostEventDisruptor.handleEventsWith(injector.getInstance(SlaveLostEventHandler.class)); + this.slaveLostEventDisruptor.start(); + + this.executorLostEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.ExecutorLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.executorLostEventDisruptor.handleEventsWith(injector.getInstance(ExecutorLostEventHandler.class)); + this.executorLostEventDisruptor.start(); + + this.errorEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.ErrorEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.errorEventDisruptor.handleEventsWith(injector.getInstance(ErrorEventHandler.class)); + this.errorEventDisruptor.start(); + } + + public Disruptor<org.apache.myriad.scheduler.event.RegisteredEvent> getRegisteredEventDisruptor() { + return registeredEventDisruptor; + } + + public Disruptor<org.apache.myriad.scheduler.event.ReRegisteredEvent> getReRegisteredEventDisruptor() { + return reRegisteredEventDisruptor; + } + + public Disruptor<org.apache.myriad.scheduler.event.ResourceOffersEvent> getResourceOffersEventDisruptor() { + return resourceOffersEventDisruptor; + } + + public Disruptor<org.apache.myriad.scheduler.event.OfferRescindedEvent> getOfferRescindedEventDisruptor() { + return offerRescindedEventDisruptor; + } + + public Disruptor<org.apache.myriad.scheduler.event.StatusUpdateEvent> getStatusUpdateEventDisruptor() { + return statusUpdateEventDisruptor; + } + + public Disruptor<org.apache.myriad.scheduler.event.FrameworkMessageEvent> getFrameworkMessageEventDisruptor() { + return frameworkMessageEventDisruptor; + } + + public Disruptor<org.apache.myriad.scheduler.event.DisconnectedEvent> getDisconnectedEventDisruptor() { + return disconnectedEventDisruptor; + } + + public Disruptor<org.apache.myriad.scheduler.event.SlaveLostEvent> getSlaveLostEventDisruptor() { + return slaveLostEventDisruptor; + } + + public Disruptor<org.apache.myriad.scheduler.event.ExecutorLostEvent> getExecutorLostEventDisruptor() { + return executorLostEventDisruptor; + } + + public Disruptor<org.apache.myriad.scheduler.event.ErrorEvent> getErrorEventDisruptor() { + return errorEventDisruptor; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/Main.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java new file mode 100644 index 0000000..0f305e8 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad; + +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.health.HealthCheckRegistry; +import org.apache.myriad.configuration.ServiceConfiguration; +import org.apache.myriad.configuration.MyriadBadConfigurationException; +import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.health.MesosDriverHealthCheck; +import org.apache.myriad.health.MesosMasterHealthCheck; +import org.apache.myriad.health.ZookeeperHealthCheck; +import org.apache.myriad.scheduler.NMProfile; +import org.apache.myriad.scheduler.ServiceProfileManager; +import org.apache.myriad.scheduler.ServiceResourceProfile; +import org.apache.myriad.scheduler.TaskFactory; +import org.apache.myriad.webapp.WebAppGuiceModule; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.inject.Guice; +import com.google.inject.Injector; + +import org.apache.commons.collections.MapUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.HashSet; + +/** + * Main entry point for myriad scheduler + */ +public class Main { + private static final Logger LOGGER = LoggerFactory.getLogger(Main.class); + + private org.apache.myriad.webapp.MyriadWebServer webServer; + private ScheduledExecutorService terminatorService; + + private ScheduledExecutorService rebalancerService; + private HealthCheckRegistry healthCheckRegistry; + + private static Injector injector; + + public static void initialize(Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext, org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry registry) throws Exception { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + MyriadConfiguration cfg = mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-default.yml"), MyriadConfiguration.class); + + MyriadModule myriadModule = new MyriadModule(cfg, hadoopConf, yarnScheduler, rmContext, registry); + MesosModule mesosModule = new MesosModule(); + injector = Guice.createInjector(myriadModule, mesosModule, new WebAppGuiceModule()); + + new Main().run(cfg); + } + + // TODO (Kannan Rajah) Hack to get injector in unit test. + public static Injector getInjector() { + return injector; + } + + public void run(MyriadConfiguration cfg) throws Exception { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Bindings: " + injector.getAllBindings()); + } + + JmxReporter.forRegistry(new MetricRegistry()).build().start(); + + initWebApp(injector); + initHealthChecks(injector); + initProfiles(injector); + validateNMInstances(injector); + initServiceConfigurations(cfg, injector); + initDisruptors(injector); + + initRebalancerService(cfg, injector); + initTerminatorService(injector); + startMesosDriver(injector); + startNMInstances(injector); + startJavaBasedTaskInstance(injector); + } + + + private void startMesosDriver(Injector injector) { + LOGGER.info("starting mesosDriver.."); + injector.getInstance(org.apache.myriad.scheduler.MyriadDriverManager.class).startDriver(); + LOGGER.info("started mesosDriver.."); + } + + /** + * Brings up the embedded jetty webserver for serving REST APIs. + * + * @param injector + */ + private void initWebApp(Injector injector) throws Exception { + webServer = injector.getInstance(org.apache.myriad.webapp.MyriadWebServer.class); + webServer.start(); + } + + /** + * Initializes health checks. + * + * @param injector + */ + private void initHealthChecks(Injector injector) { + LOGGER.info("Initializing HealthChecks"); + healthCheckRegistry = new HealthCheckRegistry(); + healthCheckRegistry.register(MesosMasterHealthCheck.NAME, injector.getInstance(MesosMasterHealthCheck.class)); + healthCheckRegistry.register(ZookeeperHealthCheck.NAME, injector.getInstance(ZookeeperHealthCheck.class)); + healthCheckRegistry.register(MesosDriverHealthCheck.NAME, injector.getInstance(MesosDriverHealthCheck.class)); + } + + private void initProfiles(Injector injector) { + LOGGER.info("Initializing Profiles"); + ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); + org.apache.myriad.scheduler.TaskConstraintsManager taskConstraintsManager = injector.getInstance(org.apache.myriad.scheduler.TaskConstraintsManager.class); + taskConstraintsManager.addTaskConstraints(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX, new TaskFactory.NMTaskConstraints()); + Map<String, Map<String, String>> profiles = injector.getInstance(MyriadConfiguration.class).getProfiles(); + org.apache.myriad.scheduler.TaskUtils taskUtils = injector.getInstance(org.apache.myriad.scheduler.TaskUtils.class); + if (MapUtils.isNotEmpty(profiles)) { + for (Map.Entry<String, Map<String, String>> profile : profiles.entrySet()) { + Map<String, String> profileResourceMap = profile.getValue(); + if (MapUtils.isNotEmpty(profiles) && profileResourceMap.containsKey("cpu") && profileResourceMap.containsKey("mem")) { + Long cpu = Long.parseLong(profileResourceMap.get("cpu")); + Long mem = Long.parseLong(profileResourceMap.get("mem")); + + ServiceResourceProfile serviceProfile = new org.apache.myriad.scheduler.ExtendedResourceProfile(new NMProfile(profile.getKey(), cpu, mem), taskUtils.getNodeManagerCpus(), taskUtils.getNodeManagerMemory()); + serviceProfile.setExecutorCpu(taskUtils.getExecutorCpus()); + serviceProfile.setExecutorMemory(taskUtils.getExecutorMemory()); + + profileManager.add(serviceProfile); + } else { + LOGGER.error("Invalid definition for profile: " + profile.getKey()); + } + } + } + } + + private void validateNMInstances(Injector injector) { + LOGGER.info("Validating nmInstances.."); + Map<String, Integer> nmInstances = injector.getInstance(MyriadConfiguration.class).getNmInstances(); + ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); + + long maxCpu = Long.MIN_VALUE; + long maxMem = Long.MIN_VALUE; + for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) { + String profile = entry.getKey(); + ServiceResourceProfile nodeManager = profileManager.get(profile); + if (nodeManager == null) { + throw new RuntimeException("Invalid profile name '" + profile + "' specified in 'nmInstances'"); + } + if (entry.getValue() > 0) { + if (nodeManager.getCpus() > maxCpu) { // find the profile with largest number of cpus + maxCpu = nodeManager.getCpus().longValue(); + maxMem = nodeManager.getMemory().longValue(); // use the memory from the same profile + } + } + } + if (maxCpu <= 0 || maxMem <= 0) { + throw new RuntimeException("Please configure 'nmInstances' with at least one instance/profile " + "with non-zero cpu/mem resources."); + } + } + + private void startNMInstances(Injector injector) { + Map<String, Integer> nmInstances = injector.getInstance(MyriadConfiguration.class).getNmInstances(); + org.apache.myriad.scheduler.MyriadOperations myriadOperations = injector.getInstance(org.apache.myriad.scheduler.MyriadOperations.class); + ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); + org.apache.myriad.state.SchedulerState schedulerState = injector.getInstance(org.apache.myriad.state.SchedulerState.class); + + Set<org.apache.myriad.state.NodeTask> launchedNMTasks = new HashSet<>(); + launchedNMTasks.addAll(schedulerState.getPendingTasksByType(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX)); + if (!launchedNMTasks.isEmpty()) { + LOGGER.info("{} NM(s) in pending state. Not launching additional NMs", launchedNMTasks.size()); + return; + } + + launchedNMTasks.addAll(schedulerState.getStagingTasksByType(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX)); + if (!launchedNMTasks.isEmpty()) { + LOGGER.info("{} NM(s) in staging state. Not launching additional NMs", launchedNMTasks.size()); + return; + } + + launchedNMTasks.addAll(schedulerState.getActiveTasksByType(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX)); + if (!launchedNMTasks.isEmpty()) { + LOGGER.info("{} NM(s) in active state. Not launching additional NMs", launchedNMTasks.size()); + return; + } + + for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) { + LOGGER.info("Launching {} NM(s) with profile {}", entry.getValue(), entry.getKey()); + myriadOperations.flexUpCluster(profileManager.get(entry.getKey()), entry.getValue(), null); + } + } + + /** + * Create ServiceProfile for any configured service + * + * @param cfg + * @param injector + */ + private void initServiceConfigurations(MyriadConfiguration cfg, Injector injector) { + LOGGER.info("Initializing initServiceConfigurations"); + ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); + org.apache.myriad.scheduler.TaskConstraintsManager taskConstraintsManager = injector.getInstance(org.apache.myriad.scheduler.TaskConstraintsManager.class); + + Map<String, ServiceConfiguration> servicesConfigs = injector.getInstance(MyriadConfiguration.class).getServiceConfigurations(); + if (servicesConfigs != null) { + for (Map.Entry<String, ServiceConfiguration> entry : servicesConfigs.entrySet()) { + final String taskPrefix = entry.getKey(); + ServiceConfiguration config = entry.getValue(); + final Double cpu = config.getCpus().or(ServiceConfiguration.DEFAULT_CPU); + final Double mem = config.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY); + + profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem)); + taskConstraintsManager.addTaskConstraints(taskPrefix, new org.apache.myriad.scheduler.ServiceTaskConstraints(cfg, taskPrefix)); + } + } + } + + private void initTerminatorService(Injector injector) { + LOGGER.info("Initializing Terminator"); + terminatorService = Executors.newScheduledThreadPool(1); + final int initialDelay = 100; + final int period = 2000; + terminatorService.scheduleAtFixedRate(injector.getInstance(org.apache.myriad.scheduler.TaskTerminator.class), initialDelay, period, TimeUnit.MILLISECONDS); + } + + private void initRebalancerService(MyriadConfiguration cfg, Injector injector) { + if (cfg.isRebalancer()) { + LOGGER.info("Initializing Rebalancer"); + rebalancerService = Executors.newScheduledThreadPool(1); + final int initialDelay = 100; + final int period = 5000; + rebalancerService.scheduleAtFixedRate(injector.getInstance(org.apache.myriad.scheduler.Rebalancer.class), initialDelay, period, TimeUnit.MILLISECONDS); + } else { + LOGGER.info("Rebalancer is not turned on"); + } + } + + private void initDisruptors(Injector injector) { + LOGGER.info("Initializing Disruptors"); + DisruptorManager disruptorManager = injector.getInstance(DisruptorManager.class); + disruptorManager.init(injector); + } + + /** + * Start tasks for configured services + * + * @param injector + */ + private void startJavaBasedTaskInstance(Injector injector) { + Map<String, ServiceConfiguration> auxServicesConfigs = injector.getInstance(MyriadConfiguration.class).getServiceConfigurations(); + if (auxServicesConfigs != null) { + org.apache.myriad.scheduler.MyriadOperations myriadOperations = injector.getInstance(org.apache.myriad.scheduler.MyriadOperations.class); + for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) { + try { + myriadOperations.flexUpAService(entry.getValue().getMaxInstances().or(1), entry.getKey()); + } catch (MyriadBadConfigurationException e) { + LOGGER.warn("Exception while trying to flexup service: {}", entry.getKey(), e); + } + } + } + } +}