Github user chtyim commented on a diff in the pull request:
https://github.com/apache/incubator-twill/pull/52#discussion_r34298029
--- Diff:
twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
---
@@ -220,6 +232,127 @@ public void process(WatchedEvent event) {
}
/**
+ * Send update message to all instances of a runnnable.
+ *
+ * @param command The command to be sent in the message to the Twill
cluster.
+ * @param runnableName The name of the Twill runnable to restart.
+ * @return A {@link Future} that could executed by caller to get the
result.
+ */
+ protected <V> ListenableFuture<List<V>>
updateAllRunnableInstances(String command, String runnableName) {
+ // Create ZK node for result
+ String zkResultPath = getUpdateRunnablesResultPath(command);
+ Command updateStateCommand = Command.Builder.of(command).
+ addOption(Constants.UPDATE_RUNNABLE_INSTANCES_RESULT_ZK_PATH,
zkResultPath).
+ build();
+ Message message =
SystemMessages.updateRunnableInstances(updateStateCommand, runnableName);
+
+ return sendUpdateRunnableInstancesMessage(message, zkResultPath, new
TypeToken<List<V>>() {});
+ }
+
+ /**
+ * Send update message to some instances of runnnables.
+ *
+ * @param command The command to be sent in the message to the Twill
cluster.
+ * @param runnableToInstanceIds Map of runnable name to list of
instances to be updated.
+ * @return A {@link Future} that could executed by caller to get the
result.
+ */
+ protected <V> ListenableFuture<Map<String, List<V>>>
updateRunnablesInstances(String command, Map<String,
+ List<Integer>> runnableToInstanceIds) {
+ Map<String, String> runnableToStringInstanceIds =
Maps.transformEntries(
+ runnableToInstanceIds, new Maps.EntryTransformer<String,
List<Integer>, String>() {
+ @Override
+ public String transformEntry(String key, List<Integer> value) {
+ return GSON.toJson(value, new TypeToken<List<String>>() {
+ }.getType());
+ }
+ });
+
+ // Create ZK node for result
+ String zkResultPath = getUpdateRunnablesResultPath(command);
+ Command updateStateCommand =
Command.Builder.of(command).addOptions(runnableToStringInstanceIds).
+ addOption(Constants.UPDATE_RUNNABLE_INSTANCES_RESULT_ZK_PATH,
zkResultPath).
+ build();
+ Message message =
SystemMessages.updateRunnablesInstances(updateStateCommand);
+
+ return sendUpdateRunnableInstancesMessage(message, zkResultPath, new
TypeToken<Map<String, List<V>>>() {});
+ }
+
+ // Helper method to send message to update runnable instances.
+ @SuppressWarnings("unchecked")
+ private <V> ListenableFuture<V> sendUpdateRunnableInstancesMessage(final
Message message, final String zkResultPath,
+ final
TypeToken<V> type) {
+
+ // Create ZK node for result
+ zkClient.create(zkResultPath, null, CreateMode.PERSISTENT);
+
+ LOG.info("Create ZK path for runnable instances update {} at {}",
message, zkResultPath);
+
+ final SettableFuture<V> settableFuture = SettableFuture.create();
+
+ ListenableFuture<Command> sendMessageFuture = sendMessage(message,
message.getCommand());
+ Futures.addCallback(sendMessageFuture, new FutureCallback<Command>() {
+ @Override
+ public void onSuccess(final Command command) {
+ LOG.debug("Success sending update runnable instance message with
command: {}", command);
+
+ ZKOperations.watchData(zkClient, zkResultPath, new
ZKOperations.DataCallback() {
--- End diff --
I don't quite follow the logic here. Can you describe, at a high level, what
gets written to which ZK node(s), by whom, and when?
---