This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 878362c  Simplified the workflow of functionruntime manager (#3551)
878362c is described below

commit 878362c2f08055dedf11477fd434ad83b98a4faf
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Tue Feb 26 10:57:55 2019 -0800

    Simplified the workflow of functionruntime manager (#3551)
    
    * Simplified the workflow of functionruntime manager
    
    * Fix unittest
    
    * Took feedback into account
    
    * added missing imports
---
 .../pulsar/functions/worker/FunctionAction.java    |  40 -----
 .../pulsar/functions/worker/FunctionActioner.java  | 147 ++++++-----------
 .../functions/worker/FunctionRuntimeManager.java   | 125 +++++----------
 .../functions/worker/FunctionActionerTest.java     |  27 ++--
 .../worker/FunctionRuntimeManagerTest.java         | 173 ++++++++++++---------
 5 files changed, 199 insertions(+), 313 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java
deleted file mode 100644
index ded8268..0000000
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker;
-
-import lombok.*;
-import lombok.experimental.Accessors;
-
-@Data
-@Setter
-@Getter
-@EqualsAndHashCode
-@ToString
-@Accessors(chain = true)
-public class FunctionAction {
-
-    public enum Action {
-        START,
-        STOP,
-        TERMINATE
-    }
-
-    private Action action;
-    private FunctionRuntimeInfo functionRuntimeInfo;
-}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 8033507..1d1014e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.MoreFiles;
+import com.google.common.io.MoreFiles; 
 import com.google.common.io.RecursiveDeleteOption;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
@@ -79,120 +79,74 @@ import static 
org.apache.pulsar.functions.utils.Utils.getSourceType;
 @EqualsAndHashCode
 @ToString
 @Slf4j
-public class FunctionActioner implements AutoCloseable {
+public class FunctionActioner {
 
     private final WorkerConfig workerConfig;
     private final RuntimeFactory runtimeFactory;
     private final Namespace dlogNamespace;
-    private LinkedBlockingQueue<FunctionAction> actionQueue;
-    private volatile boolean running;
-    private Thread actioner;
     private final ConnectorsManager connectorsManager;
     private final PulsarAdmin pulsarAdmin;
 
     public FunctionActioner(WorkerConfig workerConfig,
                             RuntimeFactory runtimeFactory,
                             Namespace dlogNamespace,
-                            LinkedBlockingQueue<FunctionAction> actionQueue,
                             ConnectorsManager connectorsManager, PulsarAdmin 
pulsarAdmin) {
         this.workerConfig = workerConfig;
         this.runtimeFactory = runtimeFactory;
         this.dlogNamespace = dlogNamespace;
-        this.actionQueue = actionQueue;
         this.connectorsManager = connectorsManager;
         this.pulsarAdmin = pulsarAdmin;
-        actioner = new Thread(() -> {
-            log.info("Starting Actioner Thread...");
-            while(running) {
-                try {
-                    FunctionAction action = actionQueue.poll(1, 
TimeUnit.SECONDS);
-                    processAction(action);
-                } catch (InterruptedException ex) {
-                }
-            }
-        });
-        actioner.setName("FunctionActionerThread");
     }
 
+    public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) {
+        try {
+            FunctionMetaData functionMetaData = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
+            FunctionDetails functionDetails = 
functionMetaData.getFunctionDetails();
+            int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
 
-    void processAction(FunctionAction action) {
-        if (action == null) return;
-
-        switch (action.getAction()) {
-            case START:
-                try {
-                    startFunction(action.getFunctionRuntimeInfo());
-                } catch (Exception ex) {
-                    FunctionDetails details = 
action.getFunctionRuntimeInfo().getFunctionInstance()
-                            .getFunctionMetaData().getFunctionDetails();
-                    log.info("{}/{}/{} Error starting function", 
details.getTenant(), details.getNamespace(),
-                            details.getName(), ex);
-                    action.getFunctionRuntimeInfo().setStartupException(ex);
-                }
-                break;
-            case STOP:
-                stopFunction(action.getFunctionRuntimeInfo());
-                break;
-            case TERMINATE:
-                terminateFunction(action.getFunctionRuntimeInfo());
-                break;
-        }
-    }
-
-    public void start() {
-        this.running = true;
-        actioner.start();
-    }
-
-    @Override
-    public void close() {
-        running = false;
-    }
-
-    public void join() throws InterruptedException {
-        actioner.join();
-    }
-
-    @VisibleForTesting
-    public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws 
Exception {
-        FunctionMetaData functionMetaData = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
-        FunctionDetails functionDetails = 
functionMetaData.getFunctionDetails();
-        int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
-
-        log.info("{}/{}/{}-{} Starting function ...", 
functionDetails.getTenant(), functionDetails.getNamespace(),
-                functionDetails.getName(), instanceId);
+            log.info("{}/{}/{}-{} Starting function ...", 
functionDetails.getTenant(), functionDetails.getNamespace(),
+                    functionDetails.getName(), instanceId);
 
-        String packageFile;
+            String packageFile;
 
-        String pkgLocation = 
functionMetaData.getPackageLocation().getPackagePath();
-        boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation);
+            String pkgLocation = 
functionMetaData.getPackageLocation().getPackagePath();
+            boolean isPkgUrlProvided = 
isFunctionPackageUrlSupported(pkgLocation);
 
-        if (runtimeFactory.externallyManaged()) {
-            packageFile = pkgLocation;
-        } else {
-            if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
-                URL url = new URL(pkgLocation);
-                File pkgFile = new File(url.toURI());
-                packageFile = pkgFile.getAbsolutePath();
-            } else if (isFunctionCodeBuiltin(functionDetails)) {
-                File pkgFile = 
getBuiltinArchive(FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()));
-                packageFile = pkgFile.getAbsolutePath();
+            if (runtimeFactory.externallyManaged()) {
+                packageFile = pkgLocation;
             } else {
-                File pkgDir = new File(workerConfig.getDownloadDirectory(),
-                        getDownloadPackagePath(functionMetaData, instanceId));
-                pkgDir.mkdirs();
-                File pkgFile = new File(
-                        pkgDir,
-                        new 
File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails(),
 functionMetaData.getPackageLocation())).getName());
-                downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, 
instanceId);
-                packageFile = pkgFile.getAbsolutePath();
+                if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
+                    URL url = new URL(pkgLocation);
+                    File pkgFile = new File(url.toURI());
+                    packageFile = pkgFile.getAbsolutePath();
+                } else if (isFunctionCodeBuiltin(functionDetails)) {
+                    File pkgFile = 
getBuiltinArchive(FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()));
+                    packageFile = pkgFile.getAbsolutePath();
+                } else {
+                    File pkgDir = new File(workerConfig.getDownloadDirectory(),
+                            getDownloadPackagePath(functionMetaData, 
instanceId));
+                    pkgDir.mkdirs();
+                    File pkgFile = new File(
+                            pkgDir,
+                            new 
File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails(),
 functionMetaData.getPackageLocation())).getName());
