This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 4beebb3 Add ability for function services to notify errors (#7187) 4beebb3 is described below commit 4beebb31a05afb0e310e7deed7ea1aac17b9521b Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Sun Jun 7 22:13:05 2020 -0700 Add ability for function services to notify errors (#7187) * Add ability for function services to notify errors --- .../org/apache/pulsar/broker/PulsarService.java | 4 +- .../worker/PulsarWorkerAssignmentTest.java | 2 +- .../pulsar/functions/worker/ErrorNotifier.java | 56 +++++++ .../functions/worker/FunctionAssignmentTailer.java | 20 ++- .../worker/FunctionMetaDataTopicTailer.java | 6 +- .../functions/worker/FunctionRuntimeManager.java | 12 +- .../functions/worker/FunctionWorkerStarter.java | 3 +- .../org/apache/pulsar/functions/worker/Worker.java | 19 ++- .../pulsar/functions/worker/WorkerService.java | 12 +- .../worker/FunctionRuntimeManagerTest.java | 172 +++++++++++++++++++-- .../functions/worker/MembershipManagerTest.java | 13 +- 11 files changed, 276 insertions(+), 43 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index b2d5b7a..4848b41 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -116,6 +116,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.TwoPhaseCompactor; +import org.apache.pulsar.functions.worker.ErrorNotifier; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.WorkerUtils; @@ -1265,7 +1266,8 @@ public class PulsarService implements AutoCloseable { throw ioe; 
} LOG.info("Function worker service setup completed"); - functionWorkerService.get().start(dlogURI, authenticationService, authorizationService); + // TODO figure out how to handle errors from function worker service + functionWorkerService.get().start(dlogURI, authenticationService, authorizationService, new ErrorNotifier()); LOG.info("Function worker service started"); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index 70ab543..26fef7c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -287,7 +287,7 @@ public class PulsarWorkerAssignmentTest { final URI dlUri = functionsWorkerService.getDlogUri(); functionsWorkerService.stop(); functionsWorkerService = new WorkerService(workerConfig); - functionsWorkerService.start(dlUri, new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)), null); + functionsWorkerService.start(dlUri, new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)), null, new ErrorNotifier()); final FunctionRuntimeManager runtimeManager2 = functionsWorkerService.getFunctionRuntimeManager(); retryStrategically((test) -> { try { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ErrorNotifier.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ErrorNotifier.java new file mode 100644 index 0000000..9a51943 --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ErrorNotifier.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicReference; + +public class ErrorNotifier implements Serializable, AutoCloseable { + + private static final long serialVersionUID = 1L; + + private final AtomicReference<Throwable> error = new AtomicReference<>(); + + private volatile boolean isRunning; + + public ErrorNotifier() { + isRunning = true; + } + + public synchronized void triggerError(Throwable th) { + error.set(th); + this.notify(); + } + + public synchronized void waitForError() throws Exception { + while (isRunning && error.get() == null) { + this.wait(); + } + + if (isRunning) { + throw new Exception(error.get()); + } + } + + @Override + public synchronized void close() { + isRunning = false; + this.notify(); + } +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java index 98d5745..40cc581 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java @@ -39,7 
+39,11 @@ public class FunctionAssignmentTailer implements AutoCloseable { private final Thread tailerThread; - public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig) throws PulsarClientException { + public FunctionAssignmentTailer( + FunctionRuntimeManager functionRuntimeManager, + ReaderBuilder readerBuilder, + WorkerConfig workerConfig, + ErrorNotifier errorNotifier) throws PulsarClientException { this.functionRuntimeManager = functionRuntimeManager; this.reader = readerBuilder @@ -49,21 +53,21 @@ public class FunctionAssignmentTailer implements AutoCloseable { .readCompacted(true) .startMessageId(MessageId.earliest) .create(); - + this.tailerThread = new Thread(() -> { while(isRunning) { try { Message<byte[]> msg = reader.readNext(); processAssignment(msg); - } catch (Exception e) { + } catch (Throwable th) { if (isRunning) { - log.error("Encountered error in assignment tailer", e); - + log.error("Encountered error in assignment tailer", th); // trigger fatal error - // TODO add mechanism to notify main thread + isRunning = false; + errorNotifier.triggerError(th); } else { - if (!(e instanceof InterruptedException)) { - log.warn("Encountered error when assignment tailer is not running", e); + if (!(th instanceof InterruptedException)) { + log.warn("Encountered error when assignment tailer is not running", th); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java index 4bcaaf7..b7108e9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java @@ -53,13 +53,13 @@ public class FunctionMetaDataTopicTailer @Override public void close() { - 
log.info("Stopping function state consumer"); + log.info("Stopping function metadata tailer"); try { reader.close(); } catch (IOException e) { - log.error("Failed to stop function state consumer", e); + log.error("Failed to stop function metadata tailer", e); } - log.info("Stopped function state consumer"); + log.info("Stopped function function metadata tailer"); } public void processRequest(Message<byte[]> msg) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 0b8e509..bc1ca53 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -129,10 +129,11 @@ public class FunctionRuntimeManager implements AutoCloseable{ private final FunctionMetaDataManager functionMetaDataManager; - + private final ErrorNotifier errorNotifier; + public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace, MembershipManager membershipManager, ConnectorsManager connectorsManager, FunctionsManager functionsManager, - FunctionMetaDataManager functionMetaDataManager) throws Exception { + FunctionMetaDataManager functionMetaDataManager, ErrorNotifier errorNotifier) throws Exception { this.workerConfig = workerConfig; this.workerService = workerService; this.functionAdmin = workerService.getFunctionAdmin(); @@ -200,6 +201,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ this.membershipManager = membershipManager; this.functionMetaDataManager = functionMetaDataManager; + this.errorNotifier = errorNotifier; } /** @@ -210,7 +212,11 @@ public class FunctionRuntimeManager implements AutoCloseable{ public void initialize() { log.info("/** Initializing Runtime Manager **/"); try { - 
this.functionAssignmentTailer = new FunctionAssignmentTailer(this, this.getWorkerService().getClient().newReader(), workerConfig); + this.functionAssignmentTailer = new FunctionAssignmentTailer( + this, + this.getWorkerService().getClient().newReader(), + this.workerConfig, + this.errorNotifier); // start init phase this.isInitializePhase = true; // read all existing messages diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java index ab5332b..90ee762 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java @@ -65,8 +65,7 @@ public class FunctionWorkerStarter { final Worker worker = new Worker(workerConfig); try { worker.start(); - }catch(Exception e){ - log.error("Failed to start function worker", e); + } catch (Throwable th) { worker.stop(); System.exit(-1); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index 6245e59..4c53537 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -57,19 +57,28 @@ public class Worker { new DefaultThreadFactory("zk-cache-callback")); private GlobalZooKeeperCache globalZkCache; private ConfigurationCacheService configurationCacheService; + private final ErrorNotifier errorNotifier; public Worker(WorkerConfig workerConfig) { this.workerConfig = workerConfig; this.workerService = new WorkerService(workerConfig); + this.errorNotifier = new ErrorNotifier(); } protected void start() throws Exception { - URI dlogUri = initialize(this.workerConfig); 
+ URI dlogUri = initialize(workerConfig); - workerService.start(dlogUri, getAuthenticationService(), getAuthorizationService()); - this.server = new WorkerServer(workerService); - this.server.start(); - log.info("Start worker server on port {}...", this.workerConfig.getWorkerPort()); + workerService.start(dlogUri, getAuthenticationService(), getAuthorizationService(), errorNotifier); + server = new WorkerServer(workerService); + server.start(); + log.info("/** Started worker server on port={} **/", this.workerConfig.getWorkerPort()); + + try { + errorNotifier.waitForError(); + } catch (Throwable th) { + log.error("!-- Fatal error encountered. Worker will exit now. --!", th); + throw th; + } } private static URI initialize(WorkerConfig workerConfig) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index edaade6..ec2c44b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -85,7 +85,8 @@ public class WorkerService { public void start(URI dlogUri, AuthenticationService authenticationService, - AuthorizationService authorizationService) throws InterruptedException { + AuthorizationService authorizationService, + ErrorNotifier errorNotifier) throws InterruptedException { log.info("Starting worker {}...", workerConfig.getWorkerId()); try { @@ -182,7 +183,14 @@ public class WorkerService { // create function runtime manager this.functionRuntimeManager = new FunctionRuntimeManager( - this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager, functionsManager, functionMetaDataManager); + this.workerConfig, + this, + this.dlogNamespace, + this.membershipManager, + connectorsManager, + functionsManager, + functionMetaDataManager, + errorNotifier); // 
Setting references to managers in scheduler this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index d266ae0..429e728 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -47,7 +47,9 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.any; @@ -55,6 +57,7 @@ import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; @@ -102,7 +105,8 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - mock(FunctionMetaDataManager.class))); + mock(FunctionMetaDataManager.class), + mock(ErrorNotifier.class))); FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); @@ -185,7 +189,8 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - 
mock(FunctionMetaDataManager.class))); + mock(FunctionMetaDataManager.class), + mock(ErrorNotifier.class))); FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); @@ -271,7 +276,8 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - mock(FunctionMetaDataManager.class)); + mock(FunctionMetaDataManager.class), + mock(ErrorNotifier.class)); FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); @@ -401,7 +407,8 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - mock(FunctionMetaDataManager.class)); + mock(FunctionMetaDataManager.class), + mock(ErrorNotifier.class)); FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); @@ -476,6 +483,131 @@ public class FunctionRuntimeManagerTest { assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"), functionRuntimeInfo); } + @Test(timeOut = 10000) + public void testErrorNotifier() throws Exception { + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setWorkerId("worker-1"); + workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName()); + workerConfig.setFunctionRuntimeFactoryConfigs( + ObjectMapperFactory.getThreadLocal().convertValue( + new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), 
Map.class)); + workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); + workerConfig.setStateStorageServiceUrl("foo"); + workerConfig.setFunctionAssignmentTopicName("assignments"); + + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build(); + + Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build(); + + Function.Assignment assignment1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function1).setInstanceId(0).build()) + .build(); + Function.Assignment assignment2 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(0).build()) + .build(); + + ArrayBlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2); + PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder(); + Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(), + new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, msgMetadataBuilder)); + doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey(); + + Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(), + new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, msgMetadataBuilder)); + doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey(); + + PulsarClient pulsarClient = mock(PulsarClient.class); + + Reader<byte[]> reader = mock(Reader.class); + + + when(reader.readNext()).thenAnswer(new Answer<Message<byte[]>>() { + 
@Override + public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable { + return messageList.poll(10, TimeUnit.SECONDS); + } + }); + + when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() { + @Override + public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable { + return new CompletableFuture<>(); + } + }); + + when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() { + @Override + public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { + return !messageList.isEmpty(); + } + }); + + ReaderBuilder readerBuilder = mock(ReaderBuilder.class); + doReturn(readerBuilder).when(pulsarClient).newReader(); + doReturn(readerBuilder).when(readerBuilder).topic(anyString()); + doReturn(readerBuilder).when(readerBuilder).readerName(anyString()); + doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString()); + doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean()); + + doReturn(reader).when(readerBuilder).create(); + WorkerService workerService = mock(WorkerService.class); + doReturn(pulsarClient).when(workerService).getClient(); + doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); + + ErrorNotifier errorNotifier = spy(new ErrorNotifier()); + + // test new assignment add functions + FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( + workerConfig, + workerService, + mock(Namespace.class), + mock(MembershipManager.class), + mock(ConnectorsManager.class), + mock(FunctionsManager.class), + mock(FunctionMetaDataManager.class), + errorNotifier)); + FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); + doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); + 
doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); + doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class)); + functionRuntimeManager.setFunctionActioner(functionActioner); + + functionRuntimeManager.initialize(); + + // verify no errors occured + verify(errorNotifier, times(0)).triggerError(any()); + + messageList.add(message1); + + functionRuntimeManager.start(); + + verify(errorNotifier, times(0)).triggerError(any()); + + // trigger an error to be thrown + doThrow(new RuntimeException("test")).when(functionRuntimeManager).processAssignment(any()); + + messageList.add(message2); + + try { + errorNotifier.waitForError(); + } catch (Exception e) { + assertEquals(e.getCause().getMessage(), "test"); + } + verify(errorNotifier, times(1)).triggerError(any()); + + functionRuntimeManager.close(); + } + @Test public void testRuntimeManagerInitialize() throws Exception { WorkerConfig workerConfig = new WorkerConfig(); @@ -573,6 +705,8 @@ public class FunctionRuntimeManagerTest { doReturn(pulsarClient).when(workerService).getClient(); doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); + ErrorNotifier errorNotifier = mock(ErrorNotifier.class); + // test new assignment add functions FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager( workerConfig, @@ -581,7 +715,8 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - mock(FunctionMetaDataManager.class)); + mock(FunctionMetaDataManager.class), + errorNotifier); FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); @@ -604,6 +739,9 @@ public class FunctionRuntimeManagerTest { new FunctionRuntimeInfo().setFunctionInstance( 
Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0) .build())); + + // verify no errors occured + verify(errorNotifier, times(0)).triggerError(any()); } @Test @@ -649,7 +787,8 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - mock(FunctionMetaDataManager.class)); + mock(FunctionMetaDataManager.class), + mock(ErrorNotifier.class)); functionRuntimeManager.setFunctionActioner(functionActioner); Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() @@ -753,7 +892,8 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - mock(FunctionMetaDataManager.class)); + mock(FunctionMetaDataManager.class), + mock(ErrorNotifier.class)); fail(); } catch (Exception e) { @@ -777,7 +917,8 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - mock(FunctionMetaDataManager.class)); + mock(FunctionMetaDataManager.class), + mock(ErrorNotifier.class)); fail(); } catch (Exception e) { @@ -801,7 +942,8 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - mock(FunctionMetaDataManager.class)); + mock(FunctionMetaDataManager.class), + mock(ErrorNotifier.class)); fail(); } catch (Exception e) { @@ -825,7 +967,8 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - mock(FunctionMetaDataManager.class)); + mock(FunctionMetaDataManager.class), + mock(ErrorNotifier.class)); assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class); } catch (Exception e) { @@ -854,7 +997,8 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), 
mock(FunctionsManager.class), - mock(FunctionMetaDataManager.class)); + mock(FunctionMetaDataManager.class), + mock(ErrorNotifier.class)); assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), KubernetesRuntimeFactory.class); KubernetesRuntimeFactory kubernetesRuntimeFactory = (KubernetesRuntimeFactory) functionRuntimeManager.getRuntimeFactory(); @@ -882,7 +1026,8 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - mock(FunctionMetaDataManager.class)); + mock(FunctionMetaDataManager.class), + mock(ErrorNotifier.class)); assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ProcessRuntimeFactory.class); ProcessRuntimeFactory processRuntimeFactory = (ProcessRuntimeFactory) functionRuntimeManager.getRuntimeFactory(); @@ -906,7 +1051,8 @@ public class FunctionRuntimeManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - mock(FunctionMetaDataManager.class)); + mock(FunctionMetaDataManager.class), + mock(ErrorNotifier.class)); assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class); ThreadRuntimeFactory threadRuntimeFactory = (ThreadRuntimeFactory) functionRuntimeManager.getRuntimeFactory(); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index 14d1044..7acb3f7 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -155,7 +155,8 @@ public class MembershipManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - functionMetaDataManager)); + functionMetaDataManager, + 
mock(ErrorNotifier.class))); MembershipManager membershipManager = spy(new MembershipManager(workerService, pulsarClient, pulsarAdmin)); List<WorkerInfo> workerInfoList = new LinkedList<>(); @@ -228,7 +229,8 @@ public class MembershipManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - functionMetaDataManager)); + functionMetaDataManager, + mock(ErrorNotifier.class))); MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin)); @@ -316,7 +318,8 @@ public class MembershipManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - functionMetaDataManager)); + functionMetaDataManager, + mock(ErrorNotifier.class))); MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin)); List<WorkerInfo> workerInfoList = new LinkedList<>(); @@ -396,7 +399,8 @@ public class MembershipManagerTest { mock(MembershipManager.class), mock(ConnectorsManager.class), mock(FunctionsManager.class), - functionMetaDataManager)); + functionMetaDataManager, + mock(ErrorNotifier.class))); MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient(), pulsarAdmin)); List<WorkerInfo> workerInfoList = new LinkedList<>(); @@ -440,5 +444,4 @@ public class MembershipManagerTest { verify(functionRuntimeManager, times(0)).removeAssignments(any()); assertEquals(membershipManager.unsignedFunctionDurations.size(), 0); } - }