[ 
https://issues.apache.org/jira/browse/TWILL-116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14621224#comment-14621224
 ] 

ASF GitHub Bot commented on TWILL-116:
--------------------------------------

Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/incubator-twill/pull/52#discussion_r34301114
  
    --- Diff: 
twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
 ---
    @@ -220,6 +232,127 @@ public void process(WatchedEvent event) {
       }
     
       /**
    +   * Send update message to all instances of a runnnable.
    +   *
    +   * @param command The command to be sent in the message to the Twill 
cluster.
    +   * @param runnableName The name of the Twill runnable to restart.
    +   * @return A {@link Future} that could executed by caller to get the 
result.
    +   */
    +  protected <V> ListenableFuture<List<V>> 
updateAllRunnableInstances(String command, String runnableName) {
    +    // Create ZK node for result
    +    String zkResultPath = getUpdateRunnablesResultPath(command);
    +    Command updateStateCommand = Command.Builder.of(command).
    +      addOption(Constants.UPDATE_RUNNABLE_INSTANCES_RESULT_ZK_PATH, 
zkResultPath).
    +      build();
    +    Message message = 
SystemMessages.updateRunnableInstances(updateStateCommand, runnableName);
    +
    +    return sendUpdateRunnableInstancesMessage(message, zkResultPath, new 
TypeToken<List<V>>() {});
    +  }
    +
    +  /**
    +   * Send update message to some instances of runnnables.
    +   *
    +   * @param command  The command to be sent in the message to the Twill 
cluster.
    +   * @param runnableToInstanceIds  Map of runnable name to list of 
instances to be updated.
    +   * @return A {@link Future} that could executed by caller to get the 
result.
    +   */
    +  protected <V> ListenableFuture<Map<String, List<V>>> 
updateRunnablesInstances(String command, Map<String,
    +      List<Integer>> runnableToInstanceIds) {
    +    Map<String, String> runnableToStringInstanceIds = 
Maps.transformEntries(
    +      runnableToInstanceIds, new Maps.EntryTransformer<String, 
List<Integer>, String>() {
    +        @Override
    +        public String transformEntry(String key, List<Integer> value) {
    +          return GSON.toJson(value, new TypeToken<List<String>>() {
    +          }.getType());
    +        }
    +      });
    +
    +    // Create ZK node for result
    +    String zkResultPath = getUpdateRunnablesResultPath(command);
    +    Command updateStateCommand = 
Command.Builder.of(command).addOptions(runnableToStringInstanceIds).
    +      addOption(Constants.UPDATE_RUNNABLE_INSTANCES_RESULT_ZK_PATH, 
zkResultPath).
    +      build();
    +    Message message = 
SystemMessages.updateRunnablesInstances(updateStateCommand);
    +
    +    return sendUpdateRunnableInstancesMessage(message, zkResultPath, new 
TypeToken<Map<String, List<V>>>() {});
    +  }
    +
    +  // Helper method to send message to update runnable instances.
    +  @SuppressWarnings("unchecked")
    +  private <V> ListenableFuture<V> sendUpdateRunnableInstancesMessage(final 
Message message, final String zkResultPath,
    +                                                                     final 
TypeToken<V> type) {
    +
    +    // Create ZK node for result
    +    zkClient.create(zkResultPath, null, CreateMode.PERSISTENT);
    +
    +    LOG.info("Create ZK path for runnable instances update {} at {}", 
message, zkResultPath);
    +
    +    final SettableFuture<V> settableFuture = SettableFuture.create();
    +
    +    ListenableFuture<Command> sendMessageFuture = sendMessage(message, 
message.getCommand());
    +    Futures.addCallback(sendMessageFuture, new FutureCallback<Command>() {
    +      @Override
    +      public void onSuccess(final Command command) {
    +        LOG.debug("Success sending update runnable instance message with 
command: {}", command);
    +
    +        ZKOperations.watchData(zkClient, zkResultPath, new 
ZKOperations.DataCallback() {
    --- End diff --
    
    So the flow goes like:
    app driver > restart instance > get SettableFuture
    app driver > TwillController > create ZK node to store result > send 
message > once send message successful add watcher to listen to updated result 
in the ZK node created before > the callback will read data and set it to 
SettableFuture returned to app driver before.


> Support for restart instances of runnable in an application
> -----------------------------------------------------------
>
>                 Key: TWILL-116
>                 URL: https://issues.apache.org/jira/browse/TWILL-116
>             Project: Apache Twill
>          Issue Type: New Feature
>          Components: core
>            Reporter: Albert Shau
>            Assignee: Henry Saputra
>             Fix For: 0.6.0-incubating
>
>         Attachments: TWILL-116-design-4.pdf, TWILL-116-design-5.pdf, 
> TWILL-116-design-6.pdf, TWILL-116-design-7.pdf, TWILL-116-design-final-2.pdf
>
>
> Once an application is running, it would be good to be able to stop, start, 
> and restart a specific runnable of the application without affecting other 
> runnables.  
> For example, I may be running multiple services in a single application, with 
> each service as a different runnable. One of my services gets into an invalid 
> state. I now want to restart just that runnable and not the other ones that 
> are running properly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to