+                    downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, 
instanceId);
+                    packageFile = pkgFile.getAbsolutePath();
+                }
             }
-        }
 
-        RuntimeSpawner runtimeSpawner = 
getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(), packageFile);
-        functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
-
-        runtimeSpawner.start();
+            RuntimeSpawner runtimeSpawner = 
getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(), packageFile);
+            functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
+
+            runtimeSpawner.start();
+            return;
+        } catch (Exception ex) {
+            FunctionDetails details = functionRuntimeInfo.getFunctionInstance()
+                    .getFunctionMetaData().getFunctionDetails();
+            log.info("{}/{}/{} Error starting function", details.getTenant(), 
details.getNamespace(),
+                    details.getName(), ex);
+            functionRuntimeInfo.setStartupException(ex);
+            return;
+        }
     }
 
     RuntimeSpawner getRuntimeSpawner(Function.Instance instance, String 
packageFile) {
@@ -306,12 +260,13 @@ public class FunctionActioner implements AutoCloseable {
         }
     }
 
-    private void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
-        FunctionDetails details = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
+    public void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
+        FunctionDetails details = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                .getFunctionDetails();
+        log.info("{}/{}/{}-{} Terminating function...", details.getTenant(), 
details.getNamespace(), details.getName(),
+                functionRuntimeInfo.getFunctionInstance().getInstanceId());
         String fqfn = FunctionDetailsUtils.getFullyQualifiedName(details);
 
