Repository: incubator-twill
Updated Branches:
  refs/heads/master d2d908172 -> 66402b4f2


(TWILL-153) Honor actual resource size of the container

- Determine the -Xmx based on the actual size of the container
  - it can be smaller or bigger than the one requested in the TwillSpec
- Refactor resource specification for AM
  - A forward looking change for TWILL-90
- Simple code cleanup to get rid of code warnings from the IDE.
- Fix an easy-to-fail test - ZKClientTest.testExpireRewatch()

This closes #63 in Github

Signed-off-by: Terence Yim <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/66402b4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/66402b4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/66402b4f

Branch: refs/heads/master
Commit: 66402b4f234290194e6c9c8de3d7edf84aad22ff
Parents: d2d9081
Author: Terence Yim <[email protected]>
Authored: Tue Sep 22 14:35:26 2015 -0700
Committer: Terence Yim <[email protected]>
Committed: Wed Sep 23 22:34:45 2015 -0700

----------------------------------------------------------------------
 .../org/apache/twill/internal/Constants.java    |  5 +-
 .../apache/twill/internal/ContainerInfo.java    | 15 ++--
 .../apache/twill/internal/ProcessLauncher.java  |  2 +-
 .../twill/internal/ResourceCapability.java      | 34 ++++++++
 .../twill/internal/TwillContainerLauncher.java  | 19 ++---
 .../internal/json/TwillRunResourcesCodec.java   | 14 ++--
 .../apache/twill/internal/utils/Resources.java  | 46 +++++++++++
 .../internal/yarn/Hadoop20YarnAppClient.java    | 23 ++++--
 .../internal/yarn/Hadoop21YarnAppClient.java    | 24 ++++--
 .../appmaster/ApplicationMasterInfo.java        | 63 +++++++++++++++
 .../ApplicationMasterProcessLauncher.java       | 24 ++----
 .../appmaster/ApplicationMasterService.java     | 24 ++++--
 .../appmaster/ApplicationSubmitter.java         |  3 +-
 .../yarn/AbstractYarnProcessLauncher.java       |  3 +-
 .../twill/internal/yarn/YarnAppClient.java      | 11 +--
 .../apache/twill/yarn/YarnTwillPreparer.java    | 14 ++--
 .../apache/twill/yarn/ContainerSizeTestRun.java | 83 +++++++++++++++++++-
 .../apache/twill/zookeeper/ZKClientTest.java    | 20 +++--
 18 files changed, 339 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java 
