Repository: incubator-twill Updated Branches: refs/heads/feature/TWILL-153 [created] 053174408
(TWILL-153) Honor actual resource size of the container - Determine the -Xmx based on the actual size of the container - it can be smaller or bigger than the one requested in the TwillSpec - Refactor resource specification for AM - A forward looking change for TWILL-90 - Simple code cleanup to get rid of code warning from IDE. Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/05317440 Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/05317440 Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/05317440 Branch: refs/heads/feature/TWILL-153 Commit: 0531744083c1aa772e7e56fe4d177b0be09f29cd Parents: d2d9081 Author: Terence Yim <[email protected]> Authored: Tue Sep 22 14:35:26 2015 -0700 Committer: Terence Yim <[email protected]> Committed: Tue Sep 22 16:03:32 2015 -0700 ---------------------------------------------------------------------- .../org/apache/twill/internal/Constants.java | 5 +- .../apache/twill/internal/ContainerInfo.java | 15 +++-- .../apache/twill/internal/ProcessLauncher.java | 2 +- .../twill/internal/ResourceCapability.java | 34 +++++++++++ .../twill/internal/TwillContainerLauncher.java | 19 +++--- .../internal/json/TwillRunResourcesCodec.java | 14 ++--- .../apache/twill/internal/utils/Resources.java | 46 ++++++++++++++ .../internal/yarn/Hadoop20YarnAppClient.java | 23 ++++--- .../internal/yarn/Hadoop21YarnAppClient.java | 24 +++++--- .../appmaster/ApplicationMasterInfo.java | 63 ++++++++++++++++++++ .../ApplicationMasterProcessLauncher.java | 24 +++----- .../appmaster/ApplicationMasterService.java | 24 +++++--- .../appmaster/ApplicationSubmitter.java | 3 +- .../yarn/AbstractYarnProcessLauncher.java | 3 +- .../twill/internal/yarn/YarnAppClient.java | 11 ++-- .../apache/twill/yarn/YarnTwillPreparer.java | 13 ++-- .../org/apache/twill/yarn/BaseYarnTest.java | 5 +- .../apache/twill/yarn/ContainerSizeTestRun.java | 52 +++++++++++++++- .../java/org/apache/twill/yarn/TwillTester.java | 22 +++++++ 19 files changed, 320 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-core/src/main/java/org/apache/twill/internal/Constants.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java index defd013..f897bfa 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java +++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java @@ -37,14 +37,13 @@ public final class Constants { */ public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000; + public static final double HEAP_MIN_RATIO = 0.7d; + /** Memory size of AM. */ public static final int APP_MASTER_MEMORY_MB = 512; public static final int APP_MASTER_RESERVED_MEMORY_MB = 150; - public static final String STDOUT = "stdout"; - public static final String STDERR = "stderr"; - public static final String CLASSPATH = "classpath"; public static final String APPLICATION_CLASSPATH = "application-classpath"; http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java b/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java index 67c21d3..5c93ede 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java +++ b/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java @@ -22,15 +22,20 @@ import java.net.InetAddress; /** * Represents information of the container that the processing is/will be running in. */ -public interface ContainerInfo { +public interface ContainerInfo extends ResourceCapability { + /** + * Returns the ID of the container. + */ String getId(); + /** + * Returns the host information of the container. + */ InetAddress getHost(); + /** + * Returns the port for communicating to the container host. + */ int getPort(); - - int getMemoryMB(); - - int getVirtualCores(); } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java index d0f289b..beb8e6b 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java +++ b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java @@ -26,7 +26,7 @@ import java.util.Map; * * @param <T> Type of the object that contains information about the container that the process is going to launch. */ -public interface ProcessLauncher<T> { +public interface ProcessLauncher<T extends ResourceCapability> { /** * Returns information about the container that this launch would launch process in. http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java b/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java new file mode 100644 index 0000000..2cdd080 --- /dev/null +++ b/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal; + +/** + * Represents information about compute resources capability. + */ +public interface ResourceCapability { + + /** + * Returns memory size in MB. + */ + int getMemoryMB(); + + /** + * Returns the number of virtual cpu cores. + */ + int getVirtualCores(); +} http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java index 0667bb4..dc749ad 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java +++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java @@ -27,6 +27,7 @@ import org.apache.twill.api.RunId; import org.apache.twill.api.RuntimeSpecification; import org.apache.twill.filesystem.Location; import org.apache.twill.internal.state.Message; +import org.apache.twill.internal.utils.Resources; import org.apache.twill.launcher.FindFreePort; import org.apache.twill.launcher.TwillLauncher; import org.apache.twill.zookeeper.NodeData; @@ -46,9 +47,8 @@ public final class TwillContainerLauncher { private static final Logger LOG = LoggerFactory.getLogger(TwillContainerLauncher.class); - private static final double HEAP_MIN_RATIO = 0.7d; - private final RuntimeSpecification runtimeSpec; + private final ContainerInfo containerInfo; private final ProcessLauncher.PrepareLaunchContext launchContext; private final ZKClient zkClient; private final int instanceCount; @@ -56,10 +56,12 @@ public final class TwillContainerLauncher { private final int reservedMemory; private final Location secureStoreLocation; - public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ProcessLauncher.PrepareLaunchContext launchContext, + public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ContainerInfo containerInfo, + ProcessLauncher.PrepareLaunchContext launchContext, ZKClient zkClient, int instanceCount, JvmOptions jvmOpts, int reservedMemory, Location secureStoreLocation) { this.runtimeSpec = runtimeSpec; + this.containerInfo = containerInfo; this.launchContext = launchContext; this.zkClient = zkClient; this.instanceCount = instanceCount; @@ -98,15 +100,6 @@ public final class TwillContainerLauncher { LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation); } - int memory = runtimeSpec.getResourceSpecification().getMemorySize(); - if (((double) (memory - reservedMemory) / memory) >= HEAP_MIN_RATIO) { - // Reduce -Xmx by the reserved memory size. - memory = runtimeSpec.getResourceSpecification().getMemorySize() - reservedMemory; - } else { - // If it is a small VM, just discount it by the min ratio. - memory = (int) Math.ceil(memory * HEAP_MIN_RATIO); - } - // Currently no reporting is supported for runnable containers launchContext .addEnvironment(EnvKeys.TWILL_RUN_ID, runId.getId()) @@ -135,6 +128,8 @@ public final class TwillContainerLauncher { } else { firstCommand = "$JAVA_HOME/bin/java"; } + + int memory = Resources.computeMaxHeap(containerInfo.getMemoryMB(), reservedMemory, Constants.HEAP_MIN_RATIO); commandBuilder.add("-Djava.io.tmpdir=tmp", "-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID, "-Dtwill.runnable=$" + EnvKeys.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME, http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java index 9a6f555..7dea371 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java +++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java @@ -35,13 +35,13 @@ import java.lang.reflect.Type; */ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunResources>, JsonDeserializer<TwillRunResources> { - private final String CONTAINER_ID = "containerId"; - private final String INSTANCE_ID = "instanceId"; - private final String HOST = "host"; - private final String MEMORY_MB = "memoryMB"; - private final String VIRTUAL_CORES = "virtualCores"; - private final String DEBUG_PORT = "debugPort"; - private final String LOG_LEVEL = "logLevel"; + private static final String CONTAINER_ID = "containerId"; + private static final String INSTANCE_ID = "instanceId"; + private static final String HOST = "host"; + private static final String MEMORY_MB = "memoryMB"; + private static final String VIRTUAL_CORES = "virtualCores"; + private static final String DEBUG_PORT = "debugPort"; + private static final String LOG_LEVEL = "logLevel"; @Override public JsonElement serialize(TwillRunResources src, Type typeOfSrc, JsonSerializationContext context) { http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java b/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java new file mode 100644 index 0000000..5b1f2c6 --- /dev/null +++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal.utils; + +/** + * Utility class to help adjusting container resource requirement. + */ +public final class Resources { + + /** + * Computes the max heap size for a JVM process. + * + * @param containerMemory memory in MB of the container memory memory. + * It is the maximum memory size allowed for the process. + * @param nonHeapMemory memory in MB that needs to be reserved for non JVM heap memory for the process. + * @param minHeapRatio minimum ratio for heap to non-heap memory. + * @return memory in MB representing the max heap size for a JVM process. + */ + public static int computeMaxHeap(int containerMemory, int nonHeapMemory, double minHeapRatio) { + if (((double) (containerMemory - nonHeapMemory) / containerMemory) >= minHeapRatio) { + // Reduce -Xmx by the reserved memory size. + return containerMemory - nonHeapMemory; + } else { + // If it is a small VM, just discount it by the min ratio. + return (int) Math.ceil(containerMemory * minHeapRatio); + } + } + + private Resources() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java index 3afa49a..bfa494c 100644 --- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java +++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java @@ -38,8 +38,10 @@ import org.apache.hadoop.yarn.client.YarnClientImpl; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.util.Records; import org.apache.twill.api.TwillSpecification; +import org.apache.twill.internal.Constants; import org.apache.twill.internal.ProcessController; import org.apache.twill.internal.ProcessLauncher; +import org.apache.twill.internal.appmaster.ApplicationMasterInfo; import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher; import org.apache.twill.internal.appmaster.ApplicationSubmitter; import org.slf4j.Logger; @@ -70,8 +72,8 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements } @Override - public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec, - @Nullable String schedulerQueue) throws Exception { + public ProcessLauncher<ApplicationMasterInfo> createLauncher(TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception { // Request for new application final GetNewApplicationResponse response = yarnClient.getNewApplication(); final ApplicationId appId = response.getApplicationId(); @@ -86,10 +88,17 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements appSubmissionContext.setQueue(schedulerQueue); } + // TODO: Make it adjustable through TwillSpec (TWILL-90) + // Set the resource requirement for AM + Resource amResource = Records.newRecord(Resource.class); + amResource.setMemory(Constants.APP_MASTER_MEMORY_MB); + final Resource capability = adjustMemory(response, amResource); + ApplicationMasterInfo appMasterInfo = new ApplicationMasterInfo(appId, capability.getMemory(), 1); + ApplicationSubmitter submitter = new ApplicationSubmitter() { @Override - public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability) { + public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext) { ContainerLaunchContext context = launchContext.getLaunchContext(); addRMToken(context); context.setUser(appSubmissionContext.getUser()); @@ -106,7 +115,7 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements } }; - return new ApplicationMasterProcessLauncher(appId, submitter); + return new ApplicationMasterProcessLauncher(appMasterInfo, submitter); } private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) { @@ -158,9 +167,9 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements } @Override - public ProcessLauncher<ApplicationId> createLauncher(String user, - TwillSpecification twillSpec, - @Nullable String schedulerQueue) throws Exception { + public ProcessLauncher<ApplicationMasterInfo> createLauncher(String user, + TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception { this.user = user; return createLauncher(twillSpec, schedulerQueue); } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java index 046e3f1..6fd11e5 100644 --- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java +++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java @@ -35,8 +35,10 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.twill.api.TwillSpecification; +import org.apache.twill.internal.Constants; import org.apache.twill.internal.ProcessController; import org.apache.twill.internal.ProcessLauncher; +import org.apache.twill.internal.appmaster.ApplicationMasterInfo; import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher; import org.apache.twill.internal.appmaster.ApplicationSubmitter; import org.slf4j.Logger; @@ -64,8 +66,8 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements } @Override - public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec, - @Nullable String schedulerQueue) throws Exception { + public ProcessLauncher<ApplicationMasterInfo> createLauncher(TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception { // Request for new application YarnClientApplication application = yarnClient.createApplication(); final GetNewApplicationResponse response = application.getNewApplicationResponse(); @@ -80,14 +82,20 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements appSubmissionContext.setQueue(schedulerQueue); } + // TODO: Make it adjustable through TwillSpec (TWILL-90) + // Set the resource requirement for AM + final Resource capability = adjustMemory(response, Resource.newInstance(Constants.APP_MASTER_MEMORY_MB, 1)); + ApplicationMasterInfo appMasterInfo = new ApplicationMasterInfo(appId, capability.getMemory(), + capability.getVirtualCores()); + ApplicationSubmitter submitter = new ApplicationSubmitter() { @Override - public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context, Resource capability) { + public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context) { ContainerLaunchContext launchContext = context.getLaunchContext(); addRMToken(launchContext); appSubmissionContext.setAMContainerSpec(launchContext); - appSubmissionContext.setResource(adjustMemory(response, capability)); + appSubmissionContext.setResource(capability); appSubmissionContext.setMaxAppAttempts(2); try { @@ -100,7 +108,7 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements } }; - return new ApplicationMasterProcessLauncher(appId, submitter); + return new ApplicationMasterProcessLauncher(appMasterInfo, submitter); } private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) { @@ -139,9 +147,9 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements } @Override - public ProcessLauncher<ApplicationId> createLauncher(String user, - TwillSpecification twillSpec, - @Nullable String schedulerQueue) throws Exception { + public ProcessLauncher<ApplicationMasterInfo> createLauncher(String user, + TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception { // Ignore user return createLauncher(twillSpec, schedulerQueue); } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java new file mode 100644 index 0000000..fbeeca0 --- /dev/null +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal.appmaster; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.twill.internal.ResourceCapability; + +/** + * Represents information of the application master. + */ +public class ApplicationMasterInfo implements ResourceCapability { + + private final ApplicationId appId; + private final int memoryMB; + private final int virtualCores; + + public ApplicationMasterInfo(ApplicationId appId, int memoryMB, int virtualCores) { + this.appId = appId; + this.memoryMB = memoryMB; + this.virtualCores = virtualCores; + } + + /** + * Returns the application ID for the YARN application. + */ + public ApplicationId getAppId() { + return appId; + } + + @Override + public int getMemoryMB() { + return memoryMB; + } + + @Override + public int getVirtualCores() { + return virtualCores; + } + + @Override + public String toString() { + return "ApplicationMasterInfo{" + + "appId=" + appId + + ", memoryMB=" + memoryMB + + ", virtualCores=" + virtualCores + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java index 126ff97..da11816 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java @@ -19,38 +19,30 @@ package org.apache.twill.internal.appmaster; import com.google.common.collect.ImmutableMap; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.util.Records; -import org.apache.twill.internal.Constants; import org.apache.twill.internal.EnvKeys; import org.apache.twill.internal.ProcessController; import org.apache.twill.internal.yarn.AbstractYarnProcessLauncher; import org.apache.twill.internal.yarn.YarnLaunchContext; -import org.apache.twill.internal.yarn.YarnUtils; import java.util.Map; /** * A {@link org.apache.twill.internal.ProcessLauncher} for launching Application Master from the client. */ -public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessLauncher<ApplicationId> { +public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessLauncher<ApplicationMasterInfo> { private final ApplicationSubmitter submitter; - public ApplicationMasterProcessLauncher(ApplicationId appId, ApplicationSubmitter submitter) { - super(appId); + public ApplicationMasterProcessLauncher(ApplicationMasterInfo info, ApplicationSubmitter submitter) { + super(info); this.submitter = submitter; } @Override @SuppressWarnings("unchecked") protected <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext) { - final ApplicationId appId = getContainerInfo(); - - // Set the resource requirement for AM - Resource capability = Records.newRecord(Resource.class); - capability.setMemory(Constants.APP_MASTER_MEMORY_MB); - YarnUtils.setVirtualCores(capability, 1); + ApplicationMasterInfo appMasterInfo = getContainerInfo(); + ApplicationId appId = appMasterInfo.getAppId(); // Put in extra environments Map<String, String> env = ImmutableMap.<String, String>builder() @@ -58,11 +50,11 @@ public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessL .put(EnvKeys.YARN_APP_ID, Integer.toString(appId.getId())) .put(EnvKeys.YARN_APP_ID_CLUSTER_TIME, Long.toString(appId.getClusterTimestamp())) .put(EnvKeys.YARN_APP_ID_STR, appId.toString()) - .put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(Constants.APP_MASTER_MEMORY_MB)) - .put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(YarnUtils.getVirtualCores(capability))) + .put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(appMasterInfo.getMemoryMB())) + .put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(appMasterInfo.getVirtualCores())) .build(); launchContext.setEnvironment(env); - return (ProcessController<R>) submitter.submit(launchContext, capability); + return (ProcessController<R>) submitter.submit(launchContext); } } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java index 818db05..355cea3 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java @@ -176,10 +176,14 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp } @SuppressWarnings("unchecked") + @Nullable private EventHandler createEventHandler(TwillSpecification twillSpec) { try { // Should be able to load by this class ClassLoader, as they packaged in the same jar. EventHandlerSpecification handlerSpec = twillSpec.getEventHandler(); + if (handlerSpec == null) { + return null; + } Class<?> handlerClass = getClass().getClassLoader().loadClass(handlerSpec.getClassName()); Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass), @@ -221,7 +225,9 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp LOG.info("Start application master with spec: " + TwillSpecificationAdapter.create().toJson(twillSpec)); // initialize the event handler, if it fails, it will fail the application. - eventHandler.initialize(new BasicEventHandlerContext(twillSpec.getEventHandler())); + if (eventHandler != null) { + eventHandler.initialize(new BasicEventHandlerContext(twillSpec.getEventHandler())); + } instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger")); @@ -237,11 +243,13 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp LOG.info("Stop application master with spec: {}", TwillSpecificationAdapter.create().toJson(twillSpec)); - try { - // call event handler destroy. If there is error, only log and not affected stop sequence. - eventHandler.destroy(); - } catch (Throwable t) { - LOG.warn("Exception when calling {}.destroy()", twillSpec.getEventHandler().getClassName(), t); + if (eventHandler != null) { + try { + // call event handler destroy. If there is error, only log and not affected stop sequence. + eventHandler.destroy(); + } catch (Throwable t) { + LOG.warn("Exception when calling {}.destroy()", eventHandler.getClass().getName(), t); + } } instanceChangeExecutor.shutdownNow(); @@ -491,7 +499,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp } } - if (!timeoutEvents.isEmpty()) { + if (!timeoutEvents.isEmpty() && eventHandler != null) { try { EventHandler.TimeoutAction action = eventHandler.launchTimeout(timeoutEvents); if (action.getTimeout() < 0) { @@ -640,7 +648,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp ); TwillContainerLauncher launcher = new TwillContainerLauncher( - twillSpec.getRunnables().get(runnableName), launchContext, + twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext, ZKClients.namespace(zkClient, getZKNamespace(runnableName)), containerCount, jvmOpts, reservedMemory, getSecureStoreLocation()); http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java index 38f90ae..e82dbbc 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java @@ -17,7 +17,6 @@ */ package org.apache.twill.internal.appmaster; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.twill.internal.ProcessController; import org.apache.twill.internal.yarn.YarnApplicationReport; import org.apache.twill.internal.yarn.YarnLaunchContext; @@ -27,5 +26,5 @@ import org.apache.twill.internal.yarn.YarnLaunchContext; */ public interface ApplicationSubmitter { - ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability); + ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext); } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java index 2023b0e..eb917f3 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.twill.api.LocalFile; import org.apache.twill.internal.ProcessController; import org.apache.twill.internal.ProcessLauncher; +import org.apache.twill.internal.ResourceCapability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +39,7 @@ import java.util.Map; * * @param <T> Type of the object that contains information about the container that the process is going to launch. */ -public abstract class AbstractYarnProcessLauncher<T> implements ProcessLauncher<T> { +public abstract class AbstractYarnProcessLauncher<T extends ResourceCapability> implements ProcessLauncher<T> { private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnProcessLauncher.class); http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java index 67a2292..df956bf 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.twill.api.TwillSpecification; import org.apache.twill.internal.ProcessController; import org.apache.twill.internal.ProcessLauncher; +import org.apache.twill.internal.appmaster.ApplicationMasterInfo; import java.util.List; import javax.annotation.Nullable; @@ -36,8 +37,8 @@ public interface YarnAppClient extends Service { * Creates a {@link ProcessLauncher} for launching the application represented by the given spec. If scheduler queue * is available and is supported by the YARN cluster, it will be launched in the given queue. */ - ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec, - @Nullable String schedulerQueue) throws Exception; + ProcessLauncher<ApplicationMasterInfo> createLauncher(TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception; /** * Creates a {@link ProcessLauncher} for launching application with the given user and spec. If scheduler queue @@ -46,9 +47,9 @@ public interface YarnAppClient extends Service { * @deprecated This method will get removed. */ @Deprecated - ProcessLauncher<ApplicationId> createLauncher(String user, - TwillSpecification twillSpec, - @Nullable String schedulerQueue) throws Exception; + ProcessLauncher<ApplicationMasterInfo> createLauncher(String user, + TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception; /** * Creates a {@link ProcessController} that can controls an application represented by the given application id. http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java index 4e9f76d..d871e9b 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java @@ -68,6 +68,7 @@ import org.apache.twill.internal.LogOnlyEventHandler; import org.apache.twill.internal.ProcessController; import org.apache.twill.internal.ProcessLauncher; import org.apache.twill.internal.RunIds; +import org.apache.twill.internal.appmaster.ApplicationMasterInfo; import org.apache.twill.internal.appmaster.ApplicationMasterMain; import org.apache.twill.internal.container.TwillContainerMain; import org.apache.twill.internal.json.ArgumentsCodec; @@ -76,6 +77,7 @@ import org.apache.twill.internal.json.LocalFileCodec; import org.apache.twill.internal.json.TwillSpecificationAdapter; import org.apache.twill.internal.utils.Dependencies; import org.apache.twill.internal.utils.Paths; +import org.apache.twill.internal.utils.Resources; import org.apache.twill.internal.yarn.YarnAppClient; import org.apache.twill.internal.yarn.YarnApplicationReport; import org.apache.twill.internal.yarn.YarnUtils; @@ -283,8 +285,8 @@ final class YarnTwillPreparer implements TwillPreparer { @Override public TwillController start() { try { - final ProcessLauncher<ApplicationId> launcher = yarnAppClient.createLauncher(twillSpec, schedulerQueue); - final ApplicationId appId = launcher.getContainerInfo(); + final ProcessLauncher<ApplicationMasterInfo> launcher = yarnAppClient.createLauncher(twillSpec, schedulerQueue); + final ApplicationMasterInfo appMasterInfo = launcher.getContainerInfo(); Callable<ProcessController<YarnApplicationReport>> submitTask = new Callable<ProcessController<YarnApplicationReport>>() { @Override @@ -310,7 +312,7 @@ final class YarnTwillPreparer implements TwillPreparer { Constants.Files.LAUNCHER_JAR, Constants.Files.ARGUMENTS)); - LOG.debug("Submit AM container spec: {}", appId); + LOG.debug("Submit AM container spec: {}", appMasterInfo); // java -Djava.io.tmpdir=tmp -cp launcher.jar:$HADOOP_CONF_DIR -XmxMemory // org.apache.twill.internal.TwillLauncher // appMaster.jar @@ -329,6 +331,9 @@ final class YarnTwillPreparer implements TwillPreparer { LOG.debug("Log level is set to {} for the Twill application.", logLevel); builder.put(EnvKeys.TWILL_APP_LOG_LEVEL, logLevel.toString()); } + + int memory = Resources.computeMaxHeap(appMasterInfo.getMemoryMB(), + Constants.APP_MASTER_RESERVED_MEMORY_MB, Constants.HEAP_MIN_RATIO); return launcher.prepareLaunch(builder.build(), localFiles.values(), credentials) .addCommand( "$JAVA_HOME/bin/java", @@ -336,7 +341,7 @@ final class YarnTwillPreparer implements TwillPreparer { "-Dyarn.appId=$" + EnvKeys.YARN_APP_ID_STR, "-Dtwill.app=$" + EnvKeys.TWILL_APP_NAME, "-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR", - "-Xmx" + (Constants.APP_MASTER_MEMORY_MB - Constants.APP_MASTER_RESERVED_MEMORY_MB) + "m", + "-Xmx" + memory + "m", extraOptions == null ? "" : extraOptions, TwillLauncher.class.getName(), Constants.Files.APP_MASTER_JAR, http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java index b5c7f58..476dd77 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java @@ -17,6 +17,7 @@ */ package org.apache.twill.yarn; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.twill.api.TwillController; @@ -45,7 +46,9 @@ public abstract class BaseYarnTest { * A singleton wrapper so that yarn cluster only bring up once across all tests in the YarnTestSuite. */ @ClassRule - public static final TwillTester TWILL_TESTER = new TwillTester() { + public static final TwillTester TWILL_TESTER = new TwillTester(ImmutableMap.of( + "yarn.scheduler.maximum-allocation-mb", "2048" + )) { private final AtomicInteger instances = new AtomicInteger(); @Override http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java index 6e27b69..5789889 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java @@ -29,6 +29,8 @@ import org.apache.twill.api.logging.PrinterLogHandler; import org.apache.twill.discovery.ServiceDiscovered; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.PrintWriter; import java.util.concurrent.ExecutionException; @@ -36,11 +38,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** - * Test for requesting different container size in different order. - * It specifically test for workaround for YARN-314. + * Tests related to different container sizes. */ public class ContainerSizeTestRun extends BaseYarnTest { + /** + * Test for requesting different container size in different order. + * It specifically test for workaround for YARN-314. + */ @Test public void testContainerSize() throws InterruptedException, TimeoutException, ExecutionException { TwillRunner runner = getTwillRunner(); @@ -56,6 +61,20 @@ public class ContainerSizeTestRun extends BaseYarnTest { } } + @Test + public void testMaxHeapSize() throws InterruptedException, TimeoutException, ExecutionException { + TwillRunner runner = getTwillRunner(); + TwillController controller = runner.prepare(new MaxHeapApp()) + .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) + .start(); + + try { + ServiceDiscovered discovered = controller.discoverService("sleep"); + Assert.assertTrue(waitForSize(discovered, 1, 120)); + } finally { + controller.terminate().get(120, TimeUnit.SECONDS); + } + } /** * An application that has two runnables with different memory size. @@ -86,12 +105,34 @@ public class ContainerSizeTestRun extends BaseYarnTest { } } + /** + * An application for testing max heap size. + */ + public static final class MaxHeapApp implements TwillApplication { + + @Override + public TwillSpecification configure() { + ResourceSpecification res = ResourceSpecification.Builder.with() + .setVirtualCores(1) + .setMemory(8, ResourceSpecification.SizeUnit.GIGA) + .build(); + + return TwillSpecification.Builder.with() + .setName("MaxHeapApp") + .withRunnable() + .add("sleep", new SleepRunnable(12345), res).noLocalFiles() + .anyOrder() + .build(); + } + } + /** * A runnable that sleep for 120 seconds. */ public static final class SleepRunnable extends AbstractTwillRunnable { + private static final Logger LOG = LoggerFactory.getLogger(SleepRunnable.class); private volatile Thread runThread; public SleepRunnable(int port) { @@ -100,6 +141,13 @@ public class ContainerSizeTestRun extends BaseYarnTest { @Override public void run() { + // Verify that the heap memory is smaller than the container memory + int maxHeap = (int) (Runtime.getRuntime().maxMemory() / 1024 / 1024); + if (getContext().getMaxMemoryMB() <= maxHeap) { + LOG.error("Max heap size is >= container max memory size"); + return; + } + runThread = Thread.currentThread(); getContext().announce("sleep", Integer.parseInt(getContext().getSpecification().getConfigs().get("port"))); try { http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/05317440/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java index f669b83..2cdf7d2 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java @@ -37,7 +37,9 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Map; /** * A TwillTester rule allows creation of mini Yarn cluster and {@link TwillRunner} used for testing that is @@ -61,6 +63,8 @@ public class TwillTester extends ExternalResource { private static final Logger LOG = LoggerFactory.getLogger(TwillTester.class); private final TemporaryFolder tmpFolder = new TemporaryFolder(); + private final Map<String, String> configurations; + private InMemoryZKServer zkServer; private MiniDFSCluster dfsCluster; private MiniYARNCluster cluster; @@ -68,6 +72,20 @@ public class TwillTester extends ExternalResource { private TwillRunnerService twillRunner; private YarnAppClient yarnAppClient; + /** + * Creates a new instance. + */ + public TwillTester() { + this(Collections.<String, String>emptyMap()); + } + + /** + * Creates a new instance with a set of configurations for the mini YARN cluster. + */ + public TwillTester(Map<String, String> configurations) { + this.configurations = configurations; + } + @Override protected void before() throws Throwable { tmpFolder.create(); @@ -100,6 +118,10 @@ public class TwillTester extends ExternalResource { conf.set("yarn.scheduler.minimum-allocation-mb", "128"); conf.set("yarn.nodemanager.delete.debug-delay-sec", "3600"); + for (Map.Entry<String, String> entry : configurations.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + cluster = new MiniYARNCluster("test-cluster", 3, 1, 1); cluster.init(conf); cluster.start();