-        log.info("{}-{} Terminating function...", 
fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId());
-
         stopFunction(functionRuntimeInfo);
         //cleanup subscriptions
         if (details.getSource().getCleanupSubscription()) {
@@ -483,4 +438,4 @@ public class FunctionActioner implements AutoCloseable {
         }
     }
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index ba49678..3d55065 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -57,7 +57,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 
 /**
@@ -73,8 +72,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
     // All the runtime info related to functions executed by this worker
     // Fully Qualified InstanceId - > FunctionRuntimeInfo
-    // NOTE: please use setFunctionRuntimeInfo and deleteFunctionRuntimeInfo 
methods to modify this data structure
-    // Since during initialization phase nothing should be modified
     @VisibleForTesting
     Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new 
ConcurrentHashMap<>();
 
@@ -82,12 +79,10 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     @Getter
     final WorkerConfig workerConfig;
 
-    @VisibleForTesting
-    LinkedBlockingQueue<FunctionAction> actionQueue;
-
     private FunctionAssignmentTailer functionAssignmentTailer;
 
     @Setter
+    @Getter
     private FunctionActioner functionActioner;
 
     @Getter
@@ -174,10 +169,8 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
             throw new RuntimeException("Either Thread, Process or Kubernetes 
Container Factory need to be set");
         }
 
-        this.actionQueue = new LinkedBlockingQueue<>();
-
         this.functionActioner = new FunctionActioner(this.workerConfig, 
runtimeFactory,
-                dlogNamespace, actionQueue, connectorsManager, 
workerService.getBrokerAdmin());
+                dlogNamespace, connectorsManager, 
workerService.getBrokerAdmin());
 
         this.membershipManager = membershipManager;
         this.functionMetaDataManager = functionMetaDataManager;
@@ -226,8 +219,6 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     public void start() {
         log.info("/** Starting Function Runtime Manager **/");
         log.info("Initialize metrics sink...");
-        log.info("Starting function actioner...");
-        this.functionActioner.start();
         log.info("Starting function assignment tailer...");
         this.functionAssignmentTailer.start();
     }
@@ -447,10 +438,10 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         log.info("[{}] {}..", restart ? "restarting" : "stopping", 
fullyQualifiedInstanceId);
         FunctionRuntimeInfo functionRuntimeInfo = 
this.getFunctionRuntimeInfo(fullyQualifiedInstanceId);
         if (functionRuntimeInfo != null) {
-            this.functionActioner.stopFunction(functionRuntimeInfo);
+            this.conditionallyStopFunction(functionRuntimeInfo);
             try {
                 if(restart) {
-                    this.functionActioner.startFunction(functionRuntimeInfo);
+                    this.conditionallyStartFunction(functionRuntimeInfo);
                 }
             } catch (Exception ex) {
                 log.info("{} Error re-starting function", 
fullyQualifiedInstanceId, ex);
@@ -630,7 +621,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                 if 
(!assignment.getInstance().equals(existingAssignment.getInstance())) {
                     //stop function
                     if (functionRuntimeInfo != null) {
-                        this.insertStopAction(functionRuntimeInfo);
+                        this.conditionallyStopFunction(functionRuntimeInfo);
                     }
                     // still assigned to me, need to restart
                     if 
(assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
@@ -639,11 +630,11 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                             FunctionRuntimeInfo newFunctionRuntimeInfo = new 
FunctionRuntimeInfo();
                             
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
 
-                            this.insertStartAction(newFunctionRuntimeInfo);
-                            
this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
+                            
this.conditionallyStartFunction(newFunctionRuntimeInfo);
+                            
this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, 
newFunctionRuntimeInfo);
                         }
                     } else {
-                        deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
+                        
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
                     }
                 } else {
                     // if assignment got transferred to me just set function 
runtime
@@ -655,15 +646,15 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                                 
assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath());
                         
newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
 
-                        this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, 
newFunctionRuntimeInfo);
+                        
this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, 
newFunctionRuntimeInfo);
                     } else {
-                        deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
+                        
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
                     }
                 }
             } else {
                 //stop function
                 if (functionRuntimeInfo != null) {
-                    this.insertStopAction(functionRuntimeInfo);
+                    this.conditionallyStopFunction(functionRuntimeInfo);
                 }
                 // still assigned to me, need to restart
                 if 
(assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
@@ -671,11 +662,11 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                         //start again
                         FunctionRuntimeInfo newFunctionRuntimeInfo = new 
FunctionRuntimeInfo();
                         
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
-                        this.insertStartAction(newFunctionRuntimeInfo);
-                        this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, 
newFunctionRuntimeInfo);
+                        
this.conditionallyStartFunction(newFunctionRuntimeInfo);
+                        
this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, 
newFunctionRuntimeInfo);
                     }
                 } else {
-                    deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
+                    
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
                 }
             }
 
