[
https://issues.apache.org/jira/browse/TWILL-116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14621179#comment-14621179
]
ASF GitHub Bot commented on TWILL-116:
--------------------------------------
Github user chtyim commented on a diff in the pull request:
https://github.com/apache/incubator-twill/pull/52#discussion_r34298029
--- Diff:
twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java
---
@@ -220,6 +232,127 @@ public void process(WatchedEvent event) {
}
/**
+ * Send update message to all instances of a runnnable.
+ *
+ * @param command The command to be sent in the message to the Twill
cluster.
+ * @param runnableName The name of the Twill runnable to restart.
+ * @return A {@link Future} that could executed by caller to get the
result.
+ */
+ protected <V> ListenableFuture<List<V>>
updateAllRunnableInstances(String command, String runnableName) {
+ // Create ZK node for result
+ String zkResultPath = getUpdateRunnablesResultPath(command);
+ Command updateStateCommand = Command.Builder.of(command).
+ addOption(Constants.UPDATE_RUNNABLE_INSTANCES_RESULT_ZK_PATH,
zkResultPath).
+ build();
+ Message message =
SystemMessages.updateRunnableInstances(updateStateCommand, runnableName);
+
+ return sendUpdateRunnableInstancesMessage(message, zkResultPath, new
TypeToken<List<V>>() {});
+ }
+
+ /**
+ * Send update message to some instances of runnnables.
+ *
+ * @param command The command to be sent in the message to the Twill
cluster.
+ * @param runnableToInstanceIds Map of runnable name to list of
instances to be updated.
+ * @return A {@link Future} that could executed by caller to get the
result.
+ */
+ protected <V> ListenableFuture<Map<String, List<V>>>
updateRunnablesInstances(String command, Map<String,
+ List<Integer>> runnableToInstanceIds) {
+ Map<String, String> runnableToStringInstanceIds =
Maps.transformEntries(
+ runnableToInstanceIds, new Maps.EntryTransformer<String,
List<Integer>, String>() {
+ @Override
+ public String transformEntry(String key, List<Integer> value) {
+ return GSON.toJson(value, new TypeToken<List<String>>() {
+ }.getType());
+ }
+ });
+
+ // Create ZK node for result
+ String zkResultPath = getUpdateRunnablesResultPath(command);
+ Command updateStateCommand =
Command.Builder.of(command).addOptions(runnableToStringInstanceIds).
+ addOption(Constants.UPDATE_RUNNABLE_INSTANCES_RESULT_ZK_PATH,
zkResultPath).
+ build();
+ Message message =
SystemMessages.updateRunnablesInstances(updateStateCommand);
+
+ return sendUpdateRunnableInstancesMessage(message, zkResultPath, new
TypeToken<Map<String, List<V>>>() {});
+ }
+
+ // Helper method to send message to update runnable instances.
+ @SuppressWarnings("unchecked")
+ private <V> ListenableFuture<V> sendUpdateRunnableInstancesMessage(final
Message message, final String zkResultPath,
+ final
TypeToken<V> type) {
+
+ // Create ZK node for result
+ zkClient.create(zkResultPath, null, CreateMode.PERSISTENT);
+
+ LOG.info("Create ZK path for runnable instances update {} at {}",
message, zkResultPath);
+
+ final SettableFuture<V> settableFuture = SettableFuture.create();
+
+ ListenableFuture<Command> sendMessageFuture = sendMessage(message,
message.getCommand());
+ Futures.addCallback(sendMessageFuture, new FutureCallback<Command>() {
+ @Override
+ public void onSuccess(final Command command) {
+ LOG.debug("Success sending update runnable instance message with
command: {}", command);
+
+ ZKOperations.watchData(zkClient, zkResultPath, new
ZKOperations.DataCallback() {
--- End diff --
I don't quit follow the logic here. Can you describe on high level what get
written to which ZK node(s) by whom at when?
> Support for restart instances of runnable in an application
> -----------------------------------------------------------
>
> Key: TWILL-116
> URL: https://issues.apache.org/jira/browse/TWILL-116
> Project: Apache Twill
> Issue Type: New Feature
> Components: core
> Reporter: Albert Shau
> Assignee: Henry Saputra
> Fix For: 0.6.0-incubating
>
> Attachments: TWILL-116-design-4.pdf, TWILL-116-design-5.pdf,
> TWILL-116-design-6.pdf, TWILL-116-design-7.pdf, TWILL-116-design-final-2.pdf
>
>
> Once an application is running, it would be good to be able to stop, start,
> and restart a specific runnable of the application without affecting other
> runnables.
> For example, I may be running multiple services in a single application, with
> each service as a different runnable. One of my services gets into an invalid
> state. I now want to restart just that runnable and not the other ones that
> are running properly.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)