Repository: incubator-twill Updated Branches: refs/heads/master d2d908172 -> 66402b4f2
(TWILL-153) Honor actual resource size of the container - Determine the -Xmx based on the actual size of the container - it can be smaller or bigger than the one requested in the TwillSpec - Refactor resource specification for AM - A forward looking change for TWILL-90 - Simple code cleanup to get rid of code warning from IDE. - Fix an easy-to-fail test - ZKClientTest.testExpireRewatch() This closes #63 in Github Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/66402b4f Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/66402b4f Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/66402b4f Branch: refs/heads/master Commit: 66402b4f234290194e6c9c8de3d7edf84aad22ff Parents: d2d9081 Author: Terence Yim <[email protected]> Authored: Tue Sep 22 14:35:26 2015 -0700 Committer: Terence Yim <[email protected]> Committed: Wed Sep 23 22:34:45 2015 -0700 ---------------------------------------------------------------------- .../org/apache/twill/internal/Constants.java | 5 +- .../apache/twill/internal/ContainerInfo.java | 15 ++-- .../apache/twill/internal/ProcessLauncher.java | 2 +- .../twill/internal/ResourceCapability.java | 34 ++++++++ .../twill/internal/TwillContainerLauncher.java | 19 ++--- .../internal/json/TwillRunResourcesCodec.java | 14 ++-- .../apache/twill/internal/utils/Resources.java | 46 +++++++++++ .../internal/yarn/Hadoop20YarnAppClient.java | 23 ++++-- .../internal/yarn/Hadoop21YarnAppClient.java | 24 ++++-- .../appmaster/ApplicationMasterInfo.java | 63 +++++++++++++++ .../ApplicationMasterProcessLauncher.java | 24 ++---- .../appmaster/ApplicationMasterService.java | 24 ++++-- .../appmaster/ApplicationSubmitter.java | 3 +- .../yarn/AbstractYarnProcessLauncher.java | 3 +- .../twill/internal/yarn/YarnAppClient.java | 11 +-- .../apache/twill/yarn/YarnTwillPreparer.java | 14 ++-- 
.../apache/twill/yarn/ContainerSizeTestRun.java | 83 +++++++++++++++++++- .../apache/twill/zookeeper/ZKClientTest.java | 20 +++-- 18 files changed, 339 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/Constants.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java index defd013..f897bfa 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java +++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java @@ -37,14 +37,13 @@ public final class Constants { */ public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000; + public static final double HEAP_MIN_RATIO = 0.7d; + /** Memory size of AM. */ public static final int APP_MASTER_MEMORY_MB = 512; public static final int APP_MASTER_RESERVED_MEMORY_MB = 150; - public static final String STDOUT = "stdout"; - public static final String STDERR = "stderr"; - public static final String CLASSPATH = "classpath"; public static final String APPLICATION_CLASSPATH = "application-classpath"; http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java b/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java index 67c21d3..5c93ede 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java +++ b/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java @@ -22,15 +22,20 @@ import java.net.InetAddress; /** * Represents information of the container that the processing is/will be 
running in. */ -public interface ContainerInfo { +public interface ContainerInfo extends ResourceCapability { + /** + * Returns the ID of the container. + */ String getId(); + /** + * Returns the host information of the container. + */ InetAddress getHost(); + /** + * Returns the port for communicating to the container host. + */ int getPort(); - - int getMemoryMB(); - - int getVirtualCores(); } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java index d0f289b..beb8e6b 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java +++ b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java @@ -26,7 +26,7 @@ import java.util.Map; * * @param <T> Type of the object that contains information about the container that the process is going to launch. */ -public interface ProcessLauncher<T> { +public interface ProcessLauncher<T extends ResourceCapability> { /** * Returns information about the container that this launch would launch process in. http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java b/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java new file mode 100644 index 0000000..2cdd080 --- /dev/null +++ b/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal; + +/** + * Represents information about compute resources capability. + */ +public interface ResourceCapability { + + /** + * Returns memory size in MB. + */ + int getMemoryMB(); + + /** + * Returns the number of virtual cpu cores. + */ + int getVirtualCores(); +} http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java index 0667bb4..d01db4b 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java +++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java @@ -27,6 +27,7 @@ import org.apache.twill.api.RunId; import org.apache.twill.api.RuntimeSpecification; import org.apache.twill.filesystem.Location; import org.apache.twill.internal.state.Message; +import org.apache.twill.internal.utils.Resources; import org.apache.twill.launcher.FindFreePort; import org.apache.twill.launcher.TwillLauncher; import org.apache.twill.zookeeper.NodeData; @@ -46,9 +47,8 
@@ public final class TwillContainerLauncher { private static final Logger LOG = LoggerFactory.getLogger(TwillContainerLauncher.class); - private static final double HEAP_MIN_RATIO = 0.7d; - private final RuntimeSpecification runtimeSpec; + private final ContainerInfo containerInfo; private final ProcessLauncher.PrepareLaunchContext launchContext; private final ZKClient zkClient; private final int instanceCount; @@ -56,10 +56,12 @@ public final class TwillContainerLauncher { private final int reservedMemory; private final Location secureStoreLocation; - public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ProcessLauncher.PrepareLaunchContext launchContext, + public TwillContainerLauncher(RuntimeSpecification runtimeSpec, ContainerInfo containerInfo, + ProcessLauncher.PrepareLaunchContext launchContext, ZKClient zkClient, int instanceCount, JvmOptions jvmOpts, int reservedMemory, Location secureStoreLocation) { this.runtimeSpec = runtimeSpec; + this.containerInfo = containerInfo; this.launchContext = launchContext; this.zkClient = zkClient; this.instanceCount = instanceCount; @@ -98,15 +100,6 @@ public final class TwillContainerLauncher { LOG.warn("Failed to launch container with secure store {}.", secureStoreLocation); } - int memory = runtimeSpec.getResourceSpecification().getMemorySize(); - if (((double) (memory - reservedMemory) / memory) >= HEAP_MIN_RATIO) { - // Reduce -Xmx by the reserved memory size. - memory = runtimeSpec.getResourceSpecification().getMemorySize() - reservedMemory; - } else { - // If it is a small VM, just discount it by the min ratio. 
- memory = (int) Math.ceil(memory * HEAP_MIN_RATIO); - } - // Currently no reporting is supported for runnable containers launchContext .addEnvironment(EnvKeys.TWILL_RUN_ID, runId.getId()) @@ -135,6 +128,8 @@ public final class TwillContainerLauncher { } else { firstCommand = "$JAVA_HOME/bin/java"; } + + int memory = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), reservedMemory, Constants.HEAP_MIN_RATIO); commandBuilder.add("-Djava.io.tmpdir=tmp", "-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID, "-Dtwill.runnable=$" + EnvKeys.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME, http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java index 9a6f555..7dea371 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java +++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java @@ -35,13 +35,13 @@ import java.lang.reflect.Type; */ public final class TwillRunResourcesCodec implements JsonSerializer<TwillRunResources>, JsonDeserializer<TwillRunResources> { - private final String CONTAINER_ID = "containerId"; - private final String INSTANCE_ID = "instanceId"; - private final String HOST = "host"; - private final String MEMORY_MB = "memoryMB"; - private final String VIRTUAL_CORES = "virtualCores"; - private final String DEBUG_PORT = "debugPort"; - private final String LOG_LEVEL = "logLevel"; + private static final String CONTAINER_ID = "containerId"; + private static final String INSTANCE_ID = "instanceId"; + private static final String HOST = "host"; + private static final String MEMORY_MB = "memoryMB"; + private static final String VIRTUAL_CORES = 
"virtualCores"; + private static final String DEBUG_PORT = "debugPort"; + private static final String LOG_LEVEL = "logLevel"; @Override public JsonElement serialize(TwillRunResources src, Type typeOfSrc, JsonSerializationContext context) { http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java b/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java new file mode 100644 index 0000000..2ebb789 --- /dev/null +++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal.utils; + +/** + * Utility class to help adjusting container resource requirement. + */ +public final class Resources { + + /** + * Computes the max heap size for a JVM process. + * + * @param containerMemory memory in MB of the container memory. + * It is the maximum memory size allowed for the process. 
+ * @param nonHeapMemory memory in MB that needs to be reserved for non JVM heap memory for the process. + * @param minHeapRatio minimum ratio of heap memory to the total container memory. + * @return memory in MB representing the max heap size for a JVM process. + */ + public static int computeMaxHeapSize(int containerMemory, int nonHeapMemory, double minHeapRatio) { + if (((double) (containerMemory - nonHeapMemory) / containerMemory) >= minHeapRatio) { + // Reduce -Xmx by the reserved memory size. + return containerMemory - nonHeapMemory; + } else { + // If it is a small VM, just discount it by the min ratio. + return (int) Math.ceil(containerMemory * minHeapRatio); + } + } + + private Resources() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java index 3afa49a..bfa494c 100644 --- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java +++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java @@ -38,8 +38,10 @@ import org.apache.hadoop.yarn.client.YarnClientImpl; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.util.Records; import org.apache.twill.api.TwillSpecification; +import org.apache.twill.internal.Constants; import org.apache.twill.internal.ProcessController; import org.apache.twill.internal.ProcessLauncher; +import org.apache.twill.internal.appmaster.ApplicationMasterInfo; import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher; import org.apache.twill.internal.appmaster.ApplicationSubmitter; import org.slf4j.Logger; @@ -70,8 +72,8 @@ public final class Hadoop20YarnAppClient 
extends AbstractIdleService implements } @Override - public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec, - @Nullable String schedulerQueue) throws Exception { + public ProcessLauncher<ApplicationMasterInfo> createLauncher(TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception { // Request for new application final GetNewApplicationResponse response = yarnClient.getNewApplication(); final ApplicationId appId = response.getApplicationId(); @@ -86,10 +88,17 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements appSubmissionContext.setQueue(schedulerQueue); } + // TODO: Make it adjustable through TwillSpec (TWILL-90) + // Set the resource requirement for AM + Resource amResource = Records.newRecord(Resource.class); + amResource.setMemory(Constants.APP_MASTER_MEMORY_MB); + final Resource capability = adjustMemory(response, amResource); + ApplicationMasterInfo appMasterInfo = new ApplicationMasterInfo(appId, capability.getMemory(), 1); + ApplicationSubmitter submitter = new ApplicationSubmitter() { @Override - public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability) { + public ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext) { ContainerLaunchContext context = launchContext.getLaunchContext(); addRMToken(context); context.setUser(appSubmissionContext.getUser()); @@ -106,7 +115,7 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements } }; - return new ApplicationMasterProcessLauncher(appId, submitter); + return new ApplicationMasterProcessLauncher(appMasterInfo, submitter); } private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) { @@ -158,9 +167,9 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements } @Override - public ProcessLauncher<ApplicationId> createLauncher(String user, - TwillSpecification twillSpec, - 
@Nullable String schedulerQueue) throws Exception { + public ProcessLauncher<ApplicationMasterInfo> createLauncher(String user, + TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception { this.user = user; return createLauncher(twillSpec, schedulerQueue); } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java index 046e3f1..6fd11e5 100644 --- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java +++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java @@ -35,8 +35,10 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.twill.api.TwillSpecification; +import org.apache.twill.internal.Constants; import org.apache.twill.internal.ProcessController; import org.apache.twill.internal.ProcessLauncher; +import org.apache.twill.internal.appmaster.ApplicationMasterInfo; import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher; import org.apache.twill.internal.appmaster.ApplicationSubmitter; import org.slf4j.Logger; @@ -64,8 +66,8 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements } @Override - public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec, - @Nullable String schedulerQueue) throws Exception { + public ProcessLauncher<ApplicationMasterInfo> createLauncher(TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception { // Request for new application YarnClientApplication application = 
yarnClient.createApplication(); final GetNewApplicationResponse response = application.getNewApplicationResponse(); @@ -80,14 +82,20 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements appSubmissionContext.setQueue(schedulerQueue); } + // TODO: Make it adjustable through TwillSpec (TWILL-90) + // Set the resource requirement for AM + final Resource capability = adjustMemory(response, Resource.newInstance(Constants.APP_MASTER_MEMORY_MB, 1)); + ApplicationMasterInfo appMasterInfo = new ApplicationMasterInfo(appId, capability.getMemory(), + capability.getVirtualCores()); + ApplicationSubmitter submitter = new ApplicationSubmitter() { @Override - public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context, Resource capability) { + public ProcessController<YarnApplicationReport> submit(YarnLaunchContext context) { ContainerLaunchContext launchContext = context.getLaunchContext(); addRMToken(launchContext); appSubmissionContext.setAMContainerSpec(launchContext); - appSubmissionContext.setResource(adjustMemory(response, capability)); + appSubmissionContext.setResource(capability); appSubmissionContext.setMaxAppAttempts(2); try { @@ -100,7 +108,7 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements } }; - return new ApplicationMasterProcessLauncher(appId, submitter); + return new ApplicationMasterProcessLauncher(appMasterInfo, submitter); } private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) { @@ -139,9 +147,9 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements } @Override - public ProcessLauncher<ApplicationId> createLauncher(String user, - TwillSpecification twillSpec, - @Nullable String schedulerQueue) throws Exception { + public ProcessLauncher<ApplicationMasterInfo> createLauncher(String user, + TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception { // Ignore user return 
createLauncher(twillSpec, schedulerQueue); } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java new file mode 100644 index 0000000..fbeeca0 --- /dev/null +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal.appmaster; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.twill.internal.ResourceCapability; + +/** + * Represents information of the application master. 
+ */ +public class ApplicationMasterInfo implements ResourceCapability { + + private final ApplicationId appId; + private final int memoryMB; + private final int virtualCores; + + public ApplicationMasterInfo(ApplicationId appId, int memoryMB, int virtualCores) { + this.appId = appId; + this.memoryMB = memoryMB; + this.virtualCores = virtualCores; + } + + /** + * Returns the application ID for the YARN application. + */ + public ApplicationId getAppId() { + return appId; + } + + @Override + public int getMemoryMB() { + return memoryMB; + } + + @Override + public int getVirtualCores() { + return virtualCores; + } + + @Override + public String toString() { + return "ApplicationMasterInfo{" + + "appId=" + appId + + ", memoryMB=" + memoryMB + + ", virtualCores=" + virtualCores + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java index 126ff97..da11816 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java @@ -19,38 +19,30 @@ package org.apache.twill.internal.appmaster; import com.google.common.collect.ImmutableMap; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.util.Records; -import org.apache.twill.internal.Constants; import org.apache.twill.internal.EnvKeys; import org.apache.twill.internal.ProcessController; import org.apache.twill.internal.yarn.AbstractYarnProcessLauncher; import 
org.apache.twill.internal.yarn.YarnLaunchContext; -import org.apache.twill.internal.yarn.YarnUtils; import java.util.Map; /** * A {@link org.apache.twill.internal.ProcessLauncher} for launching Application Master from the client. */ -public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessLauncher<ApplicationId> { +public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessLauncher<ApplicationMasterInfo> { private final ApplicationSubmitter submitter; - public ApplicationMasterProcessLauncher(ApplicationId appId, ApplicationSubmitter submitter) { - super(appId); + public ApplicationMasterProcessLauncher(ApplicationMasterInfo info, ApplicationSubmitter submitter) { + super(info); this.submitter = submitter; } @Override @SuppressWarnings("unchecked") protected <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext) { - final ApplicationId appId = getContainerInfo(); - - // Set the resource requirement for AM - Resource capability = Records.newRecord(Resource.class); - capability.setMemory(Constants.APP_MASTER_MEMORY_MB); - YarnUtils.setVirtualCores(capability, 1); + ApplicationMasterInfo appMasterInfo = getContainerInfo(); + ApplicationId appId = appMasterInfo.getAppId(); // Put in extra environments Map<String, String> env = ImmutableMap.<String, String>builder() @@ -58,11 +50,11 @@ public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessL .put(EnvKeys.YARN_APP_ID, Integer.toString(appId.getId())) .put(EnvKeys.YARN_APP_ID_CLUSTER_TIME, Long.toString(appId.getClusterTimestamp())) .put(EnvKeys.YARN_APP_ID_STR, appId.toString()) - .put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(Constants.APP_MASTER_MEMORY_MB)) - .put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(YarnUtils.getVirtualCores(capability))) + .put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(appMasterInfo.getMemoryMB())) + .put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, 
Integer.toString(appMasterInfo.getVirtualCores())) .build(); launchContext.setEnvironment(env); - return (ProcessController<R>) submitter.submit(launchContext, capability); + return (ProcessController<R>) submitter.submit(launchContext); } } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java index 818db05..355cea3 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java @@ -176,10 +176,14 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp } @SuppressWarnings("unchecked") + @Nullable private EventHandler createEventHandler(TwillSpecification twillSpec) { try { // Should be able to load by this class ClassLoader, as they packaged in the same jar. EventHandlerSpecification handlerSpec = twillSpec.getEventHandler(); + if (handlerSpec == null) { + return null; + } Class<?> handlerClass = getClass().getClassLoader().loadClass(handlerSpec.getClassName()); Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass), @@ -221,7 +225,9 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp LOG.info("Start application master with spec: " + TwillSpecificationAdapter.create().toJson(twillSpec)); // initialize the event handler, if it fails, it will fail the application. 
- eventHandler.initialize(new BasicEventHandlerContext(twillSpec.getEventHandler())); + if (eventHandler != null) { + eventHandler.initialize(new BasicEventHandlerContext(twillSpec.getEventHandler())); + } instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger")); @@ -237,11 +243,13 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp LOG.info("Stop application master with spec: {}", TwillSpecificationAdapter.create().toJson(twillSpec)); - try { - // call event handler destroy. If there is error, only log and not affected stop sequence. - eventHandler.destroy(); - } catch (Throwable t) { - LOG.warn("Exception when calling {}.destroy()", twillSpec.getEventHandler().getClassName(), t); + if (eventHandler != null) { + try { + // call event handler destroy. If there is error, only log and not affected stop sequence. + eventHandler.destroy(); + } catch (Throwable t) { + LOG.warn("Exception when calling {}.destroy()", eventHandler.getClass().getName(), t); + } } instanceChangeExecutor.shutdownNow(); @@ -491,7 +499,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp } } - if (!timeoutEvents.isEmpty()) { + if (!timeoutEvents.isEmpty() && eventHandler != null) { try { EventHandler.TimeoutAction action = eventHandler.launchTimeout(timeoutEvents); if (action.getTimeout() < 0) { @@ -640,7 +648,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp ); TwillContainerLauncher launcher = new TwillContainerLauncher( - twillSpec.getRunnables().get(runnableName), launchContext, + twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext, ZKClients.namespace(zkClient, getZKNamespace(runnableName)), containerCount, jvmOpts, reservedMemory, getSecureStoreLocation()); 
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java index 38f90ae..e82dbbc 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java @@ -17,7 +17,6 @@ */ package org.apache.twill.internal.appmaster; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.twill.internal.ProcessController; import org.apache.twill.internal.yarn.YarnApplicationReport; import org.apache.twill.internal.yarn.YarnLaunchContext; @@ -27,5 +26,5 @@ import org.apache.twill.internal.yarn.YarnLaunchContext; */ public interface ApplicationSubmitter { - ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability); + ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext); } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java index 2023b0e..eb917f3 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.twill.api.LocalFile; import 
org.apache.twill.internal.ProcessController; import org.apache.twill.internal.ProcessLauncher; +import org.apache.twill.internal.ResourceCapability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +39,7 @@ import java.util.Map; * * @param <T> Type of the object that contains information about the container that the process is going to launch. */ -public abstract class AbstractYarnProcessLauncher<T> implements ProcessLauncher<T> { +public abstract class AbstractYarnProcessLauncher<T extends ResourceCapability> implements ProcessLauncher<T> { private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnProcessLauncher.class); http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java index 67a2292..df956bf 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.twill.api.TwillSpecification; import org.apache.twill.internal.ProcessController; import org.apache.twill.internal.ProcessLauncher; +import org.apache.twill.internal.appmaster.ApplicationMasterInfo; import java.util.List; import javax.annotation.Nullable; @@ -36,8 +37,8 @@ public interface YarnAppClient extends Service { * Creates a {@link ProcessLauncher} for launching the application represented by the given spec. If scheduler queue * is available and is supported by the YARN cluster, it will be launched in the given queue. 
*/ - ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec, - @Nullable String schedulerQueue) throws Exception; + ProcessLauncher<ApplicationMasterInfo> createLauncher(TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception; /** * Creates a {@link ProcessLauncher} for launching application with the given user and spec. If scheduler queue @@ -46,9 +47,9 @@ public interface YarnAppClient extends Service { * @deprecated This method will get removed. */ @Deprecated - ProcessLauncher<ApplicationId> createLauncher(String user, - TwillSpecification twillSpec, - @Nullable String schedulerQueue) throws Exception; + ProcessLauncher<ApplicationMasterInfo> createLauncher(String user, + TwillSpecification twillSpec, + @Nullable String schedulerQueue) throws Exception; /** * Creates a {@link ProcessController} that can controls an application represented by the given application id. http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java index 4e9f76d..a444dda 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java @@ -40,7 +40,6 @@ import com.google.gson.GsonBuilder; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.twill.api.ClassAcceptor; import org.apache.twill.api.EventHandlerSpecification; @@ -68,6 +67,7 @@ import org.apache.twill.internal.LogOnlyEventHandler; import 
org.apache.twill.internal.ProcessController; import org.apache.twill.internal.ProcessLauncher; import org.apache.twill.internal.RunIds; +import org.apache.twill.internal.appmaster.ApplicationMasterInfo; import org.apache.twill.internal.appmaster.ApplicationMasterMain; import org.apache.twill.internal.container.TwillContainerMain; import org.apache.twill.internal.json.ArgumentsCodec; @@ -76,6 +76,7 @@ import org.apache.twill.internal.json.LocalFileCodec; import org.apache.twill.internal.json.TwillSpecificationAdapter; import org.apache.twill.internal.utils.Dependencies; import org.apache.twill.internal.utils.Paths; +import org.apache.twill.internal.utils.Resources; import org.apache.twill.internal.yarn.YarnAppClient; import org.apache.twill.internal.yarn.YarnApplicationReport; import org.apache.twill.internal.yarn.YarnUtils; @@ -283,8 +284,8 @@ final class YarnTwillPreparer implements TwillPreparer { @Override public TwillController start() { try { - final ProcessLauncher<ApplicationId> launcher = yarnAppClient.createLauncher(twillSpec, schedulerQueue); - final ApplicationId appId = launcher.getContainerInfo(); + final ProcessLauncher<ApplicationMasterInfo> launcher = yarnAppClient.createLauncher(twillSpec, schedulerQueue); + final ApplicationMasterInfo appMasterInfo = launcher.getContainerInfo(); Callable<ProcessController<YarnApplicationReport>> submitTask = new Callable<ProcessController<YarnApplicationReport>>() { @Override @@ -310,7 +311,7 @@ final class YarnTwillPreparer implements TwillPreparer { Constants.Files.LAUNCHER_JAR, Constants.Files.ARGUMENTS)); - LOG.debug("Submit AM container spec: {}", appId); + LOG.debug("Submit AM container spec: {}", appMasterInfo); // java -Djava.io.tmpdir=tmp -cp launcher.jar:$HADOOP_CONF_DIR -XmxMemory // org.apache.twill.internal.TwillLauncher // appMaster.jar @@ -329,6 +330,9 @@ final class YarnTwillPreparer implements TwillPreparer { LOG.debug("Log level is set to {} for the Twill application.", logLevel); 
builder.put(EnvKeys.TWILL_APP_LOG_LEVEL, logLevel.toString()); } + + int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(), + Constants.APP_MASTER_RESERVED_MEMORY_MB, Constants.HEAP_MIN_RATIO); return launcher.prepareLaunch(builder.build(), localFiles.values(), credentials) .addCommand( "$JAVA_HOME/bin/java", @@ -336,7 +340,7 @@ final class YarnTwillPreparer implements TwillPreparer { "-Dyarn.appId=$" + EnvKeys.YARN_APP_ID_STR, "-Dtwill.app=$" + EnvKeys.TWILL_APP_NAME, "-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR", - "-Xmx" + (Constants.APP_MASTER_MEMORY_MB - Constants.APP_MASTER_RESERVED_MEMORY_MB) + "m", + "-Xmx" + memory + "m", extraOptions == null ? "" : extraOptions, TwillLauncher.class.getName(), Constants.Files.APP_MASTER_JAR, http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java index 6e27b69..c6f7b9a 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java @@ -29,6 +29,8 @@ import org.apache.twill.api.logging.PrinterLogHandler; import org.apache.twill.discovery.ServiceDiscovered; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.PrintWriter; import java.util.concurrent.ExecutionException; @@ -36,11 +38,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** - * Test for requesting different container size in different order. - * It specifically test for workaround for YARN-314. + * Tests related to different container sizes. 
*/ public class ContainerSizeTestRun extends BaseYarnTest { + /** + * Test for requesting different container size in different order. + * It specifically test for workaround for YARN-314. + */ @Test public void testContainerSize() throws InterruptedException, TimeoutException, ExecutionException { TwillRunner runner = getTwillRunner(); @@ -56,6 +61,20 @@ public class ContainerSizeTestRun extends BaseYarnTest { } } + @Test + public void testMaxHeapSize() throws InterruptedException, TimeoutException, ExecutionException { + TwillRunner runner = getTwillRunner(); + TwillController controller = runner.prepare(new MaxHeapApp()) + .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) + .start(); + + try { + ServiceDiscovered discovered = controller.discoverService("sleep"); + Assert.assertTrue(waitForSize(discovered, 1, 120)); + } finally { + controller.terminate().get(120, TimeUnit.SECONDS); + } + } /** * An application that has two runnables with different memory size. @@ -86,7 +105,6 @@ public class ContainerSizeTestRun extends BaseYarnTest { } } - /** * A runnable that sleep for 120 seconds. */ @@ -116,4 +134,63 @@ public class ContainerSizeTestRun extends BaseYarnTest { } } } + + /** + * An application for testing max heap size. + */ + public static final class MaxHeapApp implements TwillApplication { + + @Override + public TwillSpecification configure() { + // Make the runnable request for container smaller than 128MB (the allocation minimum) + ResourceSpecification res = ResourceSpecification.Builder.with() + .setVirtualCores(1) + .setMemory(16, ResourceSpecification.SizeUnit.MEGA) + .build(); + + return TwillSpecification.Builder.with() + .setName("MaxHeapApp") + .withRunnable() + .add("sleep", new MaxHeapRunnable(12345), res).noLocalFiles() + .anyOrder() + .build(); + } + } + + /** + * The runnable for testing max heap size. 
+ */ + public static final class MaxHeapRunnable extends AbstractTwillRunnable { + + private static final Logger LOG = LoggerFactory.getLogger(MaxHeapRunnable.class); + private volatile Thread runThread; + + public MaxHeapRunnable(int port) { + super(ImmutableMap.of("port", Integer.toString(port))); + } + + @Override + public void run() { + // This heap size should be > 16, since the min allocation size is 128mb + if (Runtime.getRuntime().maxMemory() <= 16 * 1024 * 1024) { + LOG.error("Memory size is too small: {}", Runtime.getRuntime().maxMemory()); + return; + } + + runThread = Thread.currentThread(); + getContext().announce("sleep", Integer.parseInt(getContext().getSpecification().getConfigs().get("port"))); + try { + TimeUnit.SECONDS.sleep(120); + } catch (InterruptedException e) { + // Ignore. + } + } + + @Override + public void stop() { + if (runThread != null) { + runThread.interrupt(); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java ---------------------------------------------------------------------- diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java index 162d4db..a9120c3 100644 --- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java +++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java @@ -30,6 +30,7 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import org.junit.Assert; import org.junit.ClassRule; @@ -188,12 +189,21 @@ public class ZKClientTest { client.startAndWait(); try { - final BlockingQueue<Watcher.Event.EventType> events = new 
LinkedBlockingQueue<Watcher.Event.EventType>(); + final BlockingQueue<Watcher.Event.EventType> events = new LinkedBlockingQueue<>(); client.exists("/expireRewatch", new Watcher() { @Override - public void process(WatchedEvent event) { - client.exists("/expireRewatch", this); - events.add(event.getType()); + public void process(final WatchedEvent event) { + Futures.addCallback(client.exists("/expireRewatch", this), new FutureCallback<Stat>() { + @Override + public void onSuccess(Stat result) { + events.add(event.getType()); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Failed to call exists on /expireRewatch", t); + } + }); } }); @@ -309,7 +319,7 @@ public class ZKClientTest { Assert.assertEquals("digest", acl.getId().getScheme()); Assert.assertEquals(digest, acl.getId().getId()); - Assert.assertEquals("test", new String(noAuthClient.getData(path).get().getData())); + Assert.assertArrayEquals("test".getBytes(), noAuthClient.getData(path).get().getData()); // When tries to write using the no-auth zk client, it should fail. try {