@@ -699,12 +690,12 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
             // TODO could be a race condition here if functionMetaDataTailer 
somehow does not receive the functionMeta prior to the 
functionAssignmentsTailer gets the assignment for the function.
             if 
(this.functionMetaDataManager.containsFunction(functionDetails.getTenant(), 
functionDetails.getNamespace(), functionDetails.getName())) {
                 // function still exists thus probably an update or stop 
operation
-                this.insertStopAction(functionRuntimeInfo);
+                this.conditionallyStopFunction(functionRuntimeInfo);
             } else {
                 // function doesn't exist anymore thus we should terminate
-                this.insertTerminateAction(functionRuntimeInfo);
+                this.conditionallyTerminateFunction(functionRuntimeInfo);
             }
-            this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
+            this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
         }
         
         String workerId = null;
@@ -751,16 +742,14 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         if (functionRuntimeInfo == null) {
             functionRuntimeInfo = new FunctionRuntimeInfo()
                     .setFunctionInstance(assignment.getInstance());
-            this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, 
functionRuntimeInfo);
-
+            this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, 
functionRuntimeInfo);
         } else {
             //Somehow this function is already started
             log.warn("Function {} already running. Going to restart function.",
                     functionRuntimeInfo);
-            this.insertStopAction(functionRuntimeInfo);
+            this.conditionallyStopFunction(functionRuntimeInfo);
         }
-
-        this.insertStartAction(functionRuntimeInfo);
+        this.conditionallyStartFunction(functionRuntimeInfo);
     }
 
     public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
@@ -770,48 +759,6 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
     /**
      * Private methods for internal use.  Should not be used outside of this 
class
      */
-
-    @VisibleForTesting
-    void insertStopAction(FunctionRuntimeInfo functionRuntimeInfo) {
-        if (!this.isInitializePhase) {
-            FunctionAction functionAction = new FunctionAction();
-            functionAction.setAction(FunctionAction.Action.STOP);
-            functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
-            try {
-                actionQueue.put(functionAction);
-            } catch (InterruptedException ex) {
-                throw new RuntimeException("Interrupted while putting action");
-            }
-        }
-    }
-
-    @VisibleForTesting
-    void insertStartAction(FunctionRuntimeInfo functionRuntimeInfo) {
-        if (!this.isInitializePhase) {
-            FunctionAction functionAction = new FunctionAction();
-            functionAction.setAction(FunctionAction.Action.START);
-            functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
-            try {
-                actionQueue.put(functionAction);
-            } catch (InterruptedException ex) {
-                throw new RuntimeException("Interrupted while putting action");
-            }
-        }
-    }
-
-    void insertTerminateAction(FunctionRuntimeInfo functionRuntimeInfo) {
-        if (!this.isInitializePhase) {
-            FunctionAction functionAction = new FunctionAction();
-            functionAction.setAction(FunctionAction.Action.TERMINATE);
-            functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
-            try {
-                actionQueue.put(functionAction);
-            } catch (InterruptedException ex) {
-                throw new RuntimeException("Interrupted while putting action");
-            }
-        }
-    }
-
     private Assignment findAssignment(String tenant, String namespace, String 
functionName, int instanceId) {
         String fullyQualifiedInstanceId
                 = 
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(tenant, 
namespace, functionName, instanceId);
@@ -844,22 +791,8 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
                 assignment);
     }
 
-    private void deleteFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
-        if (!this.isInitializePhase) {
-            this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
-        }
-    }
-
-    private void setFunctionRuntimeInfo(String fullyQualifiedInstanceId, 
FunctionRuntimeInfo functionRuntimeInfo) {
-        // Don't modify Function Runtime Infos when initializing
-        if (!this.isInitializePhase) {
-            this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, 
functionRuntimeInfo);
-        }
-    }
-
     @Override
     public void close() throws Exception {
-        this.functionActioner.close();
         this.functionAssignmentTailer.close();
 
         stopAllOwnedFunctions();
@@ -908,4 +841,22 @@ public class FunctionRuntimeManager implements 
AutoCloseable{
         }
         return toStart;
     }
