TWILL-116 Add TwillRunnable instances lifecycle management.

Add support for restarting specific or all instances of a runnable in a Twill
application.
See proposed design attached to the JIRA: 
https://issues.apache.org/jira/browse/TWILL-116

Summary of changes:
1. Add new APIs to TwillController to restart all or certain instances of 
runnables.
2. Add a message handler in ApplicationMasterService for restarting instances.
3. Modify RunningContainers to launch a new container using the same instance
id to support restarting a runnable.
4. Add validation for instance ids to be restarted.

This closes #52 on GitHub.

Signed-off-by: Terence Yim <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/d4a1508e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/d4a1508e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/d4a1508e

Branch: refs/heads/site
Commit: d4a1508eeacbebdff5263b1ecad182775f27162a
Parents: d181b7c
Author: hsaputra <[email protected]>
Authored: Fri Jun 26 22:19:24 2015 -0700
Committer: Terence Yim <[email protected]>
Committed: Mon Jul 13 18:33:30 2015 -0700

----------------------------------------------------------------------
 .../org/apache/twill/api/TwillController.java   | 28 +++++++
 .../twill/internal/AbstractTwillController.java | 81 +++++++++++++++++++-
 .../internal/AbstractZKServiceController.java   |  1 +
 .../org/apache/twill/internal/Constants.java    |  5 ++
 .../org/apache/twill/internal/ZKMessages.java   |  2 +-
 .../apache/twill/internal/state/Message.java    |  3 +-
 .../twill/internal/state/SystemMessages.java    | 27 ++++++-
 .../yarn/Hadoop21YarnContainerInfo.java         |  2 +-
 .../org/apache/twill/internal/ServiceMain.java  |  2 +-
 .../appmaster/ApplicationMasterService.java     | 79 +++++++++++++++++++
 .../appmaster/RunnableContainerRequest.java     |  1 +
 .../internal/appmaster/RunningContainers.java   | 65 ++++++++++------
 .../apache/twill/yarn/EchoServerTestRun.java    | 70 +++++++++++++++++
 13 files changed, 338 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-api/src/main/java/org/apache/twill/api/TwillController.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillController.java 
b/twill-api/src/main/java/org/apache/twill/api/TwillController.java
index b84f817..08206f5 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillController.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillController.java
@@ -21,6 +21,8 @@ import org.apache.twill.api.logging.LogHandler;
 import org.apache.twill.discovery.Discoverable;
 import org.apache.twill.discovery.ServiceDiscovered;
 
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Future;
 import javax.annotation.Nullable;
 
@@ -61,4 +63,30 @@ public interface TwillController extends ServiceController {
    */
   @Nullable
   ResourceReport getResourceReport();
+
+  /**
+   * Restart all instances of a particular {@link TwillRunnable}.
+   *
+   * @param runnable The name of the runnable to restart.
+   * @return A {@link Future} that will be completed when the restart 
operation has been done.
+   */
+  Future<String> restartAllInstances(String runnable);
+
+  /**
+   * Restart instances of some {@link TwillRunnable}.
+   *
+   * @param runnableToInstanceIds A map of runnable ID to list of instance IDs 
to be restarted.
+   * @return A {@link Future} that will be completed when the restart 
operation has been done.
+   */
+  Future<Set<String>> restartInstances(Map<String, ? extends Set<Integer>> 
runnableToInstanceIds);
+
+  /**
+   * Restart instances of some {@link TwillRunnable}.
+   *
+   * @param runnable The name of the runnable to restart.
+   * @param instanceId The main instance id to be restarted.
+   * @param moreInstanceIds The optional instance ids.
+   * @return A {@link Future} that will be completed when the restart 
operation has been done.
+   */
+  Future<String> restartInstances(String runnable, int instanceId, int... 
moreInstanceIds);
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
 
b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index bf20616..41a044b 100644
--- 
a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -18,12 +18,22 @@
 package org.apache.twill.internal;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.ResourceReport;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunResources;
 import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.api.logging.LogHandler;
 import org.apache.twill.api.logging.LogThrowable;
@@ -35,6 +45,7 @@ import org.apache.twill.internal.json.LogEntryDecoder;
 import org.apache.twill.internal.json.LogThrowableCodec;
 import org.apache.twill.internal.json.StackTraceElementCodec;
 import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
+import org.apache.twill.internal.state.Message;
 import org.apache.twill.internal.state.SystemMessages;
 import org.apache.twill.kafka.client.FetchedMessage;
 import org.apache.twill.kafka.client.KafkaClientService;
@@ -44,8 +55,11 @@ import org.apache.twill.zookeeper.ZKClients;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -55,6 +69,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 public abstract class AbstractTwillController extends 
AbstractZKServiceController implements TwillController {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractTwillController.class);
+  private static final Gson GSON = new Gson();
 
   private final Queue<LogHandler> logHandlers;
   private final KafkaClientService kafkaClient;
@@ -63,7 +78,7 @@ public abstract class AbstractTwillController extends 
AbstractZKServiceControlle
 
   public AbstractTwillController(RunId runId, ZKClient zkClient, 
Iterable<LogHandler> logHandlers) {
     super(runId, zkClient);
-    this.logHandlers = new ConcurrentLinkedQueue<LogHandler>();
+    this.logHandlers = new ConcurrentLinkedQueue<>();
     this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zkClient, 
"/" + runId.getId() + "/kafka"));
     this.discoveryServiceClient = new ZKDiscoveryService(zkClient);
     Iterables.addAll(this.logHandlers, logHandlers);
