http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java new file mode 100644 index 0000000..23e9798 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.myriad.scheduler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.myriad.configuration.MyriadConfiguration; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +/** + * Implementation assumes NM binaries will be downloaded + */ +public class DownloadNMExecutorCLGenImpl extends NMExecutorCLGenImpl { + + private static final Logger LOGGER = LoggerFactory. 
+ getLogger(DownloadNMExecutorCLGenImpl.class); + + private final String nodeManagerUri; + + public DownloadNMExecutorCLGenImpl(MyriadConfiguration cfg, String nodeManagerUri) { + super(cfg); + this.nodeManagerUri = nodeManagerUri; + } + + @Override + public String generateCommandLine(ServiceResourceProfile profile, Ports ports) { + StringBuilder cmdLine = new StringBuilder(); + LOGGER.info("Using remote distribution"); + + generateEnvironment(profile, (NMPorts) ports); + appendDistroExtractionCommands(cmdLine); + appendCgroupsCmds(cmdLine); + appendYarnHomeExport(cmdLine); + appendUser(cmdLine); + appendEnvForNM(cmdLine); + cmdLine.append(YARN_NM_CMD); + return cmdLine.toString(); + } + + protected void appendDistroExtractionCommands(StringBuilder cmdLine) { + /* + TODO(darinj): Overall this is messier than I'd like. We can't let mesos untar the distribution, since + it will change the permissions. Instead we simply download the tarball and execute tar -xvpf. We also + pull the config from the resource manager and put them in the conf dir. This is also why we need + frameworkSuperUser. This will be refactored after Mesos-1790 is resolved. + */ + + //TODO(DarinJ) support other compression, as this is a temp fix for Mesos 1760 may not get to it. + //Extract tarball keeping permissions, necessary to keep HADOOP_HOME/bin/container-executor suidbit set. + cmdLine.append("sudo tar -zxpf ").append(getFileName(nodeManagerUri)); + + //We need the current directory to be writable by frameworkUser for capsuleExecutor to create directories. + //Best to simply give owenership to the user running the executor but we don't want to use -R as this + //will silently remove the suid bit on container executor. 
+ cmdLine.append(" && sudo chown ").append(cfg.getFrameworkUser().get()).append(" ."); + + //Place the hadoop config where in the HADOOP_CONF_DIR where it will be read by the NodeManager + //The url for the resource manager config is: http(s)://hostname:port/conf so fetcher.cpp downloads the + //config file to conf, It's an xml file with the parameters of yarn-site.xml, core-site.xml and hdfs.xml. + cmdLine.append(" && cp conf ").append(cfg.getYarnEnvironment().get("YARN_HOME")).append("/etc/hadoop/yarn-site.xml;"); + } + + protected void appendUser(StringBuilder cmdLine) { + cmdLine.append(" sudo -E -u ").append(cfg.getFrameworkUser().get()).append(" -H"); + } + + private static String getFileName(String uri) { + int lastSlash = uri.lastIndexOf('/'); + if (lastSlash == -1) { + return uri; + } else { + String fileName = uri.substring(lastSlash + 1); + Preconditions.checkArgument(!Strings.isNullOrEmpty(fileName), "URI should not have a slash at the end"); + return fileName; + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java new file mode 100644 index 0000000..82782f2 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExecutorCommandLineGenerator.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.myriad.scheduler; +
/**
 * Pluggable strategy for generating the command line an executor runs. Implementations differ in
 * how the service binaries are obtained (for example already deployed vs. downloaded).
 */
public interface ExecutorCommandLineGenerator {

  /**
   * Builds the shell command for a task.
   *
   * @param profile resources assigned to the service
   * @param ports   ports allocated to the service
   * @return the command line to execute
   */
  String generateCommandLine(ServiceResourceProfile profile, Ports ports);

  /**
   * NOTE(review): presumably the URL from which the service configuration is fetched; confirm
   * against the implementations.
   *
   * @return the configuration URL
   */
  String getConfigurationUrl();
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java new file mode 100644 index 0000000..8119360 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ExtendedResourceProfile.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.myriad.scheduler; + +import com.google.gson.Gson; + +
/**
 * {@link ServiceResourceProfile} for services that need to pass a set of resources downstream.
 * Currently the only such service is the NodeManager: the child {@link NMProfile} carries the
 * resources handed on, and the aggregate getters add this profile's own cpus/memory to them.
 */
public class ExtendedResourceProfile extends ServiceResourceProfile {

  // Shared serializer for toString(). Gson instances are thread-safe, and Gson excludes static
  // fields by default, so this field never appears in the JSON output.
  private static final Gson GSON = new Gson();

  private NMProfile childProfile;

  /**
   * @param childProfile the downstream NodeManager profile; must NOT be null (a NullPointerException
   *                     is thrown while invoking the super constructor otherwise)
   * @param cpu          cpus of this profile itself, passed to the parent constructor
   *                     (presumably backing the inherited {@code cpus} used by {@link #getAggregateCpu()})
   * @param mem          memory of this profile itself, passed to the parent constructor
   *                     (presumably backing the inherited {@code memory} used by {@link #getAggregateMemory()})
   */
  public ExtendedResourceProfile(NMProfile childProfile, Double cpu, Double mem) {
    super(childProfile.getName(), cpu, mem);
    this.childProfile = childProfile;
    this.className = ExtendedResourceProfile.class.getName();
  }

  public NMProfile getChildProfile() {
    return childProfile;
  }

  public void setChildProfile(NMProfile nmProfile) {
    this.childProfile = nmProfile;
  }

  /** @return the child profile's name */
  @Override
  public String getName() {
    return childProfile.getName();
  }

  /** @return the child profile's cpus (not including this profile's own share) */
  @Override
  public Double getCpus() {
    return childProfile.getCpus().doubleValue();
  }

  /** @return the child profile's memory (not including this profile's own share) */
  @Override
  public Double getMemory() {
    return childProfile.getMemory().doubleValue();
  }

  /** @return this profile's own memory plus the child's */
  @Override
  public Double getAggregateMemory() {
    return memory + childProfile.getMemory();
  }

  /** @return this profile's own cpus plus the child's */
  @Override
  public Double getAggregateCpu() {
    return cpus + childProfile.getCpus();
  }

  /** @return a JSON rendering of this profile */
  @Override
  public String toString() {
    return GSON.toJson(this);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java new file mode 100644 index 0000000..741c6de --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java @@ -0,0 +1,66 @@ +/** + * Licensed to
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler; + +import javax.inject.Inject; + +import org.apache.mesos.Protos.Status; +import org.apache.mesos.Protos.TaskID; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Driver for Myriad scheduler. 
+ */
public class MyriadDriver {
  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadDriver.class);

  /** The Mesos scheduler driver every call is delegated to. */
  private final SchedulerDriver driver;

  @Inject
  public MyriadDriver(SchedulerDriver driver) {
    this.driver = driver;
  }

  /**
   * Starts the wrapped driver, logging before and after.
   *
   * @return the status reported by {@link SchedulerDriver#start()}
   */
  public Status start() {
    LOGGER.info("Starting driver");
    final Status startStatus = driver.start();
    LOGGER.info("Driver started with status: {}", startStatus);
    return startStatus;
  }

  /**
   * Asks the wrapped driver to kill a task.
   *
   * @param taskId task to kill
   * @return the status reported by {@link SchedulerDriver#killTask(TaskID)}
   */
  public Status kill(final TaskID taskId) {
    LOGGER.info("Killing task {}", taskId);
    final Status killStatus = driver.killTask(taskId);
    LOGGER.info("Task {} killed with status: {}", taskId, killStatus);
    return killStatus;
  }

  /**
   * Aborts the wrapped driver.
   *
   * @return the status reported by {@link SchedulerDriver#abort()}
   */
  public Status abort() {
    LOGGER.info("Aborting driver");
    final Status abortStatus = driver.abort();
    LOGGER.info("Driver aborted with status: {}", abortStatus);
    return abortStatus;
  }

  /** @return the underlying Mesos scheduler driver */
  public SchedulerDriver getDriver() {
    return driver;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriverManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriverManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriverManager.java new file mode 100644 index 0000000..44d7e06 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriverManager.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler; + +import com.google.common.base.Preconditions; +import org.apache.mesos.Protos; +import org.apache.mesos.Protos.Status; +import org.apache.mesos.Protos.TaskID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +
/**
 * Manager for the Myriad scheduler driver. It tracks the last status the driver reported and
 * serializes all driver calls, and every read or write of that status, through one lock.
 */
public class MyriadDriverManager {
  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadDriverManager.class);
  private final Lock driverLock;
  private final MyriadDriver driver;
  // Last status returned by the driver. Guarded by driverLock: it is read as well as written under
  // the lock so callers observe the result of their own operation, not a racing update.
  private Status driverStatus;

  @Inject
  public MyriadDriverManager(MyriadDriver driver) {
    this.driver = driver;
    this.driverLock = new ReentrantLock();
    this.driverStatus = Protos.Status.DRIVER_NOT_STARTED;
  }

  /**
   * Starts the driver.
   *
   * @return the status returned by the driver's start call
   * @throws IllegalStateException if there is no driver or its status is not DRIVER_NOT_STARTED
   */
  public Status startDriver() {
    this.driverLock.lock();
    try {
      Preconditions.checkState(this.isStartable(), "Driver cannot be started (driver present: %s, status: %s)",
          this.driver != null, this.driverStatus);
      LOGGER.info("Starting driver...");
      this.driverStatus = driver.start();
      LOGGER.info("Driver started with status: {}", this.driverStatus);
      return this.driverStatus;
    } finally {
      this.driverLock.unlock();
    }
  }

  /**
   * Aborts the driver if it is running; otherwise does nothing.
   *
   * @return the status after the call (the abort result, or the unchanged status)
   */
  public Status stopDriver() {
    this.driverLock.lock();
    try {
      if (isRunning()) {
        LOGGER.info("Aborting driver...");
        this.driverStatus = this.driver.abort();
        LOGGER.info("Aborted driver with status: {}", this.driverStatus);
      }
      return this.driverStatus;
    } finally {
      this.driverLock.unlock();
    }
  }

  /**
   * Kills a task through the driver if the driver is running; otherwise logs a warning.
   *
   * @param taskId task to kill
   * @return the status after the call (the kill result, or the unchanged status)
   */
  public Status kill(final TaskID taskId) {
    LOGGER.info("Killing task {}", taskId);
    this.driverLock.lock();
    try {
      if (isRunning()) {
        this.driverStatus = driver.kill(taskId);
        LOGGER.info("Task {} killed with status: {}", taskId, this.driverStatus);
      } else {
        LOGGER.warn("Cannot kill task, driver is not running");
      }
      return this.driverStatus;
    } finally {
      this.driverLock.unlock();
    }
  }

  /** @return the last status reported by the driver, read under the driver lock */
  public Status getDriverStatus() {
    this.driverLock.lock();
    try {
      return this.driverStatus;
    } finally {
      this.driverLock.unlock();
    }
  }

  // Callers must hold driverLock.
  private boolean isStartable() {
    return this.driver != null && this.driverStatus == Status.DRIVER_NOT_STARTED;
  }

  // Callers must hold driverLock.
  private boolean isRunning() {
    return this.driver != null && this.driverStatus == Status.DRIVER_RUNNING;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java new file mode 100644 index 0000000..6b0de6d --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler; + +import org.apache.myriad.configuration.MyriadBadConfigurationException; +import org.apache.myriad.configuration.ServiceConfiguration; +import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.configuration.NodeManagerConfiguration; +import org.apache.myriad.policy.NodeScaleDownPolicy; +import org.apache.myriad.scheduler.constraints.Constraint; +import org.apache.myriad.scheduler.constraints.LikeConstraint; +import org.apache.myriad.state.NodeTask; +import org.apache.myriad.state.SchedulerState; +import com.google.common.collect.Lists; +import com.google.inject.Inject; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +
/**
 * Myriad scheduler operations. Flexing up adds new tasks to the {@link SchedulerState}, and
 * flexing down marks existing tasks killable. This covers NodeManagers as well as other services
 * defined in the configuration.
 */
public class MyriadOperations {
  private static final Logger LOGGER = LoggerFactory.getLogger(MyriadOperations.class);
  private final SchedulerState schedulerState;

  private final MyriadConfiguration cfg;
  private final NodeScaleDownPolicy nodeScaleDownPolicy;

  @Inject
  public MyriadOperations(MyriadConfiguration cfg, SchedulerState schedulerState, NodeScaleDownPolicy nodeScaleDownPolicy) {
    this.cfg = cfg;
    this.schedulerState = schedulerState;
    this.nodeScaleDownPolicy = nodeScaleDownPolicy;
  }

  /**
   * Adds {@code instances} new NodeManager tasks that share the given profile and constraint.
   *
   * @param serviceResourceProfile resources for each new NodeManager
   * @param instances              number of tasks to add
   * @param constraint             placement constraint (may be null)
   */
  public void flexUpCluster(ServiceResourceProfile serviceResourceProfile, int instances, Constraint constraint) {
    Collection<NodeTask> nodes = new HashSet<>();
    for (int i = 0; i < instances; i++) {
      NodeTask nodeTask = new NodeTask(serviceResourceProfile, constraint);
      nodeTask.setTaskPrefix(NodeManagerConfiguration.NM_TASK_PREFIX);
      nodes.add(nodeTask);
    }

    LOGGER.info("Adding {} NM instances to cluster", nodes.size());
    this.schedulerState.addNodes(nodes);
  }

  /**
   * Marks up to {@code numInstancesToScaleDown} NodeManager tasks with the given profile and
   * constraint as killable. Pending tasks go first, then staging tasks, then active tasks. Active
   * tasks are ordered by the configured {@link NodeScaleDownPolicy}.
   *
   * @param serviceResourceProfile profile whose tasks are considered (matched by name)
   * @param constraint             constraint the tasks must satisfy; null matches every task
   * @param numInstancesToScaleDown maximum number of tasks to mark killable
   */
  public void flexDownCluster(ServiceResourceProfile serviceResourceProfile, Constraint constraint, int numInstancesToScaleDown) {
    // Flex down Pending tasks, if any
    int numPendingTasksScaledDown = flexDownPendingTasks(serviceResourceProfile, constraint, numInstancesToScaleDown);

    // Flex down Staging tasks, if any
    int numStagingTasksScaledDown = flexDownStagingTasks(serviceResourceProfile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown);

    // Flex down Active tasks, if any
    int numActiveTasksScaledDown = flexDownActiveTasks(serviceResourceProfile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown - numStagingTasksScaledDown);

    if (numActiveTasksScaledDown + numStagingTasksScaledDown + numPendingTasksScaledDown == 0) {
      LOGGER.info("No Node Managers with profile '{}' and constraint '{}' found for scaling down.", serviceResourceProfile.getName(), constraint == null ? "null" : constraint.toString());
    } else {
      LOGGER.info("Flexed down {} active, {} staging and {} pending Node Managers with " + "'{}' profile and constraint '{}'.", numActiveTasksScaledDown, numStagingTasksScaledDown, numPendingTasksScaledDown, serviceResourceProfile.getName(),
          constraint == null ? "null" : constraint.toString());
    }
  }

  /**
   * Flexes up a service defined in the configuration.
   *
   * @param instances   number of new instances to add
   * @param serviceName name of the service in the configuration
   * @throws MyriadBadConfigurationException if the service has no configuration, or if the new
   *                                         total of active, staging, pending and requested
   *                                         instances would exceed the configured maximum
   */
  public void flexUpAService(int instances, String serviceName) throws MyriadBadConfigurationException {
    final ServiceConfiguration auxTaskConf = cfg.getServiceConfiguration(serviceName);
    // Fail with the declared configuration exception instead of a NullPointerException further down.
    if (auxTaskConf == null) {
      throw new MyriadBadConfigurationException("No configuration found for service: " + serviceName);
    }

    int totalflexInstances = instances + getFlexibleInstances(serviceName);
    Integer maxInstances = auxTaskConf.getMaxInstances().orNull();
    if (maxInstances != null && maxInstances > 0) {
      // The sum of active, staging, pending and requested instances must not exceed maxInstances.
      if (totalflexInstances > maxInstances) {
        LOGGER.error("Current number of active, staging, pending and requested instances: {}" + ", while it is greater then max instances allowed: {}", totalflexInstances, maxInstances);
        throw new MyriadBadConfigurationException("Current number of active, staging, pending instances and requested: " + totalflexInstances + ", while it is greater then max instances allowed: " + maxInstances);
      }
    }

    final Double cpu = auxTaskConf.getCpus().or(ServiceConfiguration.DEFAULT_CPU);
    final Double mem = auxTaskConf.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY);

    Collection<NodeTask> nodes = new HashSet<>();
    for (int i = 0; i < instances; i++) {
      NodeTask nodeTask = new NodeTask(new ServiceResourceProfile(serviceName, cpu, mem), null);
      nodeTask.setTaskPrefix(serviceName);
      nodes.add(nodeTask);
    }

    LOGGER.info("Adding {} {} instances to cluster", nodes.size(), serviceName);
    this.schedulerState.addNodes(nodes);
  }

  /**
   * Flexes down any service defined in the configuration. Pending tasks are marked killable first,
   * then staging tasks, then active tasks, until {@code numInstancesToScaleDown} have been marked.
   *
   * @param numInstancesToScaleDown maximum number of instances to mark killable
   * @param serviceName             name of the service
   */
  public void flexDownAService(int numInstancesToScaleDown, String serviceName) {
    LOGGER.info("About to flex down {} instances of {}", numInstancesToScaleDown, serviceName);

    int numScaledDown = 0;

    // Flex down Pending tasks, if any
    if (numScaledDown < numInstancesToScaleDown) {
      numScaledDown += makeTasksKillable(this.schedulerState.getPendingTaskIds(serviceName), numInstancesToScaleDown - numScaledDown);
    }
    int numPendingTasksScaledDown = numScaledDown;

    // Flex down Staging tasks, if any
    if (numScaledDown < numInstancesToScaleDown) {
      numScaledDown += makeTasksKillable(this.schedulerState.getStagingTaskIds(serviceName), numInstancesToScaleDown - numScaledDown);
    }
    int numStagingTasksScaledDown = numScaledDown - numPendingTasksScaledDown;

    // Flex down Active tasks, if any
    Set<NodeTask> activeTasks = this.schedulerState.getActiveTasksByType(serviceName);
    if (numScaledDown < numInstancesToScaleDown) {
      for (NodeTask nodeTask : activeTasks) {
        this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId());
        numScaledDown++;
        if (LOGGER.isDebugEnabled()) {
          LOGGER.debug("Marked NodeTask {} on host {} for kill.", nodeTask.getTaskStatus().getTaskId(), nodeTask.getHostname());
        }
        if (numScaledDown >= numInstancesToScaleDown) {
          break;
        }
      }
    }

    LOGGER.info("Flexed down {} of {} instances including {} staging instances, and {} pending instances of {}", numScaledDown, numInstancesToScaleDown, numStagingTasksScaledDown, numPendingTasksScaledDown, serviceName);
  }

  // Marks at most `limit` of the given tasks killable, in iteration order; returns how many were marked.
  private int makeTasksKillable(Collection<Protos.TaskID> taskIds, int limit) {
    int marked = 0;
    for (Protos.TaskID taskId : taskIds) {
      if (marked >= limit) {
        break;
      }
      this.schedulerState.makeTaskKillable(taskId);
      marked++;
    }
    return marked;
  }

  private int flexDownPendingTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
    return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getPendingTaskIDsForProfile(profile), profile, constraint, numInstancesToScaleDown) : 0;
  }

  private int flexDownStagingTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
    return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getStagingTaskIDsForProfile(profile), profile, constraint, numInstancesToScaleDown) : 0;
  }

  // Active tasks are reordered by the scale-down policy before candidates are picked.
  private int flexDownActiveTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
    if (numInstancesToScaleDown > 0) {
      List<Protos.TaskID> activeTasksForProfile = Lists.newArrayList(schedulerState.getActiveTaskIDsForProfile(profile));
      nodeScaleDownPolicy.apply(activeTasksForProfile);
      return flexDownTasks(activeTasksForProfile, profile, constraint, numInstancesToScaleDown);
    }
    return 0;
  }

  // Marks killable (in iteration order) up to numInstancesToScaleDown tasks whose profile name
  // matches and which satisfy the constraint; returns how many were marked.
  private int flexDownTasks(Collection<Protos.TaskID> taskIDs, ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) {
    int numInstancesScaledDown = 0;
    for (Protos.TaskID taskID : taskIDs) {
      NodeTask nodeTask = schedulerState.getTask(taskID);
      if (nodeTask == null) {
        // NOTE(review): assumes getTask returns null for IDs no longer tracked (e.g. removed
        // concurrently); skip them instead of failing the whole flex-down with an NPE.
        LOGGER.warn("Task {} is no longer known to the scheduler state, skipping it", taskID);
        continue;
      }
      if (nodeTask.getProfile().getName().equals(profile.getName()) && meetsConstraint(nodeTask, constraint)) {
        this.schedulerState.makeTaskKillable(taskID);
        numInstancesScaledDown++;
        if (numInstancesScaledDown == numInstancesToScaleDown) {
          break;
        }
      }
    }
    return numInstancesScaledDown;
  }

  // A null constraint matches every task. Otherwise a task matches if it carries an equal
  // constraint, or (for LIKE constraints) if its hostname or slave attributes match.
  private boolean meetsConstraint(NodeTask nodeTask, Constraint constraint) {
    if (constraint != null) {
      if (constraint.equals(nodeTask.getConstraint())) {
        return true;
      }
      switch (constraint.getType()) {
        case LIKE:
          LikeConstraint likeConstraint = (LikeConstraint) constraint;
          if (likeConstraint.isConstraintOnHostName()) {
            return likeConstraint.matchesHostName(nodeTask.getHostname());
          } else {
            return likeConstraint.matchesSlaveAttributes(nodeTask.getSlaveAttributes());
          }

        default:
          return false;
      }
    }
    return true;
  }

  /**
   * @param taskPrefix task prefix / service name to count
   * @return the number of active, staging and pending tasks for the prefix
   */
  public Integer getFlexibleInstances(String taskPrefix) {
    return this.schedulerState.getActiveTaskIds(taskPrefix).size() + this.schedulerState.getStagingTaskIds(taskPrefix).size() + this.schedulerState.getPendingTaskIds(taskPrefix).size();
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java new file mode 100644 index 0000000..235d01b --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.myriad.scheduler; + +import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.scheduler.event.ErrorEvent; +import org.apache.myriad.scheduler.event.FrameworkMessageEvent; +import org.apache.myriad.scheduler.event.OfferRescindedEvent; +import org.apache.myriad.scheduler.event.ReRegisteredEvent; +import org.apache.myriad.scheduler.event.ResourceOffersEvent; +import org.apache.myriad.scheduler.event.SlaveLostEvent; +import org.apache.myriad.scheduler.event.StatusUpdateEvent; +import com.lmax.disruptor.EventTranslator; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; + +import javax.inject.Inject; +import java.util.List; +
/**
 * Myriad's Mesos {@link Scheduler}. Each callback only translates its arguments into an event and
 * publishes it on the matching disruptor from the {@link org.apache.myriad.DisruptorManager}.
 */
public class MyriadScheduler implements Scheduler {
  private final org.apache.myriad.DisruptorManager disruptorManager;

  /**
   * @param cfg              Myriad configuration (not used by this class)
   * @param disruptorManager provider of the per-callback disruptors
   */
  @Inject
  public MyriadScheduler(final MyriadConfiguration cfg, final org.apache.myriad.DisruptorManager disruptorManager) {
    this.disruptorManager = disruptorManager;
  }

  /** Publishes a RegisteredEvent with the driver, framework id and master info. */
  @Override
  public void registered(final SchedulerDriver driver, final Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) {
    final EventTranslator<org.apache.myriad.scheduler.event.RegisteredEvent> translator =
        new EventTranslator<org.apache.myriad.scheduler.event.RegisteredEvent>() {
          @Override
          public void translateTo(org.apache.myriad.scheduler.event.RegisteredEvent evt, long seq) {
            evt.setDriver(driver);
            evt.setFrameworkId(frameworkId);
            evt.setMasterInfo(masterInfo);
          }
        };
    disruptorManager.getRegisteredEventDisruptor().publishEvent(translator);
  }

  /** Publishes a ReRegisteredEvent with the driver and master info. */
  @Override
  public void reregistered(final SchedulerDriver driver, final Protos.MasterInfo masterInfo) {
    final EventTranslator<ReRegisteredEvent> translator = new EventTranslator<ReRegisteredEvent>() {
      @Override
      public void translateTo(ReRegisteredEvent evt, long seq) {
        evt.setDriver(driver);
        evt.setMasterInfo(masterInfo);
      }
    };
    disruptorManager.getReRegisteredEventDisruptor().publishEvent(translator);
  }

  /** Publishes a ResourceOffersEvent with the driver and the offered resources. */
  @Override
  public void resourceOffers(final SchedulerDriver driver, final List<Protos.Offer> offers) {
    final EventTranslator<ResourceOffersEvent> translator = new EventTranslator<ResourceOffersEvent>() {
      @Override
      public void translateTo(ResourceOffersEvent evt, long seq) {
        evt.setDriver(driver);
        evt.setOffers(offers);
      }
    };
    disruptorManager.getResourceOffersEventDisruptor().publishEvent(translator);
  }

  /** Publishes an OfferRescindedEvent with the driver and the rescinded offer id. */
  @Override
  public void offerRescinded(final SchedulerDriver driver, final Protos.OfferID offerId) {
    final EventTranslator<OfferRescindedEvent> translator = new EventTranslator<OfferRescindedEvent>() {
      @Override
      public void translateTo(OfferRescindedEvent evt, long seq) {
        evt.setDriver(driver);
        evt.setOfferId(offerId);
      }
    };
    disruptorManager.getOfferRescindedEventDisruptor().publishEvent(translator);
  }

  /** Publishes a StatusUpdateEvent with the driver and the task status. */
  @Override
  public void statusUpdate(final SchedulerDriver driver, final Protos.TaskStatus status) {
    final EventTranslator<StatusUpdateEvent> translator = new EventTranslator<StatusUpdateEvent>() {
      @Override
      public void translateTo(StatusUpdateEvent evt, long seq) {
        evt.setDriver(driver);
        evt.setStatus(status);
      }
    };
    disruptorManager.getStatusUpdateEventDisruptor().publishEvent(translator);
  }

  /** Publishes a FrameworkMessageEvent with the driver, payload, executor id and slave id. */
  @Override
  public void frameworkMessage(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final byte[] bytes) {
    final EventTranslator<FrameworkMessageEvent> translator = new EventTranslator<FrameworkMessageEvent>() {
      @Override
      public void translateTo(FrameworkMessageEvent evt, long seq) {
        evt.setDriver(driver);
        evt.setBytes(bytes);
        evt.setExecutorId(executorId);
        evt.setSlaveId(slaveId);
      }
    };
    disruptorManager.getFrameworkMessageEventDisruptor().publishEvent(translator);
  }

  /** Publishes a DisconnectedEvent with the driver. */
  @Override
  public void disconnected(final SchedulerDriver driver) {
    final EventTranslator<org.apache.myriad.scheduler.event.DisconnectedEvent> translator =
        new EventTranslator<org.apache.myriad.scheduler.event.DisconnectedEvent>() {
          @Override
          public void translateTo(org.apache.myriad.scheduler.event.DisconnectedEvent evt, long seq) {
            evt.setDriver(driver);
          }
        };
    disruptorManager.getDisconnectedEventDisruptor().publishEvent(translator);
  }

  /** Publishes a SlaveLostEvent with the driver and the lost slave id. */
  @Override
  public void slaveLost(final SchedulerDriver driver, final Protos.SlaveID slaveId) {
    final EventTranslator<SlaveLostEvent> translator = new EventTranslator<SlaveLostEvent>() {
      @Override
      public void translateTo(SlaveLostEvent evt, long seq) {
        evt.setDriver(driver);
        evt.setSlaveId(slaveId);
      }
    };
    disruptorManager.getSlaveLostEventDisruptor().publishEvent(translator);
  }

  /** Publishes an ExecutorLostEvent with the driver, executor id, slave id and exit status. */
  @Override
  public void executorLost(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final int exitStatus) {
    final EventTranslator<org.apache.myriad.scheduler.event.ExecutorLostEvent> translator =
        new EventTranslator<org.apache.myriad.scheduler.event.ExecutorLostEvent>() {
          @Override
          public void translateTo(org.apache.myriad.scheduler.event.ExecutorLostEvent evt, long seq) {
            evt.setDriver(driver);
            evt.setExecutorId(executorId);
            evt.setSlaveId(slaveId);
            evt.setExitStatus(exitStatus);
          }
        };
    disruptorManager.getExecutorLostEventDisruptor().publishEvent(translator);
  }

  /** Publishes an ErrorEvent with the driver and the error message. */
  @Override
  public void error(final SchedulerDriver driver, final String message) {
    final EventTranslator<ErrorEvent> translator = new EventTranslator<ErrorEvent>() {
      @Override
      public void translateTo(ErrorEvent evt, long seq) {
        evt.setDriver(driver);
        evt.setMessage(message);
      }
    };
    disruptorManager.getErrorEventDisruptor().publishEvent(translator);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java new file mode 100644 index 0000000..411518f --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.myriad.scheduler;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.myriad.configuration.MyriadConfiguration;
+
+/**
+ * Generates the shell command line that starts a YARN NodeManager.
+ * <p>
+ * This implementation assumes the NodeManager binaries are already deployed on the host.
+ * <p>
+ * NOTE(review): the NodeManager environment is held in instance state between
+ * {@link #generateEnvironment} and {@link #appendEnvForNM}, so one instance must not be used
+ * to generate command lines from several threads at the same time.
+ */
+public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(NMExecutorCLGenImpl.class);
+
+  public static final String ENV_YARN_NODEMANAGER_OPTS = "YARN_NODEMANAGER_OPTS";
+  public static final String KEY_YARN_NM_CGROUPS_PATH = "yarn.nodemanager.cgroups.path";
+  public static final String KEY_YARN_RM_HOSTNAME = "yarn.resourcemanager.hostname";
+
+  /**
+   * YARN container executor class.
+   */
+  public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS = "yarn.nodemanager.container-executor.class";
+  // TODO (mohit): Should it be configurable ?
+  public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor";
+  public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor";
+
+  /**
+   * YARN class to help handle LCE resources
+   */
+  public static final String KEY_YARN_NM_LCE_RH_CLASS = "yarn.nodemanager.linux-container-executor.resources-handler.class";
+
+  // TODO (mohit): Should it be configurable ?
+  public static final String VAL_YARN_NM_LCE_RH_CLASS = "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler";
+  public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy";
+  // $TASK_DIR is expanded by the shell: appendCgroupsCmds exports it and appendEnvForNM uses double quotes.
+  public static final String VAL_YARN_NM_LCE_CGROUPS_HIERARCHY = "mesos/$TASK_DIR";
+  public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT = "yarn.nodemanager.linux-container-executor.cgroups.mount";
+  public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "yarn.nodemanager.linux-container-executor.cgroups.mount-path";
+  public static final String VAL_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "/sys/fs/cgroup";
+  public static final String KEY_YARN_NM_LCE_GROUP = "yarn.nodemanager.linux-container-executor.group";
+  public static final String KEY_YARN_NM_LCE_PATH = "yarn.nodemanager.linux-container-executor.path";
+  public static final String KEY_YARN_HOME = "yarn.home";
+  public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores";
+  public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb";
+  public static final String YARN_NM_CMD = " $YARN_HOME/bin/yarn nodemanager";
+  public static final String KEY_NM_ADDRESS = "myriad.yarn.nodemanager.address";
+  public static final String KEY_NM_LOCALIZER_ADDRESS = "myriad.yarn.nodemanager.localizer.address";
+  public static final String KEY_NM_WEBAPP_ADDRESS = "myriad.yarn.nodemanager.webapp.address";
+  public static final String KEY_NM_SHUFFLE_PORT = "myriad.mapreduce.shuffle.port";
+
+  private static final String ALL_LOCAL_IPV4ADDR = "0.0.0.0:";
+  private static final String PROPERTY_FORMAT = "-D%s=%s";
+
+  /**
+   * Environment variables passed to the NodeManager process via {@code env}.
+   * Rebuilt from scratch on every {@link #generateEnvironment} call.
+   */
+  private final Map<String, String> environment = new HashMap<>();
+  protected MyriadConfiguration cfg;
+
+  public NMExecutorCLGenImpl(MyriadConfiguration cfg) {
+    this.cfg = cfg;
+  }
+
+  /**
+   * Builds the command line: optional cgroups setup, the YARN_HOME export, an {@code env ...}
+   * prefix carrying the NodeManager environment, and finally {@link #YARN_NM_CMD}.
+   *
+   * @param profile resources (cpus, memory) advertised by the NodeManager
+   * @param ports   must be an {@link NMPorts}; it is cast unconditionally
+   * @return the shell command line
+   */
+  @Override
+  public String generateCommandLine(ServiceResourceProfile profile, Ports ports) {
+    StringBuilder cmdLine = new StringBuilder();
+
+    generateEnvironment(profile, (NMPorts) ports);
+    appendCgroupsCmds(cmdLine);
+    appendYarnHomeExport(cmdLine);
+    appendEnvForNM(cmdLine);
+    cmdLine.append(YARN_NM_CMD);
+    return cmdLine.toString();
+  }
+
+  /**
+   * Populates the NodeManager environment.
+   * <p>
+   * The environment starts from the configured yarnEnvironment map. It then gets
+   * {@code YARN_NODEMANAGER_OPTS} {@code -D} options for the RM hostname (if set as a system
+   * property), the container executor (plus the cgroups settings when cgroups are enabled),
+   * and the advertised resources and ports.
+   *
+   * @param profile resources advertised to the ResourceManager
+   * @param ports   ports the NodeManager binds to on all local IPv4 addresses
+   */
+  protected void generateEnvironment(ServiceResourceProfile profile, NMPorts ports) {
+    // Start from a clean slate. addYarnNodemanagerOpt appends to YARN_NODEMANAGER_OPTS, so if
+    // this instance generates more than one command line, stale -D flags (old ports, old
+    // profile sizes) from earlier calls would otherwise keep accumulating.
+    environment.clear();
+
+    //yarnEnvironemnt configuration from yaml file
+    Map<String, String> yarnEnvironmentMap = cfg.getYarnEnvironment();
+    if (yarnEnvironmentMap != null) {
+      environment.putAll(yarnEnvironmentMap);
+    }
+
+    String rmHostName = System.getProperty(KEY_YARN_RM_HOSTNAME);
+    if (rmHostName != null && !rmHostName.isEmpty()) {
+      addYarnNodemanagerOpt(KEY_YARN_RM_HOSTNAME, rmHostName);
+    }
+
+    if (cfg.getNodeManagerConfiguration().getCgroups().or(Boolean.FALSE)) {
+      addYarnNodemanagerOpt(KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS, VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS);
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_RH_CLASS, VAL_YARN_NM_LCE_RH_CLASS);
+
+      // TODO: Configure hierarchy
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_HIERARCHY, VAL_YARN_NM_LCE_CGROUPS_HIERARCHY);
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_MOUNT, "true");
+      // TODO: Make it configurable
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH, VAL_YARN_NM_LCE_CGROUPS_MOUNT_PATH);
+      addYarnNodemanagerOpt(KEY_YARN_NM_LCE_GROUP, "root");
+      if (environment.containsKey("YARN_HOME")) {
+        addYarnNodemanagerOpt(KEY_YARN_HOME, environment.get("YARN_HOME"));
+      }
+    } else {
+      // Otherwise configure to use Default
+      addYarnNodemanagerOpt(KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS, DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS);
+    }
+    addYarnNodemanagerOpt(KEY_NM_RESOURCE_CPU_VCORES, Integer.toString(profile.getCpus().intValue()));
+    addYarnNodemanagerOpt(KEY_NM_RESOURCE_MEM_MB, Integer.toString(profile.getMemory().intValue()));
+    addYarnNodemanagerOpt(KEY_NM_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.toString(ports.getRpcPort()));
+    addYarnNodemanagerOpt(KEY_NM_LOCALIZER_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.toString(ports.getLocalizerPort()));
+    addYarnNodemanagerOpt(KEY_NM_WEBAPP_ADDRESS, ALL_LOCAL_IPV4ADDR + Long.toString(ports.getWebAppHttpPort()));
+    addYarnNodemanagerOpt(KEY_NM_SHUFFLE_PORT, Long.toString(ports.getShufflePort()));
+  }
+
+  /**
+   * Appends {@code env KEY="value" ...} for every environment entry.
+   * Values are wrapped in double quotes but not escaped, so the shell still expands
+   * {@code $VAR} references inside them.
+   */
+  protected void appendEnvForNM(StringBuilder cmdLine) {
+    cmdLine.append(" env ");
+    for (Map.Entry<String, String> env : environment.entrySet()) {
+      cmdLine.append(env.getKey()).append("=").append("\"").append(env.getValue()).append("\" ");
+    }
+  }
+
+  /**
+   * When cgroups are enabled, exports TASK_DIR (the basename of the working directory) and
+   * makes the task's cgroup directory under /sys/fs/cgroup/cpu/mesos executable.
+   */
+  protected void appendCgroupsCmds(StringBuilder cmdLine) {
+    if (cfg.getNodeManagerConfiguration().getCgroups().or(Boolean.FALSE)) {
+      cmdLine.append(" export TASK_DIR=`basename $PWD`;");
+      cmdLine.append(" chmod +x /sys/fs/cgroup/cpu/mesos/$TASK_DIR;");
+    }
+  }
+
+  /** Appends {@code export YARN_HOME=...;} when YARN_HOME is part of the environment. */
+  protected void appendYarnHomeExport(StringBuilder cmdLine) {
+    if (environment.containsKey("YARN_HOME")) {
+      cmdLine.append(" export YARN_HOME=").append(environment.get("YARN_HOME")).append(";");
+    }
+  }
+
+  /**
+   * Adds {@code -Dname=value} to {@code YARN_NODEMANAGER_OPTS}, space-separating it from any
+   * options already present.
+   */
+  protected void addYarnNodemanagerOpt(String propertyName, String propertyValue) {
+    String envOpt = String.format(PROPERTY_FORMAT, propertyName, propertyValue);
+    if (environment.containsKey(ENV_YARN_NODEMANAGER_OPTS)) {
+      String existingOpts = environment.get(ENV_YARN_NODEMANAGER_OPTS);
+      environment.put(ENV_YARN_NODEMANAGER_OPTS, existingOpts + " " + envOpt);
+    } else {
+      environment.put(ENV_YARN_NODEMANAGER_OPTS, envOpt);
+    }
+  }
+
+  /**
+   * Returns the ResourceManager's {@code /conf} URL. It uses HTTPS when the YARN HTTP policy is
+   * HTTPS-only, otherwise HTTP. It falls back to the RM hostname with port 8090 or 8088 when no
+   * web-app address is configured.
+   */
+  @Override
+  public String getConfigurationUrl() {
+    YarnConfiguration conf = new YarnConfiguration();
+    String httpPolicy = conf.get(TaskFactory.YARN_HTTP_POLICY);
+    if (httpPolicy != null && httpPolicy.equals(TaskFactory.YARN_HTTP_POLICY_HTTPS_ONLY)) {
+      String address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_HTTPS_ADDRESS);
+      if (address == null || address.isEmpty()) {
+        address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8090";
+      }
+      return "https://" + address + "/conf";
+    } else {
+      String address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_WEBAPP_ADDRESS);
+      if (address == null || address.isEmpty()) {
+        address = conf.get(TaskFactory.YARN_RESOURCEMANAGER_HOSTNAME) + ":8088";
+      }
+      return "http://" + address + "/conf";
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java
new file mode 100644
index 0000000..0944a2a
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import com.google.common.base.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The four ports assigned to a NodeManager task: RPC, localizer, web-app HTTP and HTTP shuffle.
+ */
+public class NMPorts implements Ports {
+  private static final String NM_RPC_PORT_KEY = "nm.rpc.port";
+  private static final String NM_LOCALIZER_PORT_KEY = "nm.localizer.port";
+  private static final String NM_WEBAPP_HTTP_PORT_KEY = "nm.webapp.http.port";
+  private static final String NM_HTTP_SHUFFLE_PORT_KEY = "nm.http.shuffle.port";
+
+  // Positional order of the ports expected by the constructor.
+  private static final String[] NM_PORT_KEYS = {NM_RPC_PORT_KEY, NM_LOCALIZER_PORT_KEY, NM_WEBAPP_HTTP_PORT_KEY, NM_HTTP_SHUFFLE_PORT_KEY};
+
+  private Map<String, Long> portsMap = new HashMap<>(NM_PORT_KEYS.length);
+
+  /**
+   * @param ports port numbers in the order RPC, localizer, web-app HTTP, shuffle
+   * @throws IllegalStateException if {@code ports.length != expectedNumPorts()}
+   */
+  public NMPorts(Long[] ports) {
+    Preconditions.checkState(ports.length == NM_PORT_KEYS.length, "NMPorts: array \"ports\" is of unexpected length");
+    int position = 0;
+    for (String portKey : NM_PORT_KEYS) {
+      portsMap.put(portKey, ports[position]);
+      position++;
+    }
+  }
+
+  // Unboxes the stored port; a null entry surfaces as a NullPointerException.
+  private long lookup(String portKey) {
+    return portsMap.get(portKey);
+  }
+
+  public long getRpcPort() {
+    return lookup(NM_RPC_PORT_KEY);
+  }
+
+  public long getLocalizerPort() {
+    return lookup(NM_LOCALIZER_PORT_KEY);
+  }
+
+  public long getWebAppHttpPort() {
+    return lookup(NM_WEBAPP_HTTP_PORT_KEY);
+  }
+
+  public long getShufflePort() {
+    return lookup(NM_HTTP_SHUFFLE_PORT_KEY);
+  }
+
+  /** @return how many ports the constructor expects */
+  public static int expectedNumPorts() {
+    return NM_PORT_KEYS.length;
+  }
+
+  /**
+   * @return the ports as {@code {key: port, key: port, ...}} in constructor order
+   */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("{");
+    String separator = "";
+    for (String portKey : NM_PORT_KEYS) {
+      text.append(separator).append(portKey).append(": ").append(portsMap.get(portKey).toString());
+      separator = ", ";
+    }
+    return text.append("}").toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfile.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfile.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfile.java
new file mode 100644
index 0000000..3de82a5
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfile.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import com.google.gson.Gson;
+
+/**
+ * A named NodeManager profile: the CPU and memory a NodeManager advertises to the
+ * YARN ResourceManager.
+ */
+public class NMProfile {
+  private final String name;
+
+  /** Number of CPUs advertised to the YARN ResourceManager. */
+  private final Long cpus;
+
+  /** Memory, in MB, advertised to the YARN ResourceManager. */
+  private final Long memory;
+
+  public NMProfile(String name, Long cpus, Long memory) {
+    this.name = name;
+    this.cpus = cpus;
+    this.memory = memory;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public Long getCpus() {
+    return cpus;
+  }
+
+  public Long getMemory() {
+    return memory;
+  }
+
+  /** @return this profile serialized as JSON by Gson */
+  @Override
+  public String toString() {
+    return new Gson().toJson(this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfileManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfileManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfileManager.java
new file mode 100644
index 0000000..38fad1d
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfileManager.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import com.google.gson.Gson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry of {@link NMProfile}s keyed by profile name, backed by a {@link ConcurrentHashMap}.
+ */
+public class NMProfileManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(NMProfileManager.class);
+
+  private Map<String, NMProfile> profiles = new ConcurrentHashMap<>();
+
+  /** @return the profile registered under {@code name}, or {@code null} if none is */
+  public NMProfile get(String name) {
+    return profiles.get(name);
+  }
+
+  /** Registers {@code profile} under its name, replacing any earlier profile of that name. */
+  public void add(NMProfile profile) {
+    String profileName = profile.getName();
+    LOGGER.info("Adding profile {} with CPU: {} and Memory: {}", profileName, profile.getCpus(), profile.getMemory());
+
+    profiles.put(profileName, profile);
+  }
+
+  /** @return whether a profile is registered under {@code name} */
+  public boolean exists(String name) {
+    return profiles.containsKey(name);
+  }
+
+  /** @return this manager serialized as JSON by Gson */
+  @Override
+  public String toString() {
+    return new Gson().toJson(this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactoryAnnotation.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactoryAnnotation.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactoryAnnotation.java
new file mode 100644
index 0000000..62d9f52
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactoryAnnotation.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.myriad.scheduler;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Target;
+import java.lang.annotation.Retention;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+
+/**
+ * Guice binding annotation that selects the NodeManager-specific {@code TaskFactory}
+ * binding. It may be placed on fields, parameters and methods, and is retained at runtime.
+ */
+@Retention(RUNTIME)
+@Target({FIELD, PARAMETER, METHOD})
+@BindingAnnotation
+public @interface NMTaskFactoryAnnotation {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java
new file mode 100644
index 0000000..03150fb
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Ports.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+/**
+ * Marker type for the set of ports assigned to a task. It declares no methods; implementations
+ * such as {@link NMPorts} define which ports exist.
+ */
+public interface Ports {
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
new file mode 100644
index 0000000..955bc77
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import javax.inject.Inject;
+import java.util.Set;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Rebalancer} is responsible for scaling registered YARN clusters as per
+ * configured rules and policies.
+ * <p>
+ * Current policy: when no NodeManager task is active or pending, flex up one NodeManager
+ * using the {@value #DEFAULT_PROFILE_NAME} profile.
+ */
+public class Rebalancer implements Runnable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Rebalancer.class);
+
+  /** Profile used when flexing up a cluster that has no NodeManagers. */
+  private static final String DEFAULT_PROFILE_NAME = "small";
+
+  private final org.apache.myriad.state.SchedulerState schedulerState;
+  private final MyriadOperations myriadOperations;
+  private final ServiceProfileManager profileManager;
+
+  @Inject
+  public Rebalancer(org.apache.myriad.state.SchedulerState schedulerState, MyriadOperations myriadOperations, ServiceProfileManager profileManager) {
+    this.schedulerState = schedulerState;
+    this.myriadOperations = myriadOperations;
+    this.profileManager = profileManager;
+  }
+
+  @Override
+  public void run() {
+    final Set<Protos.TaskID> activeIds = schedulerState.getActiveTaskIds(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX);
+    final Set<Protos.TaskID> pendingIds = schedulerState.getPendingTaskIds(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX);
+    LOGGER.info("Active {}, Pending {}", activeIds.size(), pendingIds.size());
+    if (activeIds.isEmpty() && pendingIds.isEmpty()) {
+      ServiceResourceProfile profile = profileManager.get(DEFAULT_PROFILE_NAME);
+      if (profile == null) {
+        // ServiceProfileManager.get returns null for an unregistered name; do not flex up with
+        // a null profile.
+        LOGGER.warn("Cannot flex up: no profile named '{}' is registered", DEFAULT_PROFILE_NAME);
+        return;
+      }
+      myriadOperations.flexUpCluster(profile, 1, null);
+    }
+    // Reference sketch of a metrics-driven policy (not active):
+    // RestAdapter restAdapter = new RestAdapter.Builder()
+    // .setEndpoint("http://" + host + ":" + port)
+    // .setLogLevel(LogLevel.FULL).build();
+    // YARNResourceManagerService service = restAdapter
+    // .create(YARNResourceManagerService.class);
+    //
+    // ClusterMetrics metrics = service.metrics().getClusterMetrics();
+    // AppsResponse appsResponse = service.apps("ACCEPTED");
+    //
+    // int acceptedApps = 0;
+    //
+    // if (appsResponse == null || appsResponse.getApps() == null
+    // || appsResponse.getApps().getApps() == null) {
+    // acceptedApps = 0;
+    // } else {
+    // acceptedApps = appsResponse.getApps().getApps().size();
+    // }
+    // LOGGER.info("Metrics: {}", metrics);
+    // LOGGER.info("Apps: {}", appsResponse);
+    //
+    // long availableMB = metrics.getAvailableMB();
+    // long allocatedMB = metrics.getAllocatedMB();
+    // long reservedMB = metrics.getReservedMB();
+    // int activeNodes = metrics.getActiveNodes();
+    // int unhealthyNodes = metrics.getUnhealthyNodes();
+    // int appsPending = metrics.getAppsPending();
+    // int appsRunning = metrics.getAppsRunning();
+
+    // if (activeNodes == 0 && appsPending > 0) {
+    // LOGGER.info(
+    // "Flexing up for condition: activeNodes ({}) == 0 && appsPending ({}) > 0",
+    // activeNodes, appsPending);
+    // this.myriadOperations.flexUpCluster(clusterId, 1, "small");
+    // } else if (appsPending == 0 && appsRunning == 0 && activeNodes > 0) {
+    // LOGGER.info(
+    // "Flexing down for condition: appsPending ({}) == 0 && appsRunning ({}) == 0 && activeNodes ({}) > 0",
+    // appsPending, appsRunning, activeNodes);
+    // this.myriadOperations.flexDownCluster(cluster, 1);
+    // } else if (acceptedApps > 0) {
+    // LOGGER.info("Flexing up for condition: acceptedApps ({}) > 0",
+    // acceptedApps);
+    // this.myriadOperations.flexUpCluster(clusterId, 1, "small");
+    // } else {
+    // LOGGER.info("Nothing to rebalance");
+    // this.schedulerState.releaseLock(clusterId);
+    // }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ReconcileService.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ReconcileService.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ReconcileService.java
new file mode 100644
index 0000000..d0c74de
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ReconcileService.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software
Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import org.apache.myriad.configuration.MyriadConfiguration;
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+
+/**
+ * {@link ReconcileService} is responsible for reconciling tasks with the mesos master
+ */
+public class ReconcileService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReconcileService.class);
+
+  public static final long DEFAULT_RECONCILATION_DELAY_MS = 10000;
+  public static final long MAX_RECONCILE_ATTEMPTS = 10;
+
+  private org.apache.myriad.state.SchedulerState state;
+  private MyriadConfiguration cfg;
+  private Date lastReconcileTime;
+
+  @Inject
+  public ReconcileService(org.apache.myriad.state.SchedulerState state, MyriadConfiguration cfg) {
+    this.state = state;
+    this.cfg = cfg;
+  }
+
+  /**
+   * Explicitly reconciles every known task status with the Mesos master.
+   * <p>
+   * After the first request it retries up to {@link #MAX_RECONCILE_ATTEMPTS} times. Each retry
+   * only includes tasks whose status timestamp is older than the previous request. Before
+   * attempt {@code n} it sleeps {@code n * DEFAULT_RECONCILATION_DELAY_MS} ms (linear backoff).
+   * It stops early once every task has been refreshed, or when the thread is interrupted (the
+   * interrupt flag is restored). This method blocks the calling thread.
+   *
+   * @param driver driver used to send the reconciliation requests
+   */
+  public void reconcile(SchedulerDriver driver) {
+    Collection<Protos.TaskStatus> taskStatuses = state.getTaskStatuses();
+
+    if (taskStatuses.isEmpty()) {
+      return;
+    }
+    LOGGER.info("Reconciling {} tasks.", taskStatuses.size());
+
+    driver.reconcileTasks(taskStatuses);
+
+    lastReconcileTime = new Date();
+
+    for (int attempt = 1; attempt <= MAX_RECONCILE_ATTEMPTS; attempt++) {
+      try {
+        // TODO(mohit): backoff strategy (currently linear) should maybe be configurable.
+        Thread.sleep(DEFAULT_RECONCILATION_DELAY_MS * attempt);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag and stop, rather than swallowing the interruption.
+        Thread.currentThread().interrupt();
+        LOGGER.error("Interrupted", e);
+        return;
+      }
+      // TaskStatus timestamps are seconds since the epoch (double); Date#getTime() is millis.
+      double lastReconcileSecs = lastReconcileTime.getTime() / 1000.0;
+      Collection<Protos.TaskStatus> notYetReconciled = new ArrayList<>();
+      for (Protos.TaskStatus status : state.getTaskStatuses()) {
+        if (status.getTimestamp() < lastReconcileSecs) {
+          notYetReconciled.add(status);
+        }
+      }
+      if (notYetReconciled.isEmpty()) {
+        // Passing an empty list to reconcileTasks would request implicit reconciliation of all
+        // tasks known to the master, so stop here instead.
+        LOGGER.info("All tasks reconciled after {} attempt(s)", attempt);
+        return;
+      }
+      LOGGER.info("Reconcile attempt {} for {} tasks", attempt, notYetReconciled.size());
+      driver.reconcileTasks(notYetReconciled);
+      lastReconcileTime = new Date();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
new file mode 100644
index 0000000..0cbca37
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+import org.apache.myriad.configuration.NodeManagerConfiguration;
+import org.apache.myriad.state.NodeTask;
+import org.apache.myriad.state.SchedulerState;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+/**
+ * Provides utilities for scheduling with the mesos offers
+ */
+public class SchedulerUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerUtils.class);
+
+  /**
+   * Checks that no task in {@code tasks} runs on the offer's host (hostnames compared
+   * case-insensitively).
+   *
+   * @param offer        the Mesos offer; must not be null
+   * @param taskToLaunch the task about to be launched (not used by this check)
+   * @param tasks        tasks to compare against; may be null or empty
+   * @return true if none of {@code tasks} has the offer's hostname
+   * @throws IllegalArgumentException if {@code offer} is null
+   */
+  public static boolean isUniqueHostname(Protos.OfferOrBuilder offer, NodeTask taskToLaunch, Collection<NodeTask> tasks) {
+    Preconditions.checkArgument(offer != null);
+    String offerHostname = offer.getHostname();
+
+    if (!CollectionUtils.isEmpty(tasks)) {
+      for (NodeTask task : tasks) {
+        if (offerHostname.equalsIgnoreCase(task.getHostname())) {
+          LOGGER.debug("Offer's hostname {} is not unique", offerHostname);
+          return false;
+        }
+      }
+    }
+    LOGGER.debug("Offer's hostname {} is unique", offerHostname);
+    return true;
+  }
+
+  /**
+   * Determines if a given host has a nodemanager running with zero profile. Node Managers
+   * launched with zero profile (zero cpu & memory) are eligible for fine grained scaling.
+   * Node Managers launched with a non-zero profile size are not eligible for fine grained scaling.
+   *
+   * @param hostName host to check; compared case-insensitively, like {@link #isUniqueHostname}.
+   *                 A null host name is never eligible
+   * @param state    scheduler state supplying the active NodeManager tasks
+   * @return true if an active NodeManager task with zero cpus and zero memory runs on {@code hostName}
+   */
+  public static boolean isEligibleForFineGrainedScaling(String hostName, SchedulerState state) {
+    if (hostName == null) {
+      return false;
+    }
+    for (NodeTask activeNMTask : state.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)) {
+      // Compare host first: null-safe (equalsIgnoreCase(null) is false) and skips other hosts cheaply.
+      if (hostName.equalsIgnoreCase(activeNMTask.getHostname()) &&
+          activeNMTask.getProfile().getCpus() == 0 &&
+          activeNMTask.getProfile().getMemory() == 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
new file mode 100644
index 0000000..6fd8872
--- /dev/null
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceCommandLineGenerator.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.myriad.scheduler; + +import org.apache.myriad.configuration.MyriadConfiguration; + +/** + * CommandLineGenerator for any aux service launched by Myriad as binary distro + */ +public class ServiceCommandLineGenerator extends DownloadNMExecutorCLGenImpl { + + + public ServiceCommandLineGenerator(MyriadConfiguration cfg, String nodeManagerUri) { + super(cfg, nodeManagerUri); + } + + @Override + public String generateCommandLine(ServiceResourceProfile profile, Ports ports) { + StringBuilder strB = new StringBuilder(); + appendDistroExtractionCommands(strB); + appendUser(strB); + return strB.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceProfileManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceProfileManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceProfileManager.java new file mode 100644 index 0000000..5319593 --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceProfileManager.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +/** + * Class to keep all the ServiceResourceProfiles together + */ +public class ServiceProfileManager { + private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProfileManager.class); + + private Map<String, ServiceResourceProfile> profiles = new ConcurrentHashMap<>(); + + public ServiceResourceProfile get(String name) { + return profiles.get(name); + } + + public void add(ServiceResourceProfile profile) { + LOGGER.info("Adding profile {} with CPU: {} and Memory: {}", profile.getName(), profile.getCpus(), profile.getMemory()); + profiles.put(profile.getName(), profile); + } + + public boolean exists(String name) { + return this.profiles.containsKey(name); + } + + public String toString() { + Gson gson = new Gson(); + return gson.toJson(this); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java new file mode 100644 index 0000000..021007b --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler; + +import java.lang.reflect.Type; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonParseException; + +/** + * Resource Profile for any service + */ +public class ServiceResourceProfile { + + protected final String name; + + /** + * Number of CPU needed to run a service + */ + protected final Double cpus; + + /** + * Memory in MB needed to run a service + */ + protected final Double memory; + + protected Double executorCpu = 0.0; + + protected Double executorMemory = 0.0; + + protected String className; + + public ServiceResourceProfile(String name, Double cpu, Double mem) { + this.name = name; + this.cpus = cpu; + this.memory = mem; + this.className = ServiceResourceProfile.class.getName(); + } + + + public String getName() { + return name; + } + + public Double getCpus() { + return cpus; + } + + public Double getMemory() { + return memory; + } + + public Double getAggregateMemory() { + return memory; + } + + public Double getAggregateCpu() { + return cpus; + } + + public Double getExecutorCpu() { + return executorCpu; + } + + public void setExecutorCpu(Double executorCpu) { + this.executorCpu = executorCpu; + } + + public Double 
getExecutorMemory() { + return executorMemory; + } + + public void setExecutorMemory(Double executorMemory) { + this.executorMemory = executorMemory; + } + + + @Override + public String toString() { + Gson gson = new Gson(); + return gson.toJson(this); + } + + /** + * Custom serializer to help with deserialization of class hierarchy + * since at the point of deserialization we don't know class of the data + * that is being deserialized + */ + public static class CustomDeserializer implements JsonDeserializer<ServiceResourceProfile> { + private static final Logger LOGGER = LoggerFactory.getLogger(CustomDeserializer.class); + + @Override + public ServiceResourceProfile deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { + String type = json.getAsJsonObject().get("className").getAsString(); + try { + @SuppressWarnings("rawtypes") Class c = Class.forName(type); + if (ServiceResourceProfile.class.equals(c)) { + return new Gson().fromJson(json, typeOfT); + } + ServiceResourceProfile profile = context.deserialize(json, c); + return profile; + } catch (ClassNotFoundException e) { + LOGGER.error("Classname is not found", e); + } + return null; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java new file mode 100644 index 0000000..544c47f --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.myriad.scheduler; + +import java.util.Map; + +import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.configuration.ServiceConfiguration; + +/** + * ServiceTaskConstraints is an implementation of TaskConstraints for a service + * at this point constraints are on ports + * Later on there may be other types of constraints added + */ +public class ServiceTaskConstraints implements TaskConstraints { + + private int portsCount; + + public ServiceTaskConstraints(MyriadConfiguration cfg, String taskPrefix) { + this.portsCount = 0; + Map<String, ServiceConfiguration> auxConfigs = cfg.getServiceConfigurations(); + if (auxConfigs == null) { + return; + } + ServiceConfiguration serviceConfig = auxConfigs.get(taskPrefix); + if (serviceConfig != null) { + if (serviceConfig.getPorts().isPresent()) { + this.portsCount = serviceConfig.getPorts().get().size(); + } + } + } + + @Override + public int portsCount() { + return portsCount; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/101bcad3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java ---------------------------------------------------------------------- diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java new file mode 100644 index 0000000..d7ca31d --- /dev/null +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.myriad.scheduler; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import javax.inject.Inject; + +import org.apache.mesos.Protos.CommandInfo; +import org.apache.mesos.Protos.ExecutorInfo; +import org.apache.mesos.Protos.FrameworkID; +import org.apache.mesos.Protos.Offer; +import org.apache.mesos.Protos.Resource; +import org.apache.mesos.Protos.TaskID; +import org.apache.mesos.Protos.TaskInfo; +import org.apache.mesos.Protos.Value; +import org.apache.mesos.Protos.CommandInfo.URI; +import org.apache.mesos.Protos.Value.Scalar; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.configuration.MyriadExecutorConfiguration; +import org.apache.myriad.configuration.ServiceConfiguration; +import org.apache.myriad.state.NodeTask; +import com.google.common.annotations.VisibleForTesting; + +/** + * Generic Service Class that allows to create a service solely base don the configuration + * Main properties of configuration are: + * 1. command to run + * 2. Additional env. variables to set (serviceOpts) + * 3. ports to use with names of the properties + * 4. 
TODO (yufeldman) executor info + */ +public class ServiceTaskFactoryImpl implements TaskFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(ServiceTaskFactoryImpl.class); + + public static final long DEFAULT_PORT_NUMBER = 0; + + private MyriadConfiguration cfg; + @SuppressWarnings("unused") + private TaskUtils taskUtils; + private ServiceCommandLineGenerator clGenerator; + + @Inject + public ServiceTaskFactoryImpl(MyriadConfiguration cfg, TaskUtils taskUtils) { + this.cfg = cfg; + this.taskUtils = taskUtils; + this.clGenerator = new ServiceCommandLineGenerator(cfg, cfg.getMyriadExecutorConfiguration().getNodeManagerUri().orNull()); + } + + @Override + public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, NodeTask nodeTask) { + Objects.requireNonNull(offer, "Offer should be non-null"); + Objects.requireNonNull(nodeTask, "NodeTask should be non-null"); + + ServiceConfiguration serviceConfig = cfg.getServiceConfiguration(nodeTask.getTaskPrefix()); + + Objects.requireNonNull(serviceConfig, "ServiceConfig should be non-null"); + Objects.requireNonNull(serviceConfig.getCommand().orNull(), "command for ServiceConfig should be non-null"); + + final String serviceHostName = "0.0.0.0"; + final String serviceEnv = serviceConfig.getEnvSettings(); + final String rmHostName = System.getProperty(YARN_RESOURCEMANAGER_HOSTNAME); + List<Long> additionalPortsNumbers = null; + + final StringBuilder strB = new StringBuilder("env "); + if (serviceConfig.getServiceOpts() != null) { + strB.append(serviceConfig.getServiceOpts()).append("="); + + strB.append("\""); + if (rmHostName != null && !rmHostName.isEmpty()) { + strB.append("-D" + YARN_RESOURCEMANAGER_HOSTNAME + "=" + rmHostName + " "); + } + + Map<String, Long> ports = serviceConfig.getPorts().orNull(); + if (ports != null && !ports.isEmpty()) { + int neededPortsCount = 0; + for (Map.Entry<String, Long> portEntry : ports.entrySet()) { + Long port = portEntry.getValue(); + if (port == 
DEFAULT_PORT_NUMBER) { + neededPortsCount++; + } + } + // use provided ports + additionalPortsNumbers = getAvailablePorts(offer, neededPortsCount); + LOGGER.info("No specified ports found or number of specified ports is not enough. Using ports from Mesos Offers: {}", additionalPortsNumbers); + int index = 0; + for (Map.Entry<String, Long> portEntry : ports.entrySet()) { + String portProperty = portEntry.getKey(); + Long port = portEntry.getValue(); + if (port == DEFAULT_PORT_NUMBER) { + port = additionalPortsNumbers.get(index++); + } + strB.append("-D" + portProperty + "=" + serviceHostName + ":" + port + " "); + } + } + strB.append(serviceEnv); + strB.append("\""); + } + + strB.append(" "); + strB.append(serviceConfig.getCommand().get()); + + CommandInfo commandInfo = createCommandInfo(nodeTask.getProfile(), strB.toString()); + + LOGGER.info("Command line for service: {} is: {}", nodeTask.getTaskPrefix(), strB.toString()); + + Scalar taskMemory = Scalar.newBuilder().setValue(nodeTask.getProfile().getMemory()).build(); + Scalar taskCpus = Scalar.newBuilder().setValue(nodeTask.getProfile().getCpus()).build(); + + TaskInfo.Builder taskBuilder = TaskInfo.newBuilder(); + + taskBuilder.setName(nodeTask.getTaskPrefix()).setTaskId(taskId).setSlaveId(offer.getSlaveId()).addResources(Resource.newBuilder().setName("cpus").setType(Value.Type.SCALAR).setScalar(taskCpus).build()).addResources(Resource.newBuilder() + .setName("mem").setType(Value.Type.SCALAR).setScalar(taskMemory).build()); + + if (additionalPortsNumbers != null && !additionalPortsNumbers.isEmpty()) { + // set ports + Value.Ranges.Builder valueRanger = Value.Ranges.newBuilder(); + for (Long port : additionalPortsNumbers) { + valueRanger.addRange(Value.Range.newBuilder().setBegin(port).setEnd(port)); + } + + taskBuilder.addResources(Resource.newBuilder().setName("ports").setType(Value.Type.RANGES).setRanges(valueRanger.build())); + } + taskBuilder.setCommand(commandInfo); + return taskBuilder.build(); + } + + 
@VisibleForTesting + CommandInfo createCommandInfo(ServiceResourceProfile profile, String executorCmd) { + MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration(); + CommandInfo.Builder commandInfo = CommandInfo.newBuilder(); + Map<String, String> envVars = cfg.getYarnEnvironment(); + if (envVars != null && !envVars.isEmpty()) { + org.apache.mesos.Protos.Environment.Builder yarnHomeB = org.apache.mesos.Protos.Environment.newBuilder(); + for (Map.Entry<String, String> envEntry : envVars.entrySet()) { + org.apache.mesos.Protos.Environment.Variable.Builder yarnEnvB = org.apache.mesos.Protos.Environment.Variable.newBuilder(); + yarnEnvB.setName(envEntry.getKey()).setValue(envEntry.getValue()); + yarnHomeB.addVariables(yarnEnvB.build()); + } + commandInfo.mergeEnvironment(yarnHomeB.build()); + } + + if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { + //Both FrameworkUser and FrameworkSuperuser to get all of the directory permissions correct. 
+ if (!(cfg.getFrameworkUser().isPresent() && cfg.getFrameworkSuperUser().isPresent())) { + throw new RuntimeException("Trying to use remote distribution, but frameworkUser" + "and/or frameworkSuperUser not set!"); + } + + LOGGER.info("Using remote distribution"); + String clGeneratedCommand = clGenerator.generateCommandLine(profile, null); + + String nmURIString = myriadExecutorConfiguration.getNodeManagerUri().get(); + + //Concatenate all the subcommands + String cmd = clGeneratedCommand + " " + executorCmd; + + //get the nodemanagerURI + //We're going to extract ourselves, so setExtract is false + LOGGER.info("Getting Hadoop distribution from:" + nmURIString); + URI nmUri = URI.newBuilder().setValue(nmURIString).setExtract(false).build(); + + //get configs directly from resource manager + String configUrlString = clGenerator.getConfigurationUrl(); + LOGGER.info("Getting config from:" + configUrlString); + URI configUri = URI.newBuilder().setValue(configUrlString).build(); + + LOGGER.info("Slave will execute command:" + cmd); + commandInfo.addUris(nmUri).addUris(configUri).setValue("echo \"" + cmd + "\";" + cmd); + commandInfo.setUser(cfg.getFrameworkSuperUser().get()); + + } else { + commandInfo.setValue(executorCmd); + } + return commandInfo.build(); + } + + @Override + public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer offer, CommandInfo commandInfo) { + // TODO (yufeldman) if executor specified use it , otherwise return null + // nothing to implement here, since we are using default slave executor + return null; + } + + /** + * Helper method to reserve ports + * + * @param offer + * @param requestedPorts + * @return + */ + private List<Long> getAvailablePorts(Offer offer, int requestedPorts) { + if (requestedPorts == 0) { + return null; + } + final List<Long> returnedPorts = new ArrayList<>(); + for (Resource resource : offer.getResourcesList()) { + if (resource.getName().equals("ports")) { + Iterator<Value.Range> itr = 
resource.getRanges().getRangeList().iterator(); + while (itr.hasNext()) { + Value.Range range = itr.next(); + if (range.getBegin() <= range.getEnd()) { + long i = range.getBegin(); + while (i <= range.getEnd() && returnedPorts.size() < requestedPorts) { + returnedPorts.add(i); + i++; + } + if (returnedPorts.size() >= requestedPorts) { + return returnedPorts; + } + } + } + } + } + // this is actually an error condition - we did not have enough ports to use + return returnedPorts; + } +}