+
+    private void conditionallyStartFunction(FunctionRuntimeInfo 
functionRuntimeInfo) {
+        if (!this.isInitializePhase) {
+            this.functionActioner.startFunction(functionRuntimeInfo);
+        }
+    }
+
+    private void conditionallyStopFunction(FunctionRuntimeInfo 
functionRuntimeInfo) {
+        if (!this.isInitializePhase) {
+            this.functionActioner.stopFunction(functionRuntimeInfo);
+        }
+    }
+
+    private void conditionallyTerminateFunction(FunctionRuntimeInfo 
functionRuntimeInfo) {
+        if (!this.isInitializePhase) {
+            this.functionActioner.terminateFunction(functionRuntimeInfo);
+        }
+    }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index 65ead73..cd636d4 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -25,11 +25,6 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
-
-import java.net.UnknownHostException;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -40,6 +35,8 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.testng.annotations.Test;
 import static org.apache.pulsar.common.functions.Utils.FILE;
+import static org.testng.Assert.*;
+import static org.testng.AssertJUnit.assertFalse;
 
 /**
  * Unit test of {@link FunctionActioner}.
@@ -66,10 +63,9 @@ public class FunctionActionerTest {
         // throw exception when dlogNamespace is accessed by actioner and 
verify it
         final String exceptionMsg = "dl namespace not-found";
         doThrow(new 
IllegalArgumentException(exceptionMsg)).when(dlogNamespace).openLog(any());
-        LinkedBlockingQueue<FunctionAction> queue = new 
LinkedBlockingQueue<>();
 
         @SuppressWarnings("resource")
-        FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace, queue,
+        FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace,
                 new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
         Runtime runtime = mock(Runtime.class);
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder()
@@ -80,13 +76,14 @@ public class FunctionActionerTest {
                 .build();
         FunctionRuntimeInfo functionRuntimeInfo = 
mock(FunctionRuntimeInfo.class);
         doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
+        doThrow(new 
IllegalStateException("StartupException")).when(functionRuntimeInfo).setStartupException(any());
 
         // actioner should try to download file from bk-dlogNamespace and 
fails with exception
         try {
             actioner.startFunction(functionRuntimeInfo);
-            fail("should have failed with dlogNamespace open");
-        } catch (IllegalArgumentException ie) {
-            assertEquals(ie.getMessage(), exceptionMsg);
+            assertFalse(true);
+        } catch (IllegalStateException ex) {
+            assertEquals(ex.getMessage(), "StartupException");
         }
     }
 
@@ -109,10 +106,9 @@ public class FunctionActionerTest {
         Namespace dlogNamespace = mock(Namespace.class);
         final String exceptionMsg = "dl namespace not-found";
         doThrow(new 
IllegalArgumentException(exceptionMsg)).when(dlogNamespace).openLog(any());
-        LinkedBlockingQueue<FunctionAction> queue = new 
LinkedBlockingQueue<>();
 
         @SuppressWarnings("resource")
-        FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace, queue,
+        FunctionActioner actioner = new FunctionActioner(workerConfig, 
factory, dlogNamespace,
                 new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
 
         // (1) test with file url. functionActioner should be able to consider 
file-url and it should be able to call
@@ -141,12 +137,13 @@ public class FunctionActionerTest {
         instance = 
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0).build();
         functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
         doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
+        doThrow(new 
IllegalStateException("StartupException")).when(functionRuntimeInfo).setStartupException(any());
 
         try {
             actioner.startFunction(functionRuntimeInfo);
-            fail("Function-Actioner should have tried to donwload file from 
http-location");
-        } catch (UnknownHostException ue) {
-            // ok
+            assertFalse(true);
+        } catch (IllegalStateException ex) {
+            assertEquals(ex.getMessage(), "StartupException");
         }
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 4a3b1a8..a66ba90 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -48,13 +48,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 @Slf4j
 public class FunctionRuntimeManagerTest {
@@ -88,6 +82,11 @@ public class FunctionRuntimeManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionMetaDataManager.class)));
+        FunctionActioner functionActioner = 
spy(functionRuntimeManager.getFunctionActioner());
+        
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
+        
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
+        
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
+        functionRuntimeManager.setFunctionActioner(functionActioner);
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
                 Function.FunctionDetails.newBuilder()
@@ -122,8 +121,8 @@ public class FunctionRuntimeManagerTest {
                 .get("worker-1").get("test-tenant/test-namespace/func-1:0"), 
assignment1);
         
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.get("worker-2")
                 .get("test-tenant/test-namespace/func-2:0"), assignment2);
-        verify(functionRuntimeManager, 
times(1)).insertStartAction(any(FunctionRuntimeInfo.class));
-        verify(functionRuntimeManager).insertStartAction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
+        verify(functionActioner, 
times(1)).startFunction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner).startFunction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
             @Override
             public boolean matches(Object o) {
                 if (o instanceof FunctionRuntimeInfo) {
@@ -137,15 +136,7 @@ public class FunctionRuntimeManagerTest {
                 return false;
             }
         }));
-        verify(functionRuntimeManager, 
times(0)).insertStopAction(any(FunctionRuntimeInfo.class));
-
-        Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1);
-        Assert.assertTrue(functionRuntimeManager.actionQueue.contains(
-                new FunctionAction()
-                        .setAction(FunctionAction.Action.START)
-                        .setFunctionRuntimeInfo(new 
FunctionRuntimeInfo().setFunctionInstance(
-                                
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
-                                        .build()))));
+        verify(functionActioner, 
times(0)).stopFunction(any(FunctionRuntimeInfo.class));
 
         
Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 1);
         
Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0"),
@@ -182,6 +173,11 @@ public class FunctionRuntimeManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionMetaDataManager.class)));
+        FunctionActioner functionActioner = 
spy(functionRuntimeManager.getFunctionActioner());
+        
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
+        
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
+        
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
+        functionRuntimeManager.setFunctionActioner(functionActioner);
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
                 Function.FunctionDetails.newBuilder()
@@ -224,9 +220,9 @@ public class FunctionRuntimeManagerTest {
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-2").get("test-tenant/test-namespace/func-2:0"), 
assignment2);
 
-        verify(functionRuntimeManager, 
times(0)).insertStartAction(any(FunctionRuntimeInfo.class));
-        verify(functionRuntimeManager, 
times(1)).insertTerminateAction(any(FunctionRuntimeInfo.class));
-        verify(functionRuntimeManager).insertTerminateAction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
+        verify(functionActioner, 
times(0)).startFunction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(1)).terminateFunction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner).terminateFunction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
             @Override
             public boolean matches(Object o) {
                 if (o instanceof FunctionRuntimeInfo) {
@@ -241,14 +237,6 @@ public class FunctionRuntimeManagerTest {
             }
         }));
 
-        Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1);
-        Assert.assertTrue(functionRuntimeManager.actionQueue.contains(
-                new FunctionAction()
-                        .setAction(FunctionAction.Action.TERMINATE)
-                        .setFunctionRuntimeInfo(new 
FunctionRuntimeInfo().setFunctionInstance(
-                                
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
-                                        .build()))));
-
         
Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 0);
     }
 
@@ -279,6 +267,11 @@ public class FunctionRuntimeManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionMetaDataManager.class)));
+        FunctionActioner functionActioner = 
spy(functionRuntimeManager.getFunctionActioner());
+        
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
+        
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
+        
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
+        functionRuntimeManager.setFunctionActioner(functionActioner);
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
                 Function.FunctionDetails.newBuilder()
@@ -303,6 +296,7 @@ public class FunctionRuntimeManagerTest {
         functionRuntimeManager.setAssignment(assignment1);
         functionRuntimeManager.setAssignment(assignment2);
         reset(functionRuntimeManager);
+        reset(functionActioner);
 
         Function.Assignment assignment3 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -322,11 +316,11 @@ public class FunctionRuntimeManagerTest {
         functionRuntimeManager.processAssignment(assignment1);
         functionRuntimeManager.processAssignment(assignment3);
 
-        verify(functionRuntimeManager, 
times(1)).insertStopAction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(1)).stopFunction(any(FunctionRuntimeInfo.class));
         // make sure terminate is not called since this is a update operation
-        verify(functionRuntimeManager, 
times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
 
-        verify(functionRuntimeManager).insertStopAction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
+        verify(functionActioner).stopFunction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
             @Override
             public boolean matches(Object o) {
                 if (o instanceof FunctionRuntimeInfo) {
@@ -341,8 +335,8 @@ public class FunctionRuntimeManagerTest {
             }
         }));
 
-        verify(functionRuntimeManager, 
times(1)).insertStartAction(any(FunctionRuntimeInfo.class));
-        verify(functionRuntimeManager).insertStartAction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
+        verify(functionActioner, 
times(1)).startFunction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner).startFunction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
             @Override
             public boolean matches(Object o) {
                 if (o instanceof FunctionRuntimeInfo) {
@@ -357,14 +351,6 @@ public class FunctionRuntimeManagerTest {
             }
         }));
 
-        Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 2);
-        Assert.assertTrue(functionRuntimeManager.actionQueue.contains(
-                new FunctionAction()
-                        .setAction(FunctionAction.Action.START)
-                        .setFunctionRuntimeInfo(new 
FunctionRuntimeInfo().setFunctionInstance(
-                                
Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0)
-                                        .build()))));
-
         
Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 2);
         
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
@@ -373,7 +359,7 @@ public class FunctionRuntimeManagerTest {
                 .get("worker-1").get("test-tenant/test-namespace/func-2:0"), 
assignment3);
 
         reset(functionRuntimeManager);
-        functionRuntimeManager.actionQueue.clear();
+        reset(functionActioner);
 
         // add a stop
         Function.FunctionMetaData.Builder function2StoppedBldr = 
function2.toBuilder();
@@ -388,11 +374,11 @@ public class FunctionRuntimeManagerTest {
 
         functionRuntimeManager.processAssignment(assignment4);
 
-        verify(functionRuntimeManager, 
times(1)).insertStopAction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(1)).stopFunction(any(FunctionRuntimeInfo.class));
         // make sure terminate is not called since this is a update operation
-        verify(functionRuntimeManager, 
times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
 
-        verify(functionRuntimeManager).insertStopAction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
+        verify(functionActioner).stopFunction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
             @Override
             public boolean matches(Object o) {
                 if (o instanceof FunctionRuntimeInfo) {
@@ -407,15 +393,7 @@ public class FunctionRuntimeManagerTest {
             }
         }));
 
-        verify(functionRuntimeManager, 
times(0)).insertStartAction(any(FunctionRuntimeInfo.class));
-
-        Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1);
-        Assert.assertTrue(functionRuntimeManager.actionQueue.contains(
-                new FunctionAction()
-                        .setAction(FunctionAction.Action.STOP)
-                        .setFunctionRuntimeInfo(new 
FunctionRuntimeInfo().setFunctionInstance(
-                                
Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0)
-                                        .build()))));
+        verify(functionActioner, 
times(0)).startFunction(any(FunctionRuntimeInfo.class));
 
         
Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 2);
         
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
@@ -453,6 +431,11 @@ public class FunctionRuntimeManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionMetaDataManager.class)));
+        FunctionActioner functionActioner = 
spy(functionRuntimeManager.getFunctionActioner());
+        
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
+        
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
+        
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
+        functionRuntimeManager.setFunctionActioner(functionActioner);
 
         Function.FunctionMetaData function1 = 
Function.FunctionMetaData.newBuilder().setFunctionDetails(
                 Function.FunctionDetails.newBuilder()
@@ -486,9 +469,9 @@ public class FunctionRuntimeManagerTest {
 
         functionRuntimeManager.processAssignment(assignment2);
 
-        verify(functionRuntimeManager, 
times(0)).insertStartAction(any(FunctionRuntimeInfo.class));
-        verify(functionRuntimeManager, 
times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class));
-        verify(functionRuntimeManager, 
times(1)).insertStopAction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(0)).startFunction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(1)).stopFunction(any(FunctionRuntimeInfo.class));
 
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-2").get("test-tenant/test-namespace/func-1:0"), 
assignment2);
@@ -497,6 +480,11 @@ public class FunctionRuntimeManagerTest {
 
         /** Test transfer from other worker to me **/
         reset(functionRuntimeManager);