@@ -109,6 +124,70 @@ public abstract class AbstractTwillController extends 
AbstractZKServiceControlle
     return sendMessage(SystemMessages.setInstances(runnable, newCount), 
newCount);
   }
 
+  @Override
+  public final ListenableFuture<String> restartAllInstances(String 
runnableName) {
+    Command updateStateCommand = 
Command.Builder.of(Constants.RESTART_ALL_RUNNABLE_INSTANCES).
+      build();
+    Message message = 
SystemMessages.updateRunnableInstances(updateStateCommand, runnableName);
+    return sendMessage(message, updateStateCommand.getCommand());
+  }
+
+  @Override
+  public final ListenableFuture<Set<String>> restartInstances(Map<String,
+      ? extends Set<Integer>> runnableToInstanceIds) {
+    Map<String, String> runnableToStringInstanceIds =
+      Maps.transformEntries(runnableToInstanceIds, new 
Maps.EntryTransformer<String, Set<Integer>, String>() {
+        @Override
+        public String transformEntry(String runnableName, Set<Integer> 
instanceIds) {
+          validateInstanceIds(runnableName, instanceIds);
+          return GSON.toJson(instanceIds, new TypeToken<Set<Integer>>() 
{}.getType());
+        }
+      });
+    Command updateStateCommand = 
Command.Builder.of(Constants.RESTART_RUNNABLES_INSTANCES)
+      .addOptions(runnableToStringInstanceIds)
+      .build();
+    Message message = 
SystemMessages.updateRunnablesInstances(updateStateCommand);
+
+    return sendMessage(message, runnableToInstanceIds.keySet());
+  }
+
+  @Override
+  public ListenableFuture<String> restartInstances(final String runnable, int 
instanceId, int... moreInstanceIds) {
+    Set<Integer> instanceIds = Sets.newLinkedHashSet();
+    instanceIds.add(instanceId);
+    for (int id : moreInstanceIds) {
+      instanceIds.add(id);
+    }
+
+    return Futures.transform(restartInstances(ImmutableMap.of(runnable, 
instanceIds)),
+                             new Function<Set<String>, String>() {
+      public String apply(Set<String> input) {
+        return runnable;
+      }
+    });
+  }
+
+  private void validateInstanceIds(String runnable, Set<Integer> instanceIds) {
+    ResourceReport resourceReport = getResourceReport();
+    if (resourceReport == null) {
+      throw new IllegalStateException("Unable to get resource report since 
application has not started.");
+    }
+    Collection<TwillRunResources> runnableResources = 
resourceReport.getRunnableResources(runnable);
+    if (runnableResources == null) {
+      throw new RuntimeException("Unable to verify run resources for runnable 
" + runnable);
+    }
+    Set<Integer> existingInstanceIds = Sets.newHashSet();
+    for (TwillRunResources twillRunResources : runnableResources) {
+      existingInstanceIds.add(twillRunResources.getInstanceId());
+    }
+    LOG.info("Existing instance ids: {}", existingInstanceIds);
+    for (int instanceId : instanceIds) {
+      if (!existingInstanceIds.contains(instanceId)) {
+        throw new IllegalArgumentException("Unable to find instance id " + 
instanceId + " for " + runnable);
+      }
+    }
+  }
+
   private static final class LogMessageCallback implements 
KafkaConsumer.MessageCallback {
 
     private static final Gson GSON = new GsonBuilder()

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
 
b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
index 6d95009..0cf92ea 100644
--- 
a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.twill.api.Command;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.ServiceController;

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java 
b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
index 64b029d..e3a2194 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -47,6 +47,11 @@ public final class Constants {
 
   public static final String CLASSPATH = "classpath";
   public static final String APPLICATION_CLASSPATH = "application-classpath";
+
+  /** For runnables instance lifecycle ZK path */
+  public static final String RESTART_ALL_RUNNABLE_INSTANCES = 
"restartAllRunnableInstances";
+  public static final String RESTART_RUNNABLES_INSTANCES = 
"restartRunnablesInstances";
+
   /**
    * Constants for names of internal files that are shared between client, AM 
and containers.
    */

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java 
b/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java
index b7905d9..cfcc7e7 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java
@@ -28,7 +28,7 @@ import org.apache.twill.zookeeper.ZKOperations;
 import org.apache.zookeeper.CreateMode;
 
 /**
- *
+ * Helper class to send messages to remote instances using Apache Zookeeper 
watch mechanism.
  */
 public final class ZKMessages {
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-core/src/main/java/org/apache/twill/internal/state/Message.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/state/Message.java 
b/twill-core/src/main/java/org/apache/twill/internal/state/Message.java
index c6944fd..1c758c1 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/state/Message.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/state/Message.java
@@ -38,7 +38,8 @@ public interface Message {
   enum Scope {
     APPLICATION,
     ALL_RUNNABLE,
-    RUNNABLE
+    RUNNABLE,
+    RUNNABLES
   }
 
   Type getType();

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java 
b/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
index 73683bd..4c4bb8b 100644
--- 
a/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/state/SystemMessages.java
@@ -17,9 +17,10 @@
  */
 package org.apache.twill.internal.state;
 
-import com.google.common.base.Preconditions;
 import org.apache.twill.api.Command;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Collection of predefined system messages.
  */
@@ -43,6 +44,30 @@ public final class SystemMessages {
                              
Command.Builder.of("instances").addOption("count", 
Integer.toString(instances)).build());
   }
 
+  /**
+   * Helper method to get System {@link Message} for update instances for a 
runnable.
+   *
+   * @param updateCommand The {@link Command} to be added to the message.
+   * @param runnableName The name of the runnable to be restarted.
+   * @return An instance of System {@link Message} to restart runnable 
instances.
+   */
+  public static Message updateRunnableInstances(Command updateCommand, String 
runnableName) {
+    Preconditions.checkNotNull(updateCommand);
+    Preconditions.checkNotNull(runnableName);
+    return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.RUNNABLE, 
runnableName, updateCommand);
+  }
+
+  /**
+   * Helper method to get System {@link Message} for restarting certain 
instances from certain runnables.
+   *
+   * @param updateCommand The {@link Command} to be added to the message.
+   * @return An instance of System {@link Message} to restart runnables' 
instances.
+   */
+  public static Message updateRunnablesInstances(Command updateCommand) {
+    Preconditions.checkNotNull(updateCommand);
+    return new SimpleMessage(Message.Type.SYSTEM, Message.Scope.RUNNABLES, 
null, updateCommand);
+  }
+
   private SystemMessages() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
 
b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
index 86903c1..8571933 100644
--- 
a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
+++ 
b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnContainerInfo.java
@@ -24,7 +24,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 
 /**
- *
+ * Implementation of {@link YarnContainerInfo} for Apache Hadoop 2.1.0 or 
above.
  */
 public final class Hadoop21YarnContainerInfo implements YarnContainerInfo {
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java 
b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index d7ef72b..f7bea24 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -228,7 +228,7 @@ public abstract class ServiceMain {
   }
 
   /**
-   * Override to return the right log level for the service
+   * Override to return the right log level for the service.
    *
    * @param logger the {@link Logger} instance of the service context.
    * @return String of log level based on {@code slf4j} log levels.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 5c89a79..f76cd0b 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -20,14 +20,17 @@ package org.apache.twill.internal.appmaster;
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
+import com.google.common.collect.DiscreteDomains;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
+import com.google.common.collect.Ranges;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import com.google.common.io.InputSupplier;
@@ -35,6 +38,7 @@ import com.google.common.reflect.TypeToken;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
@@ -92,6 +96,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 
 /**
  * The class that acts as {@code ApplicationMaster} for Twill applications.
@@ -99,6 +104,7 @@ import java.util.concurrent.TimeUnit;
 public final class ApplicationMasterService extends AbstractYarnTwillService 
implements Supplier<ResourceReport> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ApplicationMasterService.class);
+  private static final Gson GSON = new Gson();
 
   // Copied from org.apache.hadoop.yarn.security.AMRMTokenIdentifier.KIND_NAME 
since it's missing in Hadoop-2.0
   private static final Text AMRM_TOKEN_KIND_NAME = new 
Text("YARN_AM_RM_TOKEN");
@@ -288,6 +294,10 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
       return result;
     }
 
+    if (handleRestartRunnablesInstances(message, completion)) {
+      return result;
+    }
+
     // Replicate messages to all runnables
     if (message.getScope() == Message.Scope.ALL_RUNNABLE) {
       runningContainers.sendToAll(message, completion);
@@ -787,4 +797,73 @@ public final class ApplicationMasterService extends 
AbstractYarnTwillService imp
     capability.setMemory(resourceSpec.getMemorySize());
     return capability;
   }
+
+  /**
+   * Attempt to restart some instances from a runnable or some runnables.
+   * @return {@code true} if the message requests restarting some instances 
and {@code false} otherwise.
+   */
+  private boolean handleRestartRunnablesInstances(final Message message, final 
Runnable completion) {
+    LOG.debug("Check if it should process a restart runnable instances.");
+
+    if (message.getType() != Message.Type.SYSTEM) {
+      return false;
+    }
+
+    Message.Scope messageScope = message.getScope();
+    if (messageScope != Message.Scope.RUNNABLE && messageScope != 
Message.Scope.RUNNABLES) {
+      return false;
+    }
+
+    Command requestCommand = message.getCommand();
+    if 
(!Constants.RESTART_ALL_RUNNABLE_INSTANCES.equals(requestCommand.getCommand()) 
&&
+      
!Constants.RESTART_RUNNABLES_INSTANCES.equals(requestCommand.getCommand())) {
+      return false;
+    }
+
+    LOG.debug("Processing restart runnable instances message {}.", message);
+
+    if (!Strings.isNullOrEmpty(message.getRunnableName()) && 
message.getScope() == Message.Scope.RUNNABLE) {
+      // ... for a runnable ...
+      String runnableName = message.getRunnableName();
+      LOG.debug("Start restarting all runnable {} instances.", runnableName);
+      restartRunnableInstances(runnableName, null);
+    } else {
+      // ... or maybe some runnables
+      for (Map.Entry<String, String> option : 
requestCommand.getOptions().entrySet()) {
+        String runnableName = option.getKey();
+        Set<Integer> restartedInstanceIds = GSON.fromJson(option.getValue(),
+                                                           new 
TypeToken<Set<Integer>>() {}.getType());
+
+        LOG.debug("Start restarting runnable {} instances {}", runnableName, 
restartedInstanceIds);
+        restartRunnableInstances(runnableName, restartedInstanceIds);
+      }
+    }
+
+    completion.run();
+    return true;
+  }
+
+  /**
+   * Helper method to restart instances of runnables.
+   */
+  private void restartRunnableInstances(String runnableName, @Nullable 
Set<Integer> instanceIds) {
+    LOG.debug("Begin restart runnable {} instances.", runnableName);
+
+    Set<Integer> instancesToRemove = instanceIds;
+    if (instancesToRemove == null) {
+      instancesToRemove = Ranges.closedOpen(0, 
runningContainers.count(runnableName)).asSet(DiscreteDomains.integers());
+    }
+
+    for (int instanceId : instancesToRemove) {
+      LOG.debug("Remove instance {} for runnable {}", instanceId, 
runnableName);
+      try {
+        runningContainers.removeById(runnableName, instanceId);
+      } catch (Exception ex) {
+        // could be thrown if the container already stopped.
+        LOG.info("Exception thrown when stopping instance {} probably already 
stopped.", instanceId);
+      }
+    }
+    LOG.info("Restarting instances {} for runnable {}", instancesToRemove, 
runnableName);
+    runnableContainerRequests.add(createRunnableContainerRequest(runnableName, 
instancesToRemove.size()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
index 2105629..f065380 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
@@ -26,6 +26,7 @@ import org.apache.twill.api.TwillSpecification;
 
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index 4a56229..d957768 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -121,6 +121,9 @@ final class RunningContainers {
     }
   }
 
+  /**
+   * Start a container for a runnable.
+   */
   void start(String runnableName, ContainerInfo containerInfo, 
TwillContainerLauncher launcher) {
     containerLock.lock();
     try {
@@ -186,32 +189,47 @@ final class RunningContainers {
         LOG.warn("No running container found for {}", runnableName);
         return;
       }
+      removeInstanceById(runnableName, maxInstanceId);
+    } finally {
+      containerLock.unlock();
+    }
+  }
 
-      String lastContainerId = null;
-      TwillContainerController lastController = null;
+  /**
+   * Stop and remove a container for a runnable on an id.
+   */
+  void removeById(String runnableName, int instanceId) {
+    containerLock.lock();
+    try {
+      removeInstanceById(runnableName, instanceId);
+    } finally {
+      containerLock.unlock();
+    }
+  }
 
-      // Find the controller with the maxInstanceId
-      for (Map.Entry<String, TwillContainerController> entry : 
containers.row(runnableName).entrySet()) {
-        if (getInstanceId(entry.getValue().getRunId()) == maxInstanceId) {
-          lastContainerId = entry.getKey();
-          lastController = entry.getValue();
-          break;
-        }
+  private void removeInstanceById(String runnableName, int instanceId) {
+    String containerId = null;
+    TwillContainerController controller = null;
+
+    // Find the controller with particular instance id.
+    for (Map.Entry<String, TwillContainerController> entry : 
containers.row(runnableName).entrySet()) {
+      if (getInstanceId(entry.getValue().getRunId()) == instanceId) {
+        containerId = entry.getKey();
+        controller = entry.getValue();
+        break;
       }
+    }
 
-      Preconditions.checkState(lastContainerId != null,
-                               "No container found for {} with instanceId = 
{}", runnableName, maxInstanceId);
+    Preconditions.checkState(containerId != null,
+                             "No container found for {} with instanceId = {}", 
runnableName, instanceId);
 
-      LOG.info("Stopping service: {} {}", runnableName, 
lastController.getRunId());
-      lastController.stopAndWait();
-      containers.remove(runnableName, lastContainerId);
-      removeContainerInfo(lastContainerId);
-      removeInstanceId(runnableName, maxInstanceId);
-      resourceReport.removeRunnableResources(runnableName, lastContainerId);
-      containerChange.signalAll();
-    } finally {
-      containerLock.unlock();
-    }
+    LOG.info("Stopping service: {} {}", runnableName, controller.getRunId());
+    controller.stopAndWait();
+    containers.remove(runnableName, containerId);
+    removeContainerInfo(containerId);
+    removeInstanceId(runnableName, instanceId);
+    resourceReport.removeRunnableResources(runnableName, containerId);
+    containerChange.signalAll();
   }
 
   /**
@@ -446,6 +464,9 @@ final class RunningContainers {
     return instanceId;
   }
 
+  /**
+   * Remove instance id for a given runnable.
+   */
   private void removeInstanceId(String runnableName, int instanceId) {
     BitSet instances = runnableInstances.get(runnableName);
     if (instances == null) {
@@ -469,7 +490,7 @@ final class RunningContainers {
   }
 
   /**
-   * Returns nnumber of running instances for the given runnable.
+   * Returns number of running instances for the given runnable.
    */
   private int getRunningInstances(String runableName) {
     BitSet instances = runnableInstances.get(runableName);

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d4a1508e/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java 
b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
index d635af1..0a8414e 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
@@ -18,9 +18,14 @@
 package org.apache.twill.yarn;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Maps;
 import com.google.common.io.LineReader;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.twill.api.ResourceReport;
 import org.apache.twill.api.ResourceSpecification;
 import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunResources;
 import org.apache.twill.api.TwillRunner;
 import org.apache.twill.api.TwillRunnerService;
 import org.apache.twill.api.logging.PrinterLogHandler;
@@ -37,10 +42,13 @@ import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.net.Socket;
 import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 
 /**
  * Using echo server to test various behavior of YarnTwillService.
@@ -107,6 +115,22 @@ public final class EchoServerTestRun extends BaseYarnTest {
     controller.changeInstances("EchoServer", 2);
     Assert.assertTrue(waitForSize(echoServices, 2, 120));
 
+    // Test restart on instances for runnable
+    Map<Integer, String> instanceIdToContainerId = Maps.newHashMap();
+    ResourceReport report = waitForAfterRestartResourceReport(controller, 
"EchoServer", 30L, TimeUnit.SECONDS, 2, null);
+    Assert.assertTrue(report != null);
+    Collection<TwillRunResources> runResources = 
report.getRunnableResources("EchoServer");
+    for (TwillRunResources twillRunResources : runResources) {
+      instanceIdToContainerId.put(twillRunResources.getInstanceId(), 
twillRunResources.getContainerId());
+    }
+
+    controller.restartAllInstances("EchoServer");
+    Assert.assertTrue(waitForSize(echoServices, 2, 120));
+
+    report = waitForAfterRestartResourceReport(controller, "EchoServer", 30L, 
TimeUnit.SECONDS, 2,
+                                               instanceIdToContainerId);
+    Assert.assertTrue(report != null);
+
     // Make sure still only one app is running
     Iterable<TwillRunner.LiveInfo> apps = runner.lookupLive();
     Assert.assertTrue(waitForSize(apps, 1, 120));
@@ -132,4 +156,50 @@ public final class EchoServerTestRun extends BaseYarnTest {
     // Sleep a bit before exiting.
     TimeUnit.SECONDS.sleep(2);
   }
+
+  /**
+   *  Need helper method here to wait for getting resource report because 
{@link TwillController#getResourceReport()}
+   *  could return null if the application has not fully started.
+   *
+   *  This method helps validate restart scenario.
+   *
+   *  To avoid long sleep if instanceIdToContainerId is passed, then compare 
the container ids to ones before.
+   *  Otherwise just return the valid resource report.
+   */
+  @Nullable
+  private ResourceReport waitForAfterRestartResourceReport(TwillController 
controller, String runnable, long timeout,
+                                                           TimeUnit 
timeoutUnit, int numOfResources,
+                                                           @Nullable 
Map<Integer, String> instanceIdToContainerId) {
+    Stopwatch stopwatch = new Stopwatch();
+    stopwatch.start();
+    do {
+      ResourceReport report = controller.getResourceReport();
+      if (report == null || report.getRunnableResources(runnable) == null) {
+        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      } else if (report.getRunnableResources(runnable) == null ||
+          report.getRunnableResources(runnable).size() != numOfResources) {
+        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      } else {
+        if (instanceIdToContainerId == null) {
+          return report;
+        }
+        Collection<TwillRunResources> runResources = 
report.getRunnableResources(runnable);
+        boolean isSameContainer = false;
+        for (TwillRunResources twillRunResources : runResources) {
+          int instanceId = twillRunResources.getInstanceId();
+          if 
(twillRunResources.getContainerId().equals(instanceIdToContainerId.get(instanceId)))
 {
+            // found same container id lets wait again.
+            isSameContainer = true;
+            break;
+          }
+        }
+        if (!isSameContainer) {
+          LOG.error("Unable to get different container ids for restart.");
+          return report;
+        }
+        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      }
+    } while (stopwatch.elapsedTime(timeoutUnit) < timeout);
+    return null;
+  }
 }

Reply via email to