This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 878362c Simplified the workflow of functionruntime manager (#3551) 878362c is described below commit 878362c2f08055dedf11477fd434ad83b98a4faf Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Tue Feb 26 10:57:55 2019 -0800 Simplified the workflow of functionruntime manager (#3551) * Simplified the workflow of functionruntime manager * Fix unittest * Took feedback into account * added missing imports --- .../pulsar/functions/worker/FunctionAction.java | 40 ----- .../pulsar/functions/worker/FunctionActioner.java | 147 ++++++----------- .../functions/worker/FunctionRuntimeManager.java | 125 +++++---------- .../functions/worker/FunctionActionerTest.java | 27 ++-- .../worker/FunctionRuntimeManagerTest.java | 173 ++++++++++++--------- 5 files changed, 199 insertions(+), 313 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java deleted file mode 100644 index ded8268..0000000 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAction.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.worker; - -import lombok.*; -import lombok.experimental.Accessors; - -@Data -@Setter -@Getter -@EqualsAndHashCode -@ToString -@Accessors(chain = true) -public class FunctionAction { - - public enum Action { - START, - STOP, - TERMINATE - } - - private Action action; - private FunctionRuntimeInfo functionRuntimeInfo; -} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 8033507..1d1014e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -18,9 +18,9 @@ */ package org.apache.pulsar.functions.worker; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.io.MoreFiles; +import com.google.common.io.MoreFiles; import com.google.common.io.RecursiveDeleteOption; + import lombok.Data; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -79,120 +79,74 @@ import static org.apache.pulsar.functions.utils.Utils.getSourceType; @EqualsAndHashCode @ToString @Slf4j -public class FunctionActioner implements AutoCloseable { +public class FunctionActioner { private final WorkerConfig workerConfig; private final RuntimeFactory runtimeFactory; private final Namespace dlogNamespace; - private LinkedBlockingQueue<FunctionAction> actionQueue; - private volatile boolean running; - private Thread actioner; private final ConnectorsManager connectorsManager; private final PulsarAdmin pulsarAdmin; public FunctionActioner(WorkerConfig workerConfig, RuntimeFactory runtimeFactory, Namespace dlogNamespace, - LinkedBlockingQueue<FunctionAction> actionQueue, ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin) { this.workerConfig = workerConfig; this.runtimeFactory = runtimeFactory; this.dlogNamespace = dlogNamespace; - this.actionQueue = actionQueue; this.connectorsManager = connectorsManager; this.pulsarAdmin = pulsarAdmin; - actioner = new Thread(() -> { - log.info("Starting Actioner Thread..."); - while(running) { - try { - FunctionAction action = actionQueue.poll(1, TimeUnit.SECONDS); - processAction(action); - } catch (InterruptedException ex) { - } - } - }); - actioner.setName("FunctionActionerThread"); } + public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) { + try { + FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData(); + FunctionDetails functionDetails = functionMetaData.getFunctionDetails(); + int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); - void processAction(FunctionAction action) { - if (action == null) return; - - switch (action.getAction()) { - case START: - try { - startFunction(action.getFunctionRuntimeInfo()); - } catch (Exception ex) { - FunctionDetails details = action.getFunctionRuntimeInfo().getFunctionInstance() - .getFunctionMetaData().getFunctionDetails(); - log.info("{}/{}/{} Error starting function", details.getTenant(), details.getNamespace(), - details.getName(), ex); - action.getFunctionRuntimeInfo().setStartupException(ex); - } - break; - case STOP: - stopFunction(action.getFunctionRuntimeInfo()); - break; - case TERMINATE: - terminateFunction(action.getFunctionRuntimeInfo()); - break; - } - } - - public void start() { - this.running = true; - actioner.start(); - } - - @Override - public void close() { - running = false; - } - - public void join() throws InterruptedException { - actioner.join(); - } - - @VisibleForTesting - public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception { - FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData(); - FunctionDetails functionDetails = functionMetaData.getFunctionDetails(); - int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); - - log.info("{}/{}/{}-{} Starting function ...", functionDetails.getTenant(), functionDetails.getNamespace(), - functionDetails.getName(), instanceId); + log.info("{}/{}/{}-{} Starting function ...", functionDetails.getTenant(), functionDetails.getNamespace(), + functionDetails.getName(), instanceId); - String packageFile; + String packageFile; - String pkgLocation = functionMetaData.getPackageLocation().getPackagePath(); - boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation); + String pkgLocation = functionMetaData.getPackageLocation().getPackagePath(); + boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation); - if (runtimeFactory.externallyManaged()) { - packageFile = pkgLocation; - } else { - if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) { - URL url = new URL(pkgLocation); - File pkgFile = new File(url.toURI()); - packageFile = pkgFile.getAbsolutePath(); - } else if (isFunctionCodeBuiltin(functionDetails)) { - File pkgFile = getBuiltinArchive(FunctionDetails.newBuilder(functionMetaData.getFunctionDetails())); - packageFile = pkgFile.getAbsolutePath(); + if (runtimeFactory.externallyManaged()) { + packageFile = pkgLocation; } else { - File pkgDir = new File(workerConfig.getDownloadDirectory(), - getDownloadPackagePath(functionMetaData, instanceId)); - pkgDir.mkdirs(); - File pkgFile = new File( - pkgDir, - new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName()); - downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId); - packageFile = pkgFile.getAbsolutePath(); + if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) { + URL url = new URL(pkgLocation); + File pkgFile = new File(url.toURI()); + packageFile = pkgFile.getAbsolutePath(); + } else if (isFunctionCodeBuiltin(functionDetails)) { + File pkgFile = getBuiltinArchive(FunctionDetails.newBuilder(functionMetaData.getFunctionDetails())); + packageFile = pkgFile.getAbsolutePath(); + } else { + File pkgDir = new File(workerConfig.getDownloadDirectory(), + getDownloadPackagePath(functionMetaData, instanceId)); + pkgDir.mkdirs(); + File pkgFile = new File( + pkgDir, + new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName()); + downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId); + packageFile = pkgFile.getAbsolutePath(); + } } - } - RuntimeSpawner runtimeSpawner = getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(), packageFile); - functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner); - - runtimeSpawner.start(); + RuntimeSpawner runtimeSpawner = getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(), packageFile); + functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner); + + runtimeSpawner.start(); + return; + } catch (Exception ex) { + FunctionDetails details = functionRuntimeInfo.getFunctionInstance() + .getFunctionMetaData().getFunctionDetails(); + log.info("{}/{}/{} Error starting function", details.getTenant(), details.getNamespace(), + details.getName(), ex); + functionRuntimeInfo.setStartupException(ex); + return; + } } RuntimeSpawner getRuntimeSpawner(Function.Instance instance, String packageFile) { @@ -306,12 +260,13 @@ public class FunctionActioner implements AutoCloseable { } } - private void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) { - FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails(); + public void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) { + FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() + .getFunctionDetails(); + log.info("{}/{}/{}-{} Terminating function...", details.getTenant(), details.getNamespace(), details.getName(), + functionRuntimeInfo.getFunctionInstance().getInstanceId()); String fqfn = FunctionDetailsUtils.getFullyQualifiedName(details); - log.info("{}-{} Terminating function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId()); - stopFunction(functionRuntimeInfo); //cleanup subscriptions if (details.getSource().getCleanupSubscription()) { @@ -483,4 +438,4 @@ public class FunctionActioner implements AutoCloseable { } } -} \ No newline at end of file +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index ba49678..3d55065 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -57,7 +57,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; /** @@ -73,8 +72,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ // All the runtime info related to functions executed by this worker // Fully Qualified InstanceId - > FunctionRuntimeInfo - // NOTE: please use setFunctionRuntimeInfo and deleteFunctionRuntimeInfo methods to modify this data structure - // Since during initialization phase nothing should be modified @VisibleForTesting Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<>(); @@ -82,12 +79,10 @@ public class FunctionRuntimeManager implements AutoCloseable{ @Getter final WorkerConfig workerConfig; - @VisibleForTesting - LinkedBlockingQueue<FunctionAction> actionQueue; - private FunctionAssignmentTailer functionAssignmentTailer; @Setter + @Getter private FunctionActioner functionActioner; @Getter @@ -174,10 +169,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set"); } - this.actionQueue = new LinkedBlockingQueue<>(); - this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory, - dlogNamespace, actionQueue, connectorsManager, workerService.getBrokerAdmin()); + dlogNamespace, connectorsManager, workerService.getBrokerAdmin()); this.membershipManager = membershipManager; this.functionMetaDataManager = functionMetaDataManager; @@ -226,8 +219,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ public void start() { log.info("/** Starting Function Runtime Manager **/"); log.info("Initialize metrics sink..."); - log.info("Starting function actioner..."); - this.functionActioner.start(); log.info("Starting function assignment tailer..."); this.functionAssignmentTailer.start(); } @@ -447,10 +438,10 @@ public class FunctionRuntimeManager implements AutoCloseable{ log.info("[{}] {}..", restart ? "restarting" : "stopping", fullyQualifiedInstanceId); FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(fullyQualifiedInstanceId); if (functionRuntimeInfo != null) { - this.functionActioner.stopFunction(functionRuntimeInfo); + this.conditionallyStopFunction(functionRuntimeInfo); try { if(restart) { - this.functionActioner.startFunction(functionRuntimeInfo); + this.conditionallyStartFunction(functionRuntimeInfo); } } catch (Exception ex) { log.info("{} Error re-starting function", fullyQualifiedInstanceId, ex); @@ -630,7 +621,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ if (!assignment.getInstance().equals(existingAssignment.getInstance())) { //stop function if (functionRuntimeInfo != null) { - this.insertStopAction(functionRuntimeInfo); + this.conditionallyStopFunction(functionRuntimeInfo); } // still assigned to me, need to restart if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) { @@ -639,11 +630,11 @@ public class FunctionRuntimeManager implements AutoCloseable{ FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo(); newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance()); - this.insertStartAction(newFunctionRuntimeInfo); - this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, newFunctionRuntimeInfo); + this.conditionallyStartFunction(newFunctionRuntimeInfo); + this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo); } } else { - deleteFunctionRuntimeInfo(fullyQualifiedInstanceId); + this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); } } else { // if assignment got transferred to me just set function runtime @@ -655,15 +646,15 @@ public class FunctionRuntimeManager implements AutoCloseable{ assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath()); newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner); - this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, newFunctionRuntimeInfo); + this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo); } else { - deleteFunctionRuntimeInfo(fullyQualifiedInstanceId); + this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); } } } else { //stop function if (functionRuntimeInfo != null) { - this.insertStopAction(functionRuntimeInfo); + this.conditionallyStopFunction(functionRuntimeInfo); } // still assigned to me, need to restart if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) { @@ -671,11 +662,11 @@ public class FunctionRuntimeManager implements AutoCloseable{ //start again FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo(); newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance()); - this.insertStartAction(newFunctionRuntimeInfo); - this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, newFunctionRuntimeInfo); + this.conditionallyStartFunction(newFunctionRuntimeInfo); + this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo); } } else { - deleteFunctionRuntimeInfo(fullyQualifiedInstanceId); + this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); } } @@ -699,12 +690,12 @@ public class FunctionRuntimeManager implements AutoCloseable{ // TODO could be a race condition here if functionMetaDataTailer somehow does not receive the functionMeta prior to the functionAssignmentsTailer gets the assignment for the function. if (this.functionMetaDataManager.containsFunction(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName())) { // function still exists thus probably an update or stop operation - this.insertStopAction(functionRuntimeInfo); + this.conditionallyStopFunction(functionRuntimeInfo); } else { // function doesn't exist anymore thus we should terminate - this.insertTerminateAction(functionRuntimeInfo); + this.conditionallyTerminateFunction(functionRuntimeInfo); } - this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId); + this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); } String workerId = null; @@ -751,16 +742,14 @@ public class FunctionRuntimeManager implements AutoCloseable{ if (functionRuntimeInfo == null) { functionRuntimeInfo = new FunctionRuntimeInfo() .setFunctionInstance(assignment.getInstance()); - this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, functionRuntimeInfo); - + this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo); } else { //Somehow this function is already started log.warn("Function {} already running. Going to restart function.", functionRuntimeInfo); - this.insertStopAction(functionRuntimeInfo); + this.conditionallyStopFunction(functionRuntimeInfo); } - - this.insertStartAction(functionRuntimeInfo); + this.conditionallyStartFunction(functionRuntimeInfo); } public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() { @@ -770,48 +759,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ /** * Private methods for internal use. Should not be used outside of this class */ - - @VisibleForTesting - void insertStopAction(FunctionRuntimeInfo functionRuntimeInfo) { - if (!this.isInitializePhase) { - FunctionAction functionAction = new FunctionAction(); - functionAction.setAction(FunctionAction.Action.STOP); - functionAction.setFunctionRuntimeInfo(functionRuntimeInfo); - try { - actionQueue.put(functionAction); - } catch (InterruptedException ex) { - throw new RuntimeException("Interrupted while putting action"); - } - } - } - - @VisibleForTesting - void insertStartAction(FunctionRuntimeInfo functionRuntimeInfo) { - if (!this.isInitializePhase) { - FunctionAction functionAction = new FunctionAction(); - functionAction.setAction(FunctionAction.Action.START); - functionAction.setFunctionRuntimeInfo(functionRuntimeInfo); - try { - actionQueue.put(functionAction); - } catch (InterruptedException ex) { - throw new RuntimeException("Interrupted while putting action"); - } - } - } - - void insertTerminateAction(FunctionRuntimeInfo functionRuntimeInfo) { - if (!this.isInitializePhase) { - FunctionAction functionAction = new FunctionAction(); - functionAction.setAction(FunctionAction.Action.TERMINATE); - functionAction.setFunctionRuntimeInfo(functionRuntimeInfo); - try { - actionQueue.put(functionAction); - } catch (InterruptedException ex) { - throw new RuntimeException("Interrupted while putting action"); - } - } - } - private Assignment findAssignment(String tenant, String namespace, String functionName, int instanceId) { String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId); @@ -844,22 +791,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ assignment); } - private void deleteFunctionRuntimeInfo(String fullyQualifiedInstanceId) { - if (!this.isInitializePhase) { - this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId); - } - } - - private void setFunctionRuntimeInfo(String fullyQualifiedInstanceId, FunctionRuntimeInfo functionRuntimeInfo) { - // Don't modify Function Runtime Infos when initializing - if (!this.isInitializePhase) { - this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo); - } - } - @Override public void close() throws Exception { - this.functionActioner.close(); this.functionAssignmentTailer.close(); stopAllOwnedFunctions(); @@ -908,4 +841,22 @@ public class FunctionRuntimeManager implements AutoCloseable{ } return toStart; } + + private void conditionallyStartFunction(FunctionRuntimeInfo functionRuntimeInfo) { + if (!this.isInitializePhase) { + this.functionActioner.startFunction(functionRuntimeInfo); + } + } + + private void conditionallyStopFunction(FunctionRuntimeInfo functionRuntimeInfo) { + if (!this.isInitializePhase) { + this.functionActioner.stopFunction(functionRuntimeInfo); + } + } + + private void conditionallyTerminateFunction(FunctionRuntimeInfo functionRuntimeInfo) { + if (!this.isInitializePhase) { + this.functionActioner.terminateFunction(functionRuntimeInfo); + } + } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java index 65ead73..cd636d4 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java @@ -25,11 +25,6 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.fail; - -import java.net.UnknownHostException; -import java.util.concurrent.LinkedBlockingQueue; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -40,6 +35,8 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; import org.testng.annotations.Test; import static org.apache.pulsar.common.functions.Utils.FILE; +import static org.testng.Assert.*; +import static org.testng.AssertJUnit.assertFalse; /** * Unit test of {@link FunctionActioner}. @@ -66,10 +63,9 @@ public class FunctionActionerTest { // throw exception when dlogNamespace is accessed by actioner and verify it final String exceptionMsg = "dl namespace not-found"; doThrow(new IllegalArgumentException(exceptionMsg)).when(dlogNamespace).openLog(any()); - LinkedBlockingQueue<FunctionAction> queue = new LinkedBlockingQueue<>(); @SuppressWarnings("resource") - FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue, + FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, new ConnectorsManager(workerConfig), mock(PulsarAdmin.class)); Runtime runtime = mock(Runtime.class); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() @@ -80,13 +76,14 @@ public class FunctionActionerTest { .build(); FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class); doReturn(instance).when(functionRuntimeInfo).getFunctionInstance(); + doThrow(new IllegalStateException("StartupException")).when(functionRuntimeInfo).setStartupException(any()); // actioner should try to download file from bk-dlogNamespace and fails with exception try { actioner.startFunction(functionRuntimeInfo); - fail("should have failed with dlogNamespace open"); - } catch (IllegalArgumentException ie) { - assertEquals(ie.getMessage(), exceptionMsg); + assertFalse(true); + } catch (IllegalStateException ex) { + assertEquals(ex.getMessage(), "StartupException"); } } @@ -109,10 +106,9 @@ public class FunctionActionerTest { Namespace dlogNamespace = mock(Namespace.class); final String exceptionMsg = "dl namespace not-found"; doThrow(new IllegalArgumentException(exceptionMsg)).when(dlogNamespace).openLog(any()); - LinkedBlockingQueue<FunctionAction> queue = new LinkedBlockingQueue<>(); @SuppressWarnings("resource") - FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue, + FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, new ConnectorsManager(workerConfig), mock(PulsarAdmin.class)); // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call @@ -141,12 +137,13 @@ public class FunctionActionerTest { instance = Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0).build(); functionRuntimeInfo = mock(FunctionRuntimeInfo.class); doReturn(instance).when(functionRuntimeInfo).getFunctionInstance(); + doThrow(new IllegalStateException("StartupException")).when(functionRuntimeInfo).setStartupException(any()); try { actioner.startFunction(functionRuntimeInfo); - fail("Function-Actioner should have tried to donwload file from http-location"); - } catch (UnknownHostException ue) { - // ok + assertFalse(true); + } catch (IllegalStateException ex) { + assertEquals(ex.getMessage(), "StartupException"); } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 4a3b1a8..a66ba90 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -48,13 +48,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @Slf4j public class FunctionRuntimeManagerTest { @@ -88,6 +82,11 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionMetaDataManager.class))); + FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); + doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); + doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); + doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class)); + functionRuntimeManager.setFunctionActioner(functionActioner); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( Function.FunctionDetails.newBuilder() @@ -122,8 +121,8 @@ public class FunctionRuntimeManagerTest { .get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment1); Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.get("worker-2") .get("test-tenant/test-namespace/func-2:0"), assignment2); - verify(functionRuntimeManager, times(1)).insertStartAction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager).insertStartAction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() { + verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class)); + verify(functionActioner).startFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() { @Override public boolean matches(Object o) { if (o instanceof FunctionRuntimeInfo) { @@ -137,15 +136,7 @@ public class FunctionRuntimeManagerTest { return false; } })); - verify(functionRuntimeManager, times(0)).insertStopAction(any(FunctionRuntimeInfo.class)); - - Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1); - Assert.assertTrue(functionRuntimeManager.actionQueue.contains( - new FunctionAction() - .setAction(FunctionAction.Action.START) - .setFunctionRuntimeInfo(new FunctionRuntimeInfo().setFunctionInstance( - Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) - .build())))); + verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class)); Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 1); Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0"), @@ -182,6 +173,11 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionMetaDataManager.class))); + FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); + doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); + doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); + doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class)); + functionRuntimeManager.setFunctionActioner(functionActioner); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( Function.FunctionDetails.newBuilder() @@ -224,9 +220,9 @@ public class FunctionRuntimeManagerTest { Assert.assertEquals(functionRuntimeManager.workerIdToAssignments .get("worker-2").get("test-tenant/test-namespace/func-2:0"), assignment2); - verify(functionRuntimeManager, times(0)).insertStartAction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager, times(1)).insertTerminateAction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager).insertTerminateAction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() { + verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(1)).terminateFunction(any(FunctionRuntimeInfo.class)); + verify(functionActioner).terminateFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() { @Override public boolean matches(Object o) { if (o instanceof FunctionRuntimeInfo) { @@ -241,14 +237,6 @@ public class FunctionRuntimeManagerTest { } })); - Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1); - Assert.assertTrue(functionRuntimeManager.actionQueue.contains( - new FunctionAction() - .setAction(FunctionAction.Action.TERMINATE) - .setFunctionRuntimeInfo(new FunctionRuntimeInfo().setFunctionInstance( - Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) - .build())))); - Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 0); } @@ -279,6 +267,11 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionMetaDataManager.class))); + FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); + doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); + doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); + doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class)); + functionRuntimeManager.setFunctionActioner(functionActioner); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( Function.FunctionDetails.newBuilder() @@ -303,6 +296,7 @@ public class FunctionRuntimeManagerTest { functionRuntimeManager.setAssignment(assignment1); functionRuntimeManager.setAssignment(assignment2); reset(functionRuntimeManager); + reset(functionActioner); Function.Assignment assignment3 = Function.Assignment.newBuilder() .setWorkerId("worker-1") @@ -322,11 +316,11 @@ public class FunctionRuntimeManagerTest { functionRuntimeManager.processAssignment(assignment1); functionRuntimeManager.processAssignment(assignment3); - verify(functionRuntimeManager, times(1)).insertStopAction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class)); // make sure terminate is not called since this is a update operation - verify(functionRuntimeManager, times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager).insertStopAction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() { + verify(functionActioner).stopFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() { @Override public boolean matches(Object o) { if (o instanceof FunctionRuntimeInfo) { @@ -341,8 +335,8 @@ public class FunctionRuntimeManagerTest { } })); - verify(functionRuntimeManager, times(1)).insertStartAction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager).insertStartAction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() { + verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class)); + verify(functionActioner).startFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() { @Override public boolean matches(Object o) { if (o instanceof FunctionRuntimeInfo) { @@ -357,14 +351,6 @@ public class FunctionRuntimeManagerTest { } })); - Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 2); - Assert.assertTrue(functionRuntimeManager.actionQueue.contains( - new FunctionAction() - .setAction(FunctionAction.Action.START) - .setFunctionRuntimeInfo(new FunctionRuntimeInfo().setFunctionInstance( - Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0) - .build())))); - Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 2); Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1); Assert.assertEquals(functionRuntimeManager.workerIdToAssignments @@ -373,7 +359,7 @@ public class FunctionRuntimeManagerTest { .get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment3); reset(functionRuntimeManager); - functionRuntimeManager.actionQueue.clear(); + reset(functionActioner); // add a stop Function.FunctionMetaData.Builder function2StoppedBldr = function2.toBuilder(); @@ -388,11 +374,11 @@ public class FunctionRuntimeManagerTest { functionRuntimeManager.processAssignment(assignment4); - verify(functionRuntimeManager, times(1)).insertStopAction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class)); // make sure terminate is not called since this is a update operation - verify(functionRuntimeManager, times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager).insertStopAction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() { + verify(functionActioner).stopFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() { @Override public boolean matches(Object o) { if (o instanceof FunctionRuntimeInfo) { @@ -407,15 +393,7 @@ public class FunctionRuntimeManagerTest { } })); - verify(functionRuntimeManager, times(0)).insertStartAction(any(FunctionRuntimeInfo.class)); - - Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1); - Assert.assertTrue(functionRuntimeManager.actionQueue.contains( - new FunctionAction() - .setAction(FunctionAction.Action.STOP) - .setFunctionRuntimeInfo(new FunctionRuntimeInfo().setFunctionInstance( - Function.Instance.newBuilder().setFunctionMetaData(function2).setInstanceId(0) - .build())))); + verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class)); Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 2); Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1); @@ -453,6 +431,11 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionMetaDataManager.class))); + FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); + doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); + doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); + doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class)); + functionRuntimeManager.setFunctionActioner(functionActioner); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( Function.FunctionDetails.newBuilder() @@ -486,9 +469,9 @@ public class FunctionRuntimeManagerTest { functionRuntimeManager.processAssignment(assignment2); - verify(functionRuntimeManager, times(0)).insertStartAction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager, times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager, times(1)).insertStopAction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class)); Assert.assertEquals(functionRuntimeManager.workerIdToAssignments .get("worker-2").get("test-tenant/test-namespace/func-1:0"), assignment2); @@ -497,6 +480,11 @@ public class FunctionRuntimeManagerTest { /** Test transfer from other worker to me **/ reset(functionRuntimeManager); + reset(functionActioner); + doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); + doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); + doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class)); + functionRuntimeManager.setFunctionActioner(functionActioner); Function.Assignment assignment3 = Function.Assignment.newBuilder() .setWorkerId("worker-1") @@ -506,9 +494,9 @@ public class FunctionRuntimeManagerTest { functionRuntimeManager.processAssignment(assignment3); - verify(functionRuntimeManager, times(1)).insertStartAction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager, times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager, times(0)).insertStopAction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class)); Assert.assertEquals(functionRuntimeManager.workerIdToAssignments .get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment3); @@ -620,20 +608,55 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionMetaDataManager.class))); - + FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); + doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); + doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); + doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class)); + functionRuntimeManager.setFunctionActioner(functionActioner); functionRuntimeManager.initialize(); Assert.assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1); - log.info("actionQueue: {}", functionRuntimeManager.actionQueue); - Assert.assertEquals(functionRuntimeManager.actionQueue.size(), 1); + verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class)); + // Ideally this should be zero, but it will nevertheless be called with null runtimespawner which essentially + // results in it being noop. We ensure that in the check below. + verify(functionActioner, times(1)).stopFunction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class)); - FunctionAction functionAction = functionRuntimeManager.actionQueue.poll(); + verify(functionActioner).startFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() { + @Override + public boolean matches(Object o) { + if (o instanceof FunctionRuntimeInfo) { + FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o; - // only actually start function1 - Assert.assertEquals(functionAction.getAction(), FunctionAction.Action.START); - Assert.assertEquals(functionAction.getFunctionRuntimeInfo().getFunctionInstance(), assignment1.getInstance()); + if (!functionRuntimeInfo.getFunctionInstance().equals(assignment1.getInstance())) { + return false; + } + return true; + } + return false; + } + })); + verify(functionActioner).stopFunction(argThat(new ArgumentMatcher<FunctionRuntimeInfo>() { + @Override + public boolean matches(Object o) { + if (o instanceof FunctionRuntimeInfo) { + FunctionRuntimeInfo functionRuntimeInfo = (FunctionRuntimeInfo) o; + if (functionRuntimeInfo.getRuntimeSpawner() != null) { + return false; + } + return true; + } + return false; + } + })); + + Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.size(), 1); + Assert.assertEquals(functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0"), + new FunctionRuntimeInfo().setFunctionInstance( + Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) + .build())); } @Test @@ -664,7 +687,7 @@ public class FunctionRuntimeManagerTest { FunctionActioner functionActioner = spy(new FunctionActioner( workerConfig, - kubernetesRuntimeFactory, null, null, null, null)); + kubernetesRuntimeFactory, null, null, null)); // test new assignment update functions FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( @@ -713,9 +736,9 @@ public class FunctionRuntimeManagerTest { functionRuntimeManager.processAssignment(assignment2); // make sure nothing is called - verify(functionRuntimeManager, times(0)).insertStartAction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager, times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager, times(0)).insertStopAction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class)); Assert.assertEquals(functionRuntimeManager.workerIdToAssignments .get("worker-2").get("test-tenant/test-namespace/func-1:0"), assignment2); @@ -732,9 +755,9 @@ public class FunctionRuntimeManagerTest { functionRuntimeManager.processAssignment(assignment3); // make sure nothing is called - verify(functionRuntimeManager, times(0)).insertStartAction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager, times(0)).insertTerminateAction(any(FunctionRuntimeInfo.class)); - verify(functionRuntimeManager, times(0)).insertStopAction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class)); + verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class)); Assert.assertEquals(functionRuntimeManager.workerIdToAssignments .get("worker-1").get("test-tenant/test-namespace/func-1:0"), assignment3);