+        reset(functionActioner);
+        
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
+        
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
+        
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
+        functionRuntimeManager.setFunctionActioner(functionActioner);
 
         Function.Assignment assignment3 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -506,9 +494,9 @@ public class FunctionRuntimeManagerTest {
 
         functionRuntimeManager.processAssignment(assignment3);
 
-        verify(functionRuntimeManager, 
times(1)).insertStartAction(any(FunctionRuntimeInfo.class));
-        verify(functionRuntimeManager, 
times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class));
-        verify(functionRuntimeManager, 
times(0)).insertStopAction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(1)).startFunction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(0)).stopFunction(any(FunctionRuntimeInfo.class));
 
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-1").get("test-tenant/test-namespace/func-1:0"), 
assignment3);
@@ -620,20 +608,55 @@ public class FunctionRuntimeManagerTest {
                 mock(MembershipManager.class),
                 mock(ConnectorsManager.class),
                 mock(FunctionMetaDataManager.class)));
-
+        FunctionActioner functionActioner = 
spy(functionRuntimeManager.getFunctionActioner());
+        
doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class));
+        
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class));
+        
doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class));
+        functionRuntimeManager.setFunctionActioner(functionActioner);
 
         functionRuntimeManager.initialize();
 
         
Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1);
-        log.info("actionQueue: {}", functionRuntimeManager.actionQueue);
-        Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1);
+        verify(functionActioner, 
times(1)).startFunction(any(FunctionRuntimeInfo.class));
+        // Ideally this should be zero, but it will nevertheless be called with null runtimespawner which essentially
+        // results in it being noop. We ensure that in the check below.
+        verify(functionActioner, 
times(1)).stopFunction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
 
