GitHub user chtyim commented on a diff in the pull request:

    https://github.com/apache/incubator-twill/pull/52#discussion_r34298029
  
    --- Diff: twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java ---
    @@ -220,6 +232,127 @@ public void process(WatchedEvent event) {
       }
     
       /**
    +   * Send update message to all instances of a runnnable.
    +   *
    +   * @param command The command to be sent in the message to the Twill cluster.
    +   * @param runnableName The name of the Twill runnable to restart.
    +   * @return A {@link Future} that could executed by caller to get the result.
    +   */
    +  protected <V> ListenableFuture<List<V>> updateAllRunnableInstances(String command, String runnableName) {
    +    // Create ZK node for result
    +    String zkResultPath = getUpdateRunnablesResultPath(command);
    +    Command updateStateCommand = Command.Builder.of(command).
    +      addOption(Constants.UPDATE_RUNNABLE_INSTANCES_RESULT_ZK_PATH, zkResultPath).
    +      build();
    +    Message message = SystemMessages.updateRunnableInstances(updateStateCommand, runnableName);
    +
    +    return sendUpdateRunnableInstancesMessage(message, zkResultPath, new TypeToken<List<V>>() {});
    +  }
    +
    +  /**
    +   * Send update message to some instances of runnnables.
    +   *
    +   * @param command  The command to be sent in the message to the Twill cluster.
    +   * @param runnableToInstanceIds  Map of runnable name to list of instances to be updated.
    +   * @return A {@link Future} that could executed by caller to get the result.
    +   */
    +  protected <V> ListenableFuture<Map<String, List<V>>> updateRunnablesInstances(String command, Map<String,
    +      List<Integer>> runnableToInstanceIds) {
    +    Map<String, String> runnableToStringInstanceIds = Maps.transformEntries(
    +      runnableToInstanceIds, new Maps.EntryTransformer<String, List<Integer>, String>() {
    +        @Override
    +        public String transformEntry(String key, List<Integer> value) {
    +          return GSON.toJson(value, new TypeToken<List<String>>() {
    +          }.getType());
    +        }
    +      });
    +
    +    // Create ZK node for result
    +    String zkResultPath = getUpdateRunnablesResultPath(command);
    +    Command updateStateCommand = Command.Builder.of(command).addOptions(runnableToStringInstanceIds).
    +      addOption(Constants.UPDATE_RUNNABLE_INSTANCES_RESULT_ZK_PATH, zkResultPath).
    +      build();
    +    Message message = SystemMessages.updateRunnablesInstances(updateStateCommand);
    +
    +    return sendUpdateRunnableInstancesMessage(message, zkResultPath, new TypeToken<Map<String, List<V>>>() {});
    +  }
    +
    +  // Helper method to send message to update runnable instances.
    +  @SuppressWarnings("unchecked")
    +  private <V> ListenableFuture<V> sendUpdateRunnableInstancesMessage(final Message message, final String zkResultPath,
    +                                                                     final TypeToken<V> type) {
    +
    +    // Create ZK node for result
    +    zkClient.create(zkResultPath, null, CreateMode.PERSISTENT);
    +
    +    LOG.info("Create ZK path for runnable instances update {} at {}", message, zkResultPath);
    +
    +    final SettableFuture<V> settableFuture = SettableFuture.create();
    +
    +    ListenableFuture<Command> sendMessageFuture = sendMessage(message, message.getCommand());
    +    Futures.addCallback(sendMessageFuture, new FutureCallback<Command>() {
    +      @Override
    +      public void onSuccess(final Command command) {
    +        LOG.debug("Success sending update runnable instance message with command: {}", command);
    +
    +        ZKOperations.watchData(zkClient, zkResultPath, new ZKOperations.DataCallback() {
    --- End diff --
    
    I don't quite follow the logic here. Can you describe, at a high level, what gets written to which ZK node(s), by whom, and when?

