This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 95ed9b9 Stop all functions gracefully on closing worker-service (#2548) 95ed9b9 is described below commit 95ed9b91af20bcdd396ee34d3ea6cc94a52fea31 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Thu Sep 13 14:50:35 2018 -0700 Stop all functions gracefully on closing worker-service (#2548) ### Motivation Right now, stopping [WorkerService](https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java#L189) doesn't stop functions, and all the threads stay alive even after `WorkerService` is stopped. ### Modifications Stop all function resources gracefully while stopping worker service. ### Result Function threads will not stay alive while stopping worker-service. --- .../functions/worker/FunctionRuntimeManager.java | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index ee6eeec..43cd27b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -389,6 +389,25 @@ public class FunctionRuntimeManager implements AutoCloseable{ return Response.status(Status.OK).build(); } + /** + * It stops all functions instances owned by current worker + * @throws Exception + */ + public void stopAllOwnedFunctions() throws Exception { + final String workerId = this.workerConfig.getWorkerId(); + Map<String, Assignment> assignments = workerIdToAssignments.get(workerId); + if (assignments != null) { + assignments.values().forEach(assignment -> { + String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + 
try { + stopFunction(fullyQualifiedInstanceId, false); + } catch (Exception e) { + log.warn("Failed to stop function {} - {}", fullyQualifiedInstanceId, e.getMessage()); + } + }); + } + } + private void stopFunction(String fullyQualifiedInstanceId, boolean restart) throws Exception { log.info("[{}] {}..", restart ? "restarting" : "stopping", fullyQualifiedInstanceId); FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(fullyQualifiedInstanceId); @@ -647,6 +666,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ @Override public void close() throws Exception { + stopAllOwnedFunctions(); this.functionActioner.close(); this.functionAssignmentTailer.close(); if (runtimeFactory != null) {