-        FunctionAction functionAction = 
functionRuntimeManager.actionQueue.poll();
+        verify(functionActioner).startFunction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
+            @Override
+            public boolean matches(Object o) {
+                if (o instanceof FunctionRuntimeInfo) {
+                    FunctionRuntimeInfo functionRuntimeInfo = 
(FunctionRuntimeInfo) o;
 
-        // only actually start function1
-        Assert.assertEquals(functionAction.getAction(), 
FunctionAction.Action.START);
-        
Assert.assertEquals(functionAction.getFunctionRuntimeInfo().getFunctionInstance(),
 assignment1.getInstance());
+                    if 
(!functionRuntimeInfo.getFunctionInstance().equals(assignment1.getInstance())) {
+                        return false;
+                    }
+                    return true;
+                }
+                return false;
+            }
+        }));
+        verify(functionActioner).stopFunction(argThat(new 
ArgumentMatcher<FunctionRuntimeInfo>() {
+            @Override
+            public boolean matches(Object o) {
+                if (o instanceof FunctionRuntimeInfo) {
+                    FunctionRuntimeInfo functionRuntimeInfo = 
(FunctionRuntimeInfo) o;
 
+                    if (functionRuntimeInfo.getRuntimeSpawner() != null) {
+                        return false;
+                    }
+                    return true;
+                }
+                return false;
+            }
+        }));
+
+        
Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 1);
+        
Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0"),
+                new FunctionRuntimeInfo().setFunctionInstance(
+                        
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
+                                .build()));
     }
 
     @Test
