Repository: incubator-slider Updated Branches: refs/heads/develop 8d319482b -> 6218b2d0e
SLIDER-594 enable container launch delay option Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/6218b2d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/6218b2d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/6218b2d0 Branch: refs/heads/develop Commit: 6218b2d0ed0691e82335c8bebbcc6c2fc14d4dd3 Parents: 8d31948 Author: Jon Maron <jma...@hortonworks.com> Authored: Mon Nov 3 17:34:54 2014 -0500 Committer: Jon Maron <jma...@hortonworks.com> Committed: Mon Nov 3 17:34:54 2014 -0500 ---------------------------------------------------------------------- .../slider/providers/agent/AgentKeys.java | 1 + .../server/appmaster/RoleLaunchService.java | 25 ++- .../appmaster/TestDelayInContainerLaunch.groovy | 173 +++++++++++++++++++ 3 files changed, 196 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6218b2d0/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java index 9d5eb3d..e682b13 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java @@ -99,6 +99,7 @@ public interface AgentKeys { String AGENT_OUT_FILE = "slider-agent.out"; String KEY_AGENT_TWO_WAY_SSL_ENABLED = "ssl.server.client.auth"; String CERT_FILE_LOCALIZATION_PATH = "certs/ca.crt"; + String KEY_CONTAINER_LAUNCH_DELAY = "container.launch.delay.sec"; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6218b2d0/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java index 37824c8..9264991 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java @@ -22,12 +22,14 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.slider.common.tools.SliderFileSystem; import org.apache.slider.core.conf.AggregateConf; import org.apache.slider.core.conf.MapOperations; import org.apache.slider.core.launch.ContainerLauncher; import org.apache.slider.providers.ProviderRole; import org.apache.slider.providers.ProviderService; +import org.apache.slider.providers.agent.AgentKeys; import org.apache.slider.server.appmaster.actions.ActionStartContainer; import org.apache.slider.server.appmaster.actions.QueueAccess; import org.apache.slider.server.appmaster.state.RoleInstance; @@ -213,9 +215,26 @@ public class RoleLaunchService instance.role = containerRole; instance.roleId = role.id; instance.environment = envDescription; - actionQueue.put(new ActionStartContainer("starting " + containerRole, - container, containerLauncher.completeContainerLaunch(), instance, 0, - TimeUnit.MILLISECONDS)); + int delay = appComponent.getOptionInt( + AgentKeys.KEY_CONTAINER_LAUNCH_DELAY, 0); + int maxDelay = + getConfig().getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS); + if (delay > maxDelay/1000) { + log.warn("Container launch delay of {} exceeds the maximum allowed of" + + " {} seconds. Delay will not be utilized.", + delay, maxDelay/1000); + delay = 0; + } + log.info("Container launch delay for {} set to {} seconds", + role.name, delay); + actionQueue.schedule(new ActionStartContainer("starting " + + containerRole, + container, + containerLauncher.completeContainerLaunch(), + instance, + delay, + TimeUnit.SECONDS)); } catch (Exception e) { log.error("Exception thrown while trying to start {}: {}", containerRole, e); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6218b2d0/slider-core/src/test/groovy/org/apache/slider/server/appmaster/TestDelayInContainerLaunch.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/TestDelayInContainerLaunch.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/TestDelayInContainerLaunch.groovy new file mode 100644 index 0000000..973d3af --- /dev/null +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/TestDelayInContainerLaunch.groovy @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import org.apache.hadoop.fs.Path +import org.apache.hadoop.yarn.api.records.ApplicationId +import org.apache.hadoop.yarn.api.records.ApplicationReport +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.exceptions.YarnException +import org.apache.slider.api.ClusterDescription +import org.apache.slider.api.ResourceKeys +import org.apache.slider.client.SliderClient +import org.apache.slider.common.SliderExitCodes +import org.apache.slider.common.params.ActionKillContainerArgs +import org.apache.slider.core.build.InstanceBuilder +import org.apache.slider.core.conf.AggregateConf +import org.apache.slider.core.exceptions.BadClusterStateException +import org.apache.slider.core.exceptions.SliderException +import org.apache.slider.core.launch.LaunchedApplication +import org.apache.slider.core.main.ServiceLauncher +import org.apache.slider.core.persist.LockAcquireFailedException +import org.apache.slider.providers.agent.AgentKeys +import org.apache.slider.providers.agent.AgentTestBase +import org.junit.Before +import org.junit.Test + +import static org.apache.slider.common.params.Arguments.* +import static org.apache.slider.providers.agent.AgentKeys.* + +/** + * Tests an echo command + */ +@Slf4j +class TestDelayInContainerLaunch extends AgentTestBase { + + File slider_core + String echo_py + File echo_py_path + File app_def_path + String agt_ver + File agt_ver_path + String agt_conf + File agt_conf_path + + @Before + public void setupArtifacts() { + slider_core = new File(new File(".").absoluteFile, "src/test/python"); + echo_py = "echo.py" + echo_py_path = new File(slider_core, echo_py) + app_def_path = new File(app_def_pkg_path) + agt_ver = "version" + agt_ver_path = new File(slider_core, agt_ver) + agt_conf = "agent.ini" + agt_conf_path = new File(slider_core, agt_conf) + + } + + @Override + void checkTestAssumptions(YarnConfiguration conf) { + + } + + @Test + public void testDelayInContainerLaunch() throws Throwable { + String clustername = createMiniCluster("", + configuration, + 1, + 1, + 1, + true, + false) + + assert echo_py_path.exists() + assert app_def_path.exists() + assert agt_ver_path.exists() + assert agt_conf_path.exists() + + def role = "echo" + Map<String, Integer> roles = [ + (role): 1, + ]; + long delay = 30 + + TestDelayingSliderClient.delay = delay + setSliderClientClassName(TestDelayingSliderClient.name) + try { + ServiceLauncher<SliderClient> launcher = buildAgentCluster(clustername, + roles, + [ + ARG_OPTION, PACKAGE_PATH, slider_core.absolutePath, + ARG_OPTION, APP_DEF, toURIArg(app_def_path), + ARG_OPTION, AGENT_CONF, toURIArg(agt_conf_path), + ARG_OPTION, AGENT_VERSION, toURIArg(agt_ver_path), + ARG_RES_COMP_OPT, role, ResourceKeys.COMPONENT_PRIORITY, "1", + ARG_COMP_OPT, role, SCRIPT_PATH, echo_py, + ARG_COMP_OPT, role, SERVICE_NAME, "Agent", + ], + true, true, + true) + SliderClient sliderClient = launcher.service + waitForRoleCount(sliderClient, roles, AGENT_CLUSTER_STARTUP_TIME) + + ClusterDescription status = sliderClient.clusterDescription + def workers = status.instances["echo"] + assert workers.size() == 1 + def worker1 = workers[0] + + // set the delay for 10 seconds more than start duration + ActionKillContainerArgs args = new ActionKillContainerArgs(); + args.id = worker1 + long start = System.currentTimeMillis() + assert 0 == sliderClient.actionKillContainer(clustername, args) + sleep(5000) + waitForRoleCount(sliderClient, roles, AGENT_CLUSTER_STARTUP_TIME) + long duration = System.currentTimeMillis() - start + assert duration/1000 >= delay + + } finally { + setSliderClientClassName(SliderClient.name) + } + + + } + + static class TestDelayingSliderClient extends SliderClient { + static long delay + @Override + protected void persistInstanceDefinition(boolean overwrite, + Path appconfdir, + InstanceBuilder builder) + throws IOException, SliderException, LockAcquireFailedException { + AggregateConf conf = builder.getInstanceDescription() + conf.getAppConfOperations().getGlobalOptions().put( + AgentKeys.KEY_CONTAINER_LAUNCH_DELAY, + String.valueOf(delay)) + super.persistInstanceDefinition(overwrite, appconfdir, builder) + } + + @Override + LaunchedApplication launchApplication(String clustername, + Path clusterDirectory, + AggregateConf instanceDefinition, + boolean debugAM) + throws YarnException, IOException { + instanceDefinition.getAppConfOperations().getGlobalOptions().put( + AgentKeys.KEY_CONTAINER_LAUNCH_DELAY, + String.valueOf(delay)) + return super.launchApplication(clustername, clusterDirectory, instanceDefinition, debugAM) + } + + public static void setDelay (long aDelay) { + delay = aDelay + } + } +}