b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
index defd013..f897bfa 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -37,14 +37,13 @@ public final class Constants {
    */
   public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
 
+  public static final double HEAP_MIN_RATIO = 0.7d;
+
   /** Memory size of AM. */
   public static final int APP_MASTER_MEMORY_MB = 512;
 
   public static final int APP_MASTER_RESERVED_MEMORY_MB = 150;
 
-  public static final String STDOUT = "stdout";
-  public static final String STDERR = "stderr";
-
   public static final String CLASSPATH = "classpath";
   public static final String APPLICATION_CLASSPATH = "application-classpath";
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java 
b/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
index 67c21d3..5c93ede 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ContainerInfo.java
@@ -22,15 +22,20 @@ import java.net.InetAddress;
 /**
  * Represents information of the container that the processing is/will be 
running in.
  */
-public interface ContainerInfo {
+public interface ContainerInfo extends ResourceCapability {
 
+  /**
+   * Returns the ID of the container.
+   */
   String getId();
 
+  /**
+   * Returns the host information of the container.
+   */
   InetAddress getHost();
 
+  /**
+   * Returns the port for communicating to the container host.
+   */
   int getPort();
-
-  int getMemoryMB();
-
-  int getVirtualCores();
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java 
b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
index d0f289b..beb8e6b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ProcessLauncher.java
@@ -26,7 +26,7 @@ import java.util.Map;
  *
  * @param <T> Type of the object that contains information about the container 
that the process is going to launch.
  */
-public interface ProcessLauncher<T> {
+public interface ProcessLauncher<T extends ResourceCapability> {
 
   /**
    * Returns information about the container that this launch would launch 
process in.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java 
b/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java
new file mode 100644
index 0000000..2cdd080
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/ResourceCapability.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+/**
+ * Represents information about compute resources capability.
+ */
+public interface ResourceCapability {
+
+  /**
+   * Returns memory size in MB.
+   */
+  int getMemoryMB();
+
+  /**
+   * Returns the number of virtual cpu cores.
+   */
+  int getVirtualCores();
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
 
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 0667bb4..d01db4b 100644
--- 
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -27,6 +27,7 @@ import org.apache.twill.api.RunId;
 import org.apache.twill.api.RuntimeSpecification;
 import org.apache.twill.filesystem.Location;
 import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.utils.Resources;
 import org.apache.twill.launcher.FindFreePort;
 import org.apache.twill.launcher.TwillLauncher;
 import org.apache.twill.zookeeper.NodeData;
@@ -46,9 +47,8 @@ public final class TwillContainerLauncher {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TwillContainerLauncher.class);
 
-  private static final double HEAP_MIN_RATIO = 0.7d;
-
   private final RuntimeSpecification runtimeSpec;
+  private final ContainerInfo containerInfo;
   private final ProcessLauncher.PrepareLaunchContext launchContext;
   private final ZKClient zkClient;
   private final int instanceCount;
@@ -56,10 +56,12 @@ public final class TwillContainerLauncher {
   private final int reservedMemory;
   private final Location secureStoreLocation;
 
-  public TwillContainerLauncher(RuntimeSpecification runtimeSpec, 
ProcessLauncher.PrepareLaunchContext launchContext,
+  public TwillContainerLauncher(RuntimeSpecification runtimeSpec, 
ContainerInfo containerInfo,
+                                ProcessLauncher.PrepareLaunchContext 
launchContext,
                                 ZKClient zkClient, int instanceCount, 
JvmOptions jvmOpts, int reservedMemory,
                                 Location secureStoreLocation) {
     this.runtimeSpec = runtimeSpec;
+    this.containerInfo = containerInfo;
     this.launchContext = launchContext;
     this.zkClient = zkClient;
     this.instanceCount = instanceCount;
@@ -98,15 +100,6 @@ public final class TwillContainerLauncher {
       LOG.warn("Failed to launch container with secure store {}.", 
secureStoreLocation);
     }
 
-    int memory = runtimeSpec.getResourceSpecification().getMemorySize();
-    if (((double) (memory - reservedMemory) / memory) >= HEAP_MIN_RATIO) {
-      // Reduce -Xmx by the reserved memory size.
-      memory = runtimeSpec.getResourceSpecification().getMemorySize() - 
reservedMemory;
-    } else {
-      // If it is a small VM, just discount it by the min ratio.
-      memory = (int) Math.ceil(memory * HEAP_MIN_RATIO);
-    }
-
     // Currently no reporting is supported for runnable containers
     launchContext
       .addEnvironment(EnvKeys.TWILL_RUN_ID, runId.getId())
@@ -135,6 +128,8 @@ public final class TwillContainerLauncher {
     } else {
       firstCommand = "$JAVA_HOME/bin/java";
     }
+
+    int memory = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), 
reservedMemory, Constants.HEAP_MIN_RATIO);
     commandBuilder.add("-Djava.io.tmpdir=tmp",
                        "-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID,
                        "-Dtwill.runnable=$" + EnvKeys.TWILL_APP_NAME + ".$" + 
EnvKeys.TWILL_RUNNABLE_NAME,

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
 
b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
index 9a6f555..7dea371 100644
--- 
a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRunResourcesCodec.java
@@ -35,13 +35,13 @@ import java.lang.reflect.Type;
  */
 public final class TwillRunResourcesCodec implements 
JsonSerializer<TwillRunResources>,
                                               
JsonDeserializer<TwillRunResources> {
-  private final String CONTAINER_ID = "containerId";
-  private final String INSTANCE_ID = "instanceId";
-  private final String HOST = "host";
-  private final String MEMORY_MB = "memoryMB";
-  private final String VIRTUAL_CORES = "virtualCores";
-  private final String DEBUG_PORT = "debugPort";
-  private final String LOG_LEVEL = "logLevel";
+  private static final String CONTAINER_ID = "containerId";
+  private static final String INSTANCE_ID = "instanceId";
+  private static final String HOST = "host";
+  private static final String MEMORY_MB = "memoryMB";
+  private static final String VIRTUAL_CORES = "virtualCores";
+  private static final String DEBUG_PORT = "debugPort";
+  private static final String LOG_LEVEL = "logLevel";
 
   @Override
   public JsonElement serialize(TwillRunResources src, Type typeOfSrc, 
JsonSerializationContext context) {

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java 
b/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java
new file mode 100644
index 0000000..2ebb789
--- /dev/null
+++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Resources.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.utils;
+
+/**
+ * Utility class to help adjusting container resource requirement.
+ */
+public final class Resources {
+
+  /**
+   * Computes the max heap size for a JVM process.
+   *
+   * @param containerMemory memory size of the container in MB.
+   *                        It is the maximum memory size allowed for the 
process.
+   * @param nonHeapMemory memory in MB that needs to be reserved for non JVM 
heap memory for the process.
+   * @param minHeapRatio minimum ratio of heap memory to the total container memory.
+   * @return memory in MB representing the max heap size for a JVM process.
+   */
+  public static int computeMaxHeapSize(int containerMemory, int nonHeapMemory, 
double minHeapRatio) {
+    if (((double) (containerMemory - nonHeapMemory) / containerMemory) >= 
minHeapRatio) {
+      // Reduce -Xmx by the reserved memory size.
+      return containerMemory - nonHeapMemory;
+    } else {
+      // If it is a small VM, just discount it by the min ratio.
+      return (int) Math.ceil(containerMemory * minHeapRatio);
+    }
+  }
+
+  private Resources() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
index 3afa49a..bfa494c 100644
--- 
a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
+++ 
b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
@@ -38,8 +38,10 @@ import org.apache.hadoop.yarn.client.YarnClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
 import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
 import org.apache.twill.internal.appmaster.ApplicationSubmitter;
 import org.slf4j.Logger;
@@ -70,8 +72,8 @@ public final class Hadoop20YarnAppClient extends 
AbstractIdleService implements
   }
 
   @Override
-  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification 
twillSpec,
-                                                       @Nullable String 
schedulerQueue) throws Exception {
+  public ProcessLauncher<ApplicationMasterInfo> 
createLauncher(TwillSpecification twillSpec,
+                                                               @Nullable 
String schedulerQueue) throws Exception {
     // Request for new application
     final GetNewApplicationResponse response = yarnClient.getNewApplication();
     final ApplicationId appId = response.getApplicationId();
@@ -86,10 +88,17 @@ public final class Hadoop20YarnAppClient extends 
AbstractIdleService implements
       appSubmissionContext.setQueue(schedulerQueue);
     }
 
+    // TODO: Make it adjustable through TwillSpec (TWILL-90)
+    // Set the resource requirement for AM
+    Resource amResource = Records.newRecord(Resource.class);
+    amResource.setMemory(Constants.APP_MASTER_MEMORY_MB);
+    final Resource capability = adjustMemory(response, amResource);
+    ApplicationMasterInfo appMasterInfo = new ApplicationMasterInfo(appId, 
capability.getMemory(), 1);
+
     ApplicationSubmitter submitter = new ApplicationSubmitter() {
 
       @Override
-      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext 
launchContext, Resource capability) {
+      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext 
launchContext) {
         ContainerLaunchContext context = launchContext.getLaunchContext();
         addRMToken(context);
         context.setUser(appSubmissionContext.getUser());
@@ -106,7 +115,7 @@ public final class Hadoop20YarnAppClient extends 
AbstractIdleService implements
       }
     };
 
-    return new ApplicationMasterProcessLauncher(appId, submitter);
+    return new ApplicationMasterProcessLauncher(appMasterInfo, submitter);
   }
 
   private Resource adjustMemory(GetNewApplicationResponse response, Resource 
capability) {
@@ -158,9 +167,9 @@ public final class Hadoop20YarnAppClient extends 
AbstractIdleService implements
   }
 
   @Override
-  public ProcessLauncher<ApplicationId> createLauncher(String user,
-                                                       TwillSpecification 
twillSpec,
-                                                       @Nullable String 
schedulerQueue) throws Exception {
+  public ProcessLauncher<ApplicationMasterInfo> createLauncher(String user,
+                                                               
TwillSpecification twillSpec,
+                                                               @Nullable 
String schedulerQueue) throws Exception {
     this.user = user;
     return createLauncher(twillSpec, schedulerQueue);
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
 
b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
index 046e3f1..6fd11e5 100644
--- 
a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
+++ 
b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -35,8 +35,10 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
 import org.apache.twill.internal.appmaster.ApplicationMasterProcessLauncher;
 import org.apache.twill.internal.appmaster.ApplicationSubmitter;
 import org.slf4j.Logger;
@@ -64,8 +66,8 @@ public final class Hadoop21YarnAppClient extends 
AbstractIdleService implements
   }
 
   @Override
-  public ProcessLauncher<ApplicationId> createLauncher(TwillSpecification 
twillSpec,
-                                                       @Nullable String 
schedulerQueue) throws Exception {
+  public ProcessLauncher<ApplicationMasterInfo> 
createLauncher(TwillSpecification twillSpec,
+                                                               @Nullable 
String schedulerQueue) throws Exception {
     // Request for new application
     YarnClientApplication application = yarnClient.createApplication();
     final GetNewApplicationResponse response = 
application.getNewApplicationResponse();
@@ -80,14 +82,20 @@ public final class Hadoop21YarnAppClient extends 
AbstractIdleService implements
       appSubmissionContext.setQueue(schedulerQueue);
     }
 
+    // TODO: Make it adjustable through TwillSpec (TWILL-90)
+    // Set the resource requirement for AM
+    final Resource capability = adjustMemory(response, 
Resource.newInstance(Constants.APP_MASTER_MEMORY_MB, 1));
+    ApplicationMasterInfo appMasterInfo = new ApplicationMasterInfo(appId, 
capability.getMemory(),
+                                                                    
capability.getVirtualCores());
+
     ApplicationSubmitter submitter = new ApplicationSubmitter() {
       @Override
-      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext 
context, Resource capability) {
+      public ProcessController<YarnApplicationReport> submit(YarnLaunchContext 
context) {
         ContainerLaunchContext launchContext = context.getLaunchContext();
 
         addRMToken(launchContext);
         appSubmissionContext.setAMContainerSpec(launchContext);
-        appSubmissionContext.setResource(adjustMemory(response, capability));
+        appSubmissionContext.setResource(capability);
         appSubmissionContext.setMaxAppAttempts(2);
 
         try {
@@ -100,7 +108,7 @@ public final class Hadoop21YarnAppClient extends 
AbstractIdleService implements
       }
     };
 
-    return new ApplicationMasterProcessLauncher(appId, submitter);
+    return new ApplicationMasterProcessLauncher(appMasterInfo, submitter);
   }
 
   private Resource adjustMemory(GetNewApplicationResponse response, Resource 
capability) {
@@ -139,9 +147,9 @@ public final class Hadoop21YarnAppClient extends 
AbstractIdleService implements
   }
 
   @Override
-  public ProcessLauncher<ApplicationId> createLauncher(String user,
-                                                       TwillSpecification 
twillSpec,
-                                                       @Nullable String 
schedulerQueue) throws Exception {
+  public ProcessLauncher<ApplicationMasterInfo> createLauncher(String user,
+                                                               
TwillSpecification twillSpec,
+                                                               @Nullable 
String schedulerQueue) throws Exception {
     // Ignore user
     return createLauncher(twillSpec, schedulerQueue);
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java
new file mode 100644
index 0000000..fbeeca0
--- /dev/null
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.twill.internal.ResourceCapability;
+
+/**
+ * Represents information of the application master.
+ */
+public class ApplicationMasterInfo implements ResourceCapability {
+
+  private final ApplicationId appId;
+  private final int memoryMB;
+  private final int virtualCores;
+
+  public ApplicationMasterInfo(ApplicationId appId, int memoryMB, int 
virtualCores) {
+    this.appId = appId;
+    this.memoryMB = memoryMB;
+    this.virtualCores = virtualCores;
+  }
+
+  /**
+   * Returns the application ID for the YARN application.
+   */
+  public ApplicationId getAppId() {
+    return appId;
+  }
+
+  @Override
+  public int getMemoryMB() {
+    return memoryMB;
+  }
+
+  @Override
+  public int getVirtualCores() {
+    return virtualCores;
+  }
+
+  @Override
+  public String toString() {
+    return "ApplicationMasterInfo{" +
+      "appId=" + appId +
+      ", memoryMB=" + memoryMB +
+      ", virtualCores=" + virtualCores +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
index 126ff97..da11816 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
@@ -19,38 +19,30 @@ package org.apache.twill.internal.appmaster;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.EnvKeys;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.yarn.AbstractYarnProcessLauncher;
 import org.apache.twill.internal.yarn.YarnLaunchContext;
-import org.apache.twill.internal.yarn.YarnUtils;
 
 import java.util.Map;
 
 /**
  * A {@link org.apache.twill.internal.ProcessLauncher} for launching 
Application Master from the client.
  */
-public final class ApplicationMasterProcessLauncher extends 
AbstractYarnProcessLauncher<ApplicationId> {
+public final class ApplicationMasterProcessLauncher extends 
AbstractYarnProcessLauncher<ApplicationMasterInfo> {
 
   private final ApplicationSubmitter submitter;
 
-  public ApplicationMasterProcessLauncher(ApplicationId appId, 
ApplicationSubmitter submitter) {
-    super(appId);
+  public ApplicationMasterProcessLauncher(ApplicationMasterInfo info, 
ApplicationSubmitter submitter) {
+    super(info);
     this.submitter = submitter;
   }
 
   @Override
   @SuppressWarnings("unchecked")
   protected <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext) 
{
-    final ApplicationId appId = getContainerInfo();
-
-    // Set the resource requirement for AM
-    Resource capability = Records.newRecord(Resource.class);
-    capability.setMemory(Constants.APP_MASTER_MEMORY_MB);
-    YarnUtils.setVirtualCores(capability, 1);
+    ApplicationMasterInfo appMasterInfo = getContainerInfo();
+    ApplicationId appId = appMasterInfo.getAppId();
 
     // Put in extra environments
     Map<String, String> env = ImmutableMap.<String, String>builder()
@@ -58,11 +50,11 @@ public final class ApplicationMasterProcessLauncher extends 
AbstractYarnProcessL
       .put(EnvKeys.YARN_APP_ID, Integer.toString(appId.getId()))
       .put(EnvKeys.YARN_APP_ID_CLUSTER_TIME, 
Long.toString(appId.getClusterTimestamp()))
       .put(EnvKeys.YARN_APP_ID_STR, appId.toString())
-      .put(EnvKeys.YARN_CONTAINER_MEMORY_MB, 
Integer.toString(Constants.APP_MASTER_MEMORY_MB))
-      .put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, 
Integer.toString(YarnUtils.getVirtualCores(capability)))
+      .put(EnvKeys.YARN_CONTAINER_MEMORY_MB, 
Integer.toString(appMasterInfo.getMemoryMB()))
+      .put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, 
Integer.toString(appMasterInfo.getVirtualCores()))
       .build();
 
     launchContext.setEnvironment(env);
-    return (ProcessController<R>) submitter.submit(launchContext, capability);
+    return (ProcessController<R>) submitter.submit(launchContext);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 818db05..355cea3 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -176,10 +176,14 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
   }
 
   @SuppressWarnings("unchecked")
+  @Nullable
   private EventHandler createEventHandler(TwillSpecification twillSpec) {
     try {
       // Should be able to load by this class ClassLoader, as they packaged in 
the same jar.
       EventHandlerSpecification handlerSpec = twillSpec.getEventHandler();
+      if (handlerSpec == null) {
+        return null;
+      }
 
       Class<?> handlerClass = 
getClass().getClassLoader().loadClass(handlerSpec.getClassName());
       
Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass),
@@ -221,7 +225,9 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
     LOG.info("Start application master with spec: " + 
TwillSpecificationAdapter.create().toJson(twillSpec));
 
     // initialize the event handler, if it fails, it will fail the application.
-    eventHandler.initialize(new 
BasicEventHandlerContext(twillSpec.getEventHandler()));
+    if (eventHandler != null) {
+      eventHandler.initialize(new 
BasicEventHandlerContext(twillSpec.getEventHandler()));
+    }
 
     instanceChangeExecutor = 
Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
 
@@ -237,11 +243,13 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
 
     LOG.info("Stop application master with spec: {}", 
TwillSpecificationAdapter.create().toJson(twillSpec));
 
-    try {
-      // call event handler destroy. If there is error, only log and not 
affected stop sequence.
-      eventHandler.destroy();
-    } catch (Throwable t) {
-      LOG.warn("Exception when calling {}.destroy()", 
twillSpec.getEventHandler().getClassName(), t);
+    if (eventHandler != null) {
+      try {
+        // call event handler destroy. If there is error, only log and not 
affected stop sequence.
+        eventHandler.destroy();
+      } catch (Throwable t) {
+        LOG.warn("Exception when calling {}.destroy()", 
eventHandler.getClass().getName(), t);
+      }
     }
 
     instanceChangeExecutor.shutdownNow();
@@ -491,7 +499,7 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
       }
     }
 
-    if (!timeoutEvents.isEmpty()) {
+    if (!timeoutEvents.isEmpty() && eventHandler != null) {
       try {
         EventHandler.TimeoutAction action = 
eventHandler.launchTimeout(timeoutEvents);
         if (action.getTimeout() < 0) {
@@ -640,7 +648,7 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
       );
 
       TwillContainerLauncher launcher = new TwillContainerLauncher(
-        twillSpec.getRunnables().get(runnableName), launchContext,
+        twillSpec.getRunnables().get(runnableName), 
processLauncher.getContainerInfo(), launchContext,
         ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
         containerCount, jvmOpts, reservedMemory, getSecureStoreLocation());
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
index 38f90ae..e82dbbc 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
@@ -17,7 +17,6 @@
  */
 package org.apache.twill.internal.appmaster;
 
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.yarn.YarnApplicationReport;
 import org.apache.twill.internal.yarn.YarnLaunchContext;
@@ -27,5 +26,5 @@ import org.apache.twill.internal.yarn.YarnLaunchContext;
  */
 public interface ApplicationSubmitter {
 
-  ProcessController<YarnApplicationReport> submit(YarnLaunchContext 
launchContext, Resource capability);
+  ProcessController<YarnApplicationReport> submit(YarnLaunchContext 
launchContext);
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
index 2023b0e..eb917f3 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnProcessLauncher.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.twill.api.LocalFile;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.ResourceCapability;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +39,7 @@ import java.util.Map;
  *
  * @param <T> Type of the object that contains information about the container 
that the process is going to launch.
  */
-public abstract class AbstractYarnProcessLauncher<T> implements 
ProcessLauncher<T> {
+public abstract class AbstractYarnProcessLauncher<T extends 
ResourceCapability> implements ProcessLauncher<T> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractYarnProcessLauncher.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java 
b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
index 67a2292..df956bf 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.twill.api.TwillSpecification;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
 
 import java.util.List;
 import javax.annotation.Nullable;
@@ -36,8 +37,8 @@ public interface YarnAppClient extends Service {
    * Creates a {@link ProcessLauncher} for launching the application 
represented by the given spec. If scheduler queue
    * is available and is supported by the YARN cluster, it will be launched in 
the given queue.
    */
-  ProcessLauncher<ApplicationId> createLauncher(TwillSpecification twillSpec,
-                                                @Nullable String 
schedulerQueue) throws Exception;
+  ProcessLauncher<ApplicationMasterInfo> createLauncher(TwillSpecification 
twillSpec,
+                                                        @Nullable String 
schedulerQueue) throws Exception;
 
   /**
    * Creates a {@link ProcessLauncher} for launching application with the 
given user and spec. If scheduler queue
@@ -46,9 +47,9 @@ public interface YarnAppClient extends Service {
    * @deprecated This method will get removed.
    */
   @Deprecated
-  ProcessLauncher<ApplicationId> createLauncher(String user,
-                                                TwillSpecification twillSpec,
-                                                @Nullable String 
schedulerQueue) throws Exception;
+  ProcessLauncher<ApplicationMasterInfo> createLauncher(String user,
+                                                        TwillSpecification 
twillSpec,
+                                                        @Nullable String 
schedulerQueue) throws Exception;
 
   /**
    * Creates a {@link ProcessController} that can controls an application 
represented by the given application id.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java 
b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 4e9f76d..a444dda 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -40,7 +40,6 @@ import com.google.gson.GsonBuilder;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.twill.api.ClassAcceptor;
 import org.apache.twill.api.EventHandlerSpecification;
@@ -68,6 +67,7 @@ import org.apache.twill.internal.LogOnlyEventHandler;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.ProcessLauncher;
 import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
 import org.apache.twill.internal.appmaster.ApplicationMasterMain;
 import org.apache.twill.internal.container.TwillContainerMain;
 import org.apache.twill.internal.json.ArgumentsCodec;
@@ -76,6 +76,7 @@ import org.apache.twill.internal.json.LocalFileCodec;
 import org.apache.twill.internal.json.TwillSpecificationAdapter;
 import org.apache.twill.internal.utils.Dependencies;
 import org.apache.twill.internal.utils.Paths;
+import org.apache.twill.internal.utils.Resources;
 import org.apache.twill.internal.yarn.YarnAppClient;
 import org.apache.twill.internal.yarn.YarnApplicationReport;
 import org.apache.twill.internal.yarn.YarnUtils;
@@ -283,8 +284,8 @@ final class YarnTwillPreparer implements TwillPreparer {
   @Override
   public TwillController start() {
     try {
-      final ProcessLauncher<ApplicationId> launcher = 
yarnAppClient.createLauncher(twillSpec, schedulerQueue);
-      final ApplicationId appId = launcher.getContainerInfo();
+      final ProcessLauncher<ApplicationMasterInfo> launcher = 
yarnAppClient.createLauncher(twillSpec, schedulerQueue);
+      final ApplicationMasterInfo appMasterInfo = launcher.getContainerInfo();
       Callable<ProcessController<YarnApplicationReport>> submitTask =
         new Callable<ProcessController<YarnApplicationReport>>() {
         @Override
@@ -310,7 +311,7 @@ final class YarnTwillPreparer implements TwillPreparer {
                                                      
Constants.Files.LAUNCHER_JAR,
                                                      
Constants.Files.ARGUMENTS));
 
-          LOG.debug("Submit AM container spec: {}", appId);
+          LOG.debug("Submit AM container spec: {}", appMasterInfo);
           // java -Djava.io.tmpdir=tmp -cp launcher.jar:$HADOOP_CONF_DIR 
-XmxMemory
           //     org.apache.twill.internal.TwillLauncher
           //     appMaster.jar
@@ -329,6 +330,9 @@ final class YarnTwillPreparer implements TwillPreparer {
             LOG.debug("Log level is set to {} for the Twill application.", 
logLevel);
             builder.put(EnvKeys.TWILL_APP_LOG_LEVEL, logLevel.toString());
           }
+
+          int memory = 
Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
+                                                    
Constants.APP_MASTER_RESERVED_MEMORY_MB, Constants.HEAP_MIN_RATIO);
           return launcher.prepareLaunch(builder.build(), localFiles.values(), 
credentials)
             .addCommand(
               "$JAVA_HOME/bin/java",
@@ -336,7 +340,7 @@ final class YarnTwillPreparer implements TwillPreparer {
               "-Dyarn.appId=$" + EnvKeys.YARN_APP_ID_STR,
               "-Dtwill.app=$" + EnvKeys.TWILL_APP_NAME,
               "-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR",
-              "-Xmx" + (Constants.APP_MASTER_MEMORY_MB - 
Constants.APP_MASTER_RESERVED_MEMORY_MB) + "m",
+              "-Xmx" + memory + "m",
               extraOptions == null ? "" : extraOptions,
               TwillLauncher.class.getName(),
               Constants.Files.APP_MASTER_JAR,

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java 
b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
index 6e27b69..c6f7b9a 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
@@ -29,6 +29,8 @@ import org.apache.twill.api.logging.PrinterLogHandler;
 import org.apache.twill.discovery.ServiceDiscovered;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.PrintWriter;
 import java.util.concurrent.ExecutionException;
@@ -36,11 +38,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /**
- * Test for requesting different container size in different order.
- * It specifically test for workaround for YARN-314.
+ * Tests related to different container sizes.
  */
 public class ContainerSizeTestRun extends BaseYarnTest {
 
+  /**
+   * Test for requesting different container size in different order.
+   * It specifically tests the workaround for YARN-314.
+   */
   @Test
   public void testContainerSize() throws InterruptedException, 
TimeoutException, ExecutionException {
     TwillRunner runner = getTwillRunner();
@@ -56,6 +61,20 @@ public class ContainerSizeTestRun extends BaseYarnTest {
     }
   }
 
+  @Test
+  public void testMaxHeapSize() throws InterruptedException, TimeoutException, 
ExecutionException {
+    TwillRunner runner = getTwillRunner();
+    TwillController controller = runner.prepare(new MaxHeapApp())
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .start();
+
+    try {
+      ServiceDiscovered discovered = controller.discoverService("sleep");
+      Assert.assertTrue(waitForSize(discovered, 1, 120));
+    } finally {
+      controller.terminate().get(120, TimeUnit.SECONDS);
+    }
+  }
 
   /**
    * An application that has two runnables with different memory size.
@@ -86,7 +105,6 @@ public class ContainerSizeTestRun extends BaseYarnTest {
     }
   }
 
-
   /**
    * A runnable that sleep for 120 seconds.
    */
@@ -116,4 +134,63 @@ public class ContainerSizeTestRun extends BaseYarnTest {
       }
     }
   }
+
+  /**
+   * An application for testing max heap size.
+   */
+  public static final class MaxHeapApp implements TwillApplication {
+
+    @Override
+    public TwillSpecification configure() {
+      // Make the runnable request a container smaller than 128MB (the allocation minimum)
+      ResourceSpecification res = ResourceSpecification.Builder.with()
+        .setVirtualCores(1)
+        .setMemory(16, ResourceSpecification.SizeUnit.MEGA)
+        .build();
+
+      return TwillSpecification.Builder.with()
+        .setName("MaxHeapApp")
+        .withRunnable()
+        .add("sleep", new MaxHeapRunnable(12345), res).noLocalFiles()
+        .anyOrder()
+        .build();
+    }
+  }
+
+  /**
+   * The runnable for testing max heap size.
+   */
+  public static final class MaxHeapRunnable extends AbstractTwillRunnable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MaxHeapRunnable.class);
+    private volatile Thread runThread;
+
+    public MaxHeapRunnable(int port) {
+      super(ImmutableMap.of("port", Integer.toString(port)));
+    }
+
+    @Override
+    public void run() {
+      // The max heap size should be > 16MB, since the min allocation size is 128MB
+      if (Runtime.getRuntime().maxMemory() <= 16 * 1024 * 1024) {
+        LOG.error("Memory size is too small: {}", 
Runtime.getRuntime().maxMemory());
+        return;
+      }
+
+      runThread = Thread.currentThread();
+      getContext().announce("sleep", 
Integer.parseInt(getContext().getSpecification().getConfigs().get("port")));
+      try {
+        TimeUnit.SECONDS.sleep(120);
+      } catch (InterruptedException e) {
+        // Ignore.
+      }
+    }
+
+    @Override
+    public void stop() {
+      if (runThread != null) {
+        runThread.interrupt();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/66402b4f/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
----------------------------------------------------------------------
diff --git 
a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java 
b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
index 162d4db..a9120c3 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java
@@ -30,6 +30,7 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -188,12 +189,21 @@ public class ZKClientTest {
       client.startAndWait();
 
       try {
-        final BlockingQueue<Watcher.Event.EventType> events = new 
LinkedBlockingQueue<Watcher.Event.EventType>();
+        final BlockingQueue<Watcher.Event.EventType> events = new 
LinkedBlockingQueue<>();
         client.exists("/expireRewatch", new Watcher() {
           @Override
-          public void process(WatchedEvent event) {
-            client.exists("/expireRewatch", this);
-            events.add(event.getType());
+          public void process(final WatchedEvent event) {
+            Futures.addCallback(client.exists("/expireRewatch", this), new 
FutureCallback<Stat>() {
+              @Override
+              public void onSuccess(Stat result) {
+                events.add(event.getType());
+              }
+
+              @Override
+              public void onFailure(Throwable t) {
+                LOG.error("Failed to call exists on /expireRewatch", t);
+              }
+            });
           }
         });
 
@@ -309,7 +319,7 @@ public class ZKClientTest {
       Assert.assertEquals("digest", acl.getId().getScheme());
       Assert.assertEquals(digest, acl.getId().getId());
 
-      Assert.assertEquals("test", new 
String(noAuthClient.getData(path).get().getData()));
+      Assert.assertArrayEquals("test".getBytes(), 
noAuthClient.getData(path).get().getData());
 
       // When tries to write using the no-auth zk client, it should fail.
       try {

Reply via email to