This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 95ed9b9  Stop all functions gracefully on closing worker-service 
(#2548)
95ed9b9 is described below

commit 95ed9b91af20bcdd396ee34d3ea6cc94a52fea31
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Thu Sep 13 14:50:35 2018 -0700

    Stop all functions gracefully on closing worker-service (#2548)
    
    ### Motivation
    
    Right now, if stopping 
[WorkerService](https://github.com/apache/incubator-pulsar/blob/master/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java#L189)
 doesn't stop functions and all the threads stayed alive event `WorkerService` 
is stopped.
    
    ### Modifications
    
    Stop all function resource gracefully while stopping worker service.
    
    ### Result
    
    Function threads will not stay alive while stopping worker-service.
---
 .../functions/worker/FunctionRuntimeManager.java     | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index ee6eeec..43cd27b 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -389,6 +389,25 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         return Response.status(Status.OK).build();
     }
 
+    /**
+     * It stops all functions instances owned by current worker
+     * @throws Exception
+     */
+    public void stopAllOwnedFunctions() throws Exception {
+        final String workerId = this.workerConfig.getWorkerId();
+        Map<String, Assignment> assignments = 
workerIdToAssignments.get(workerId);
+        if (assignments != null) {
+            assignments.values().forEach(assignment -> {
+                String fullyQualifiedInstanceId = 
Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+                try {
+                    stopFunction(fullyQualifiedInstanceId, false);
+                } catch (Exception e) {
+                    log.warn("Failed to stop function {} - {}", 
fullyQualifiedInstanceId, e.getMessage());
+                }
+            });
+        }
+    }
+
     private void stopFunction(String fullyQualifiedInstanceId, boolean 
restart) throws Exception {
         log.info("[{}] {}..", restart ? "restarting" : "stopping", 
fullyQualifiedInstanceId);
         FunctionRuntimeInfo functionRuntimeInfo = 
this.getFunctionRuntimeInfo(fullyQualifiedInstanceId);
@@ -647,6 +666,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
 
     @Override
     public void close() throws Exception {
+        stopAllOwnedFunctions();
         this.functionActioner.close();
         this.functionAssignmentTailer.close();
         if (runtimeFactory != null) {

Reply via email to