@@ -664,7 +687,7 @@ public class FunctionRuntimeManagerTest {
 
         FunctionActioner functionActioner = spy(new FunctionActioner(
                 workerConfig,
-                kubernetesRuntimeFactory, null, null, null, null));
+                kubernetesRuntimeFactory, null, null, null));
 
         // test new assignment update functions
         FunctionRuntimeManager functionRuntimeManager = spy(new 
FunctionRuntimeManager(
@@ -713,9 +736,9 @@ public class FunctionRuntimeManagerTest {
         functionRuntimeManager.processAssignment(assignment2);
 
         // make sure nothing is called
-        verify(functionRuntimeManager, 
times(0)).insertStartAction(any(FunctionRuntimeInfo.class));
-        verify(functionRuntimeManager, 
times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class));
-        verify(functionRuntimeManager, 
times(0)).insertStopAction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(0)).startFunction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(0)).stopFunction(any(FunctionRuntimeInfo.class));
 
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-2").get("test-tenant/test-namespace/func-1:0"), 
assignment2);
@@ -732,9 +755,9 @@ public class FunctionRuntimeManagerTest {
         functionRuntimeManager.processAssignment(assignment3);
 
         // make sure nothing is called
-        verify(functionRuntimeManager, 
times(0)).insertStartAction(any(FunctionRuntimeInfo.class));
-        verify(functionRuntimeManager, 
times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class));
-        verify(functionRuntimeManager, 
times(0)).insertStopAction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(0)).startFunction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
+        verify(functionActioner, 
times(0)).stopFunction(any(FunctionRuntimeInfo.class));
 
         Assert.assertEquals(functionRuntimeManager.workerIdToAssignments
                 .get("worker-1").get("test-tenant/test-namespace/func-1:0"), 
assignment3);

Reply via email to