This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 68877f8 Fix leader/scheduler assignment processing lag problem (#7237) 68877f8 is described below commit 68877f8947ce5ed28f09152cbf91ea0b0ecafb87 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Fri Jun 19 14:40:26 2020 -0700 Fix leader/scheduler assignment processing lag problem (#7237) * Fix leader/scheduler assignment processing lag problem * add license header * adding more comments * improving impl * fixing bugs * improving impl * fixing tests * adding comments * add more testing * addressing comments * cleaning up * refactoring implementation * addressing comments Co-authored-by: Jerry Peng <jer...@splunk.com> --- .../org/apache/pulsar/PulsarStandaloneStarter.java | 2 +- .../worker/ClusterServiceCoordinator.java | 9 +- .../functions/worker/FunctionAssignmentTailer.java | 170 ++++++--- .../functions/worker/FunctionMetaDataManager.java | 13 +- .../worker/FunctionMetaDataTopicTailer.java | 4 +- .../functions/worker/FunctionRuntimeManager.java | 52 ++- .../pulsar/functions/worker/LeaderService.java | 134 +++++++ .../pulsar/functions/worker/MembershipManager.java | 75 +--- .../pulsar/functions/worker/SchedulerManager.java | 232 +++++++---- .../pulsar/functions/worker/WorkerService.java | 157 +++++--- .../pulsar/functions/worker/WorkerUtils.java | 16 + .../worker/ClusterServiceCoordinatorTest.java | 14 +- .../worker/FunctionAssignmentTailerTest.java | 422 +++++++++++++++++++++ .../worker/FunctionRuntimeManagerTest.java | 128 ------- .../pulsar/functions/worker/LeaderServiceTest.java | 152 ++++++++ .../functions/worker/MembershipManagerTest.java | 43 --- .../functions/worker/SchedulerManagerTest.java | 120 ++++-- 17 files changed, 1256 insertions(+), 487 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java index e5e9b45..9da388b 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java @@ -102,7 +102,7 @@ public class PulsarStandaloneStarter extends PulsarStandalone { bkEnsemble.stop(); } } catch (Exception e) { - log.error("Shutdown failed: {}", e.getMessage()); + log.error("Shutdown failed: {}", e.getMessage(), e); } } }); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java index 419f65a..c2bde9d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java @@ -48,11 +48,11 @@ public class ClusterServiceCoordinator implements AutoCloseable { private final String workerId; private final Map<String, TimerTaskInfo> tasks = new HashMap<>(); private final ScheduledExecutorService executor; - private final MembershipManager membershipManager; + private final LeaderService leaderService; - public ClusterServiceCoordinator(String workerId, MembershipManager membershipManager) { + public ClusterServiceCoordinator(String workerId, LeaderService leaderService) { this.workerId = workerId; - this.membershipManager = membershipManager; + this.leaderService = leaderService; this.executor = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("cluster-service-coordinator-timer").build()); } @@ -62,11 +62,12 @@ public class ClusterServiceCoordinator implements AutoCloseable { } public void start() { + log.info("/** Starting cluster service coordinator **/"); for (Map.Entry<String, TimerTaskInfo> entry : this.tasks.entrySet()) { TimerTaskInfo timerTaskInfo = entry.getValue(); String taskName = entry.getKey(); this.executor.scheduleAtFixedRate(() 
-> { - boolean isLeader = membershipManager.isLeader(); + boolean isLeader = leaderService.isLeader(); if (isLeader) { try { timerTaskInfo.getTask().run(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java index 3038044..e1d9c1f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java @@ -25,95 +25,153 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; -import org.apache.pulsar.functions.proto.Function.Assignment; import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +/** + * This class is responsible for reading assignments from the 'assignments' functions internal topic. + * Only functions worker leader writes to the topic while other workers read from the topic. + * When a worker becomes the leader, the worker will read to the end of the assignments topic and close its reader to the topic. + * Then the worker and new leader will be in charge of computing new assignments when necessary. + * The leader does not need to listen to the assignments topic because it can just update its in memory assignments map directly + * after it computes a new scheduling. When a worker loses leadership, the worker starts reading from the assignments topic again. 
+ */ @Slf4j public class FunctionAssignmentTailer implements AutoCloseable { private final FunctionRuntimeManager functionRuntimeManager; - @Getter - private final Reader<byte[]> reader; + private final ReaderBuilder readerBuilder; + private final WorkerConfig workerConfig; + private final ErrorNotifier errorNotifier; + private Reader<byte[]> reader; private volatile boolean isRunning = false; + private volatile boolean exitOnEndOfTopic = false; + private CompletableFuture<Void> exitFuture; + private Thread tailerThread; - private final Thread tailerThread; + @Getter + private MessageId lastMessageId = null; public FunctionAssignmentTailer( FunctionRuntimeManager functionRuntimeManager, ReaderBuilder readerBuilder, WorkerConfig workerConfig, - ErrorNotifier errorNotifier) throws PulsarClientException { + ErrorNotifier errorNotifier) { this.functionRuntimeManager = functionRuntimeManager; - - this.reader = readerBuilder - .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-runtime-manager") - .readerName(workerConfig.getWorkerId() + "-function-runtime-manager") - .topic(workerConfig.getFunctionAssignmentTopic()) - .readCompacted(true) - .startMessageId(MessageId.earliest) - .create(); - - this.tailerThread = new Thread(() -> { - while(isRunning) { - try { - Message<byte[]> msg = reader.readNext(); - processAssignment(msg); - } catch (Throwable th) { - if (isRunning) { - log.error("Encountered error in assignment tailer", th); - // trigger fatal error - isRunning = false; - errorNotifier.triggerError(th); - } else { - if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) { - log.warn("Encountered error when assignment tailer is not running", th); - } - } + this.exitFuture = new CompletableFuture<>(); + this.readerBuilder = readerBuilder; + this.workerConfig = workerConfig; + this.errorNotifier = errorNotifier; + } - } + public synchronized CompletableFuture<Void> triggerReadToTheEndAndExit() { + exitOnEndOfTopic = 
true; + return this.exitFuture; + } + + public void startFromMessage(MessageId startMessageId) throws PulsarClientException { + if (!isRunning) { + isRunning = true; + if (reader == null) { + reader = createReader(startMessageId); } - }); - this.tailerThread.setName("assignment-tailer-thread"); + if (tailerThread == null || !tailerThread.isAlive()) { + tailerThread = getTailerThread(); + } + exitFuture = new CompletableFuture<>(); + tailerThread.start(); + } } - public void start() { - isRunning = true; - tailerThread.start(); + public synchronized void start() throws PulsarClientException { + MessageId startMessageId = lastMessageId == null ? MessageId.earliest : lastMessageId; + startFromMessage(startMessageId); } @Override - public void close() { - log.info("Stopping function assignment tailer"); + public synchronized void close() { + log.info("Closing function assignment tailer"); try { isRunning = false; - if (tailerThread != null && tailerThread.isAlive()) { - tailerThread.interrupt(); + + if (tailerThread != null) { + while (true) { + tailerThread.interrupt(); + + try { + tailerThread.join(5000, 0); + } catch (InterruptedException e) { + log.warn("Waiting for assignment tailer thread to stop is interrupted", e); + } + + if (tailerThread.isAlive()) { + log.warn("Assignment tailer thread is still alive. 
Will attempt to interrupt again."); + } else { + break; + } + } + tailerThread = null; } if (reader != null) { reader.close(); + reader = null; } + + exitFuture = null; + exitOnEndOfTopic = false; + } catch (IOException e) { log.error("Failed to stop function assignment tailer", e); } - log.info("Stopped function assignment tailer"); + } + + private Reader<byte[]> createReader(MessageId startMessageId) throws PulsarClientException { + log.info("Assignment tailer will start reading from message id {}", startMessageId); + + return WorkerUtils.createReader( + readerBuilder, + workerConfig.getWorkerId() + "-function-assignment-tailer", + workerConfig.getFunctionAssignmentTopic(), + startMessageId); } - public void processAssignment(Message<byte[]> msg) { - - if(msg.getData()==null || (msg.getData().length==0)) { - log.info("Received assignment delete: {}", msg.getKey()); - this.functionRuntimeManager.deleteAssignment(msg.getKey()); - } else { - Assignment assignment; - try { - assignment = Assignment.parseFrom(msg.getData()); - } catch (IOException e) { - log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(), e); - throw new RuntimeException(e); + private Thread getTailerThread() { + Thread t = new Thread(() -> { + while (isRunning) { + try { + Message<byte[]> msg = reader.readNext(5, TimeUnit.SECONDS); + if (msg == null) { + if (exitOnEndOfTopic && !reader.hasMessageAvailable()) { + break; + } + } else { + functionRuntimeManager.processAssignmentMessage(msg); + // keep track of last message id + lastMessageId = msg.getMessageId(); + } + } catch (Throwable th) { + if (isRunning) { + log.error("Encountered error in assignment tailer", th); + // trigger fatal error + isRunning = false; + errorNotifier.triggerError(th); + } else { + if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) { + log.warn("Encountered error when assignment tailer is not running", th); + } + } + } } - 
log.info("Received assignment update: {}", assignment); - this.functionRuntimeManager.processAssignment(assignment); - } + log.info("tailer thread exiting"); + exitFuture.complete(null); + }); + t.setName("assignment-tailer-thread"); + return t; + } + + Thread getThread() { + return tailerThread; } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java index 80f577d..6496f6e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java @@ -89,7 +89,6 @@ public class FunctionMetaDataManager implements AutoCloseable { * 1. Consume all existing function meta data upon start to establish existing state */ public void initialize() { - log.info("/** Initializing Function Metadata Manager **/"); try { this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, pulsarClient.newReader(), this.workerConfig, this.errorNotifier); @@ -99,16 +98,20 @@ public class FunctionMetaDataManager implements AutoCloseable { this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext()); } this.setInitializePhase(false); - // schedule functions if necessary - this.schedulerManager.schedule(); - // start function metadata tailer - this.functionMetaDataTopicTailer.start(); + } catch (Exception e) { log.error("Failed to initialize meta data store", e); throw new RuntimeException(e); } } + + public void start() { + // schedule functions if necessary + this.schedulerManager.schedule(); + // start function metadata tailer + this.functionMetaDataTopicTailer.start(); + } /** * Get the function metadata for a function diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java index 55a67a6..f0626ce 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataTopicTailer.java @@ -48,8 +48,8 @@ public class FunctionMetaDataTopicTailer this.reader = readerBuilder .topic(workerConfig.getFunctionMetadataTopic()) .startMessageId(MessageId.earliest) - .readerName(workerConfig.getWorkerId() + "-function-metadata-manager") - .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-manager") + .readerName(workerConfig.getWorkerId() + "-function-metadata-tailer") + .subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer") .create(); readerThread = new Thread(this); readerThread.setName("function-metadata-tailer-thread"); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index bc1ca53..bdde8a8 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.common.functions.WorkerInfo; @@ -53,6 +54,7 @@ import javax.ws.rs.core.MediaType; import 
javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.UriBuilder; +import java.io.IOException; import java.net.URI; import java.util.*; import java.util.Map.Entry; @@ -109,8 +111,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ @Getter final WorkerConfig workerConfig; - private FunctionAssignmentTailer functionAssignmentTailer; - @Setter @Getter private FunctionActioner functionActioner; @@ -130,7 +130,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ private final FunctionMetaDataManager functionMetaDataManager; private final ErrorNotifier errorNotifier; - + public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace, MembershipManager membershipManager, ConnectorsManager connectorsManager, FunctionsManager functionsManager, FunctionMetaDataManager functionMetaDataManager, ErrorNotifier errorNotifier) throws Exception { @@ -210,21 +210,23 @@ public class FunctionRuntimeManager implements AutoCloseable{ * 2. 
After current assignments are read, assignments belonging to this worker will be processed */ public void initialize() { - log.info("/** Initializing Runtime Manager **/"); try { - this.functionAssignmentTailer = new FunctionAssignmentTailer( - this, - this.getWorkerService().getClient().newReader(), - this.workerConfig, - this.errorNotifier); + Reader<byte[]> reader = WorkerUtils.createReader( + workerService.getClient().newReader(), + workerConfig.getWorkerId() + "-function-assignment-initialize", + workerConfig.getFunctionAssignmentTopic(), + MessageId.earliest); + // start init phase this.isInitializePhase = true; // read all existing messages - while (this.functionAssignmentTailer.getReader().hasMessageAvailable()) { - this.functionAssignmentTailer.processAssignment(this.functionAssignmentTailer.getReader().readNext()); + while (reader.hasMessageAvailable()) { + processAssignmentMessage(reader.readNext()); } // init phase is done this.isInitializePhase = false; + // close reader + reader.close(); // realize existing assignments Map<String, Assignment> assignmentMap = workerIdToAssignments.get(this.workerConfig.getWorkerId()); if (assignmentMap != null) { @@ -243,14 +245,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ /** * Starts the function runtime manager */ - public void start() { - log.info("/** Starting Function Runtime Manager **/"); - this.functionAssignmentTailer.start(); - } - - /** - * Public methods - */ /** * Get current assignments @@ -611,6 +605,25 @@ public class FunctionRuntimeManager implements AutoCloseable{ return functionStats.calculateOverall(); } + + public synchronized void processAssignmentMessage(Message<byte[]> msg) { + + if(msg.getData()==null || (msg.getData().length==0)) { + log.info("Received assignment delete: {}", msg.getKey()); + deleteAssignment(msg.getKey()); + } else { + Assignment assignment; + try { + assignment = Assignment.parseFrom(msg.getData()); + } catch (IOException e) { + log.error("[{}] 
Received bad assignment update at message {}", msg.getMessageId(), e); + throw new RuntimeException(e); + } + log.info("Received assignment update: {}", assignment); + processAssignment(assignment); + } + } + /** * Process an assignment update from the assignment topic * @param newAssignment the assignment @@ -835,7 +848,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ @Override public void close() throws Exception { - this.functionAssignmentTailer.close(); stopAllOwnedFunctions(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java new file mode 100644 index 0000000..3d7ec92 --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.functions.worker; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerEventListener; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl; + +import java.util.concurrent.atomic.AtomicBoolean; + +@Slf4j +public class LeaderService implements AutoCloseable, ConsumerEventListener { + + private final String consumerName; + private final FunctionAssignmentTailer functionAssignmentTailer; + private final ErrorNotifier errorNotifier; + private final SchedulerManager schedulerManager; + private ConsumerImpl<byte[]> consumer; + private final WorkerConfig workerConfig; + private final PulsarClient pulsarClient; + private final AtomicBoolean isLeader = new AtomicBoolean(false); + + static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants"; + + private static String WORKER_IDENTIFIER = "id"; + + public LeaderService(WorkerService workerService, + PulsarClient pulsarClient, + FunctionAssignmentTailer functionAssignmentTailer, + SchedulerManager schedulerManager, + ErrorNotifier errorNotifier) { + this.workerConfig = workerService.getWorkerConfig(); + this.pulsarClient = pulsarClient; + this.functionAssignmentTailer = functionAssignmentTailer; + this.schedulerManager = schedulerManager; + this.errorNotifier = errorNotifier; + consumerName = String.format( + "%s:%s:%d", + workerConfig.getWorkerId(), + workerConfig.getWorkerHostname(), + workerConfig.getWorkerPort() + ); + + } + + public void start() throws PulsarClientException { + // the leaders service is using a `coordination` topic for leader election. + // we don't produce any messages into this topic, we only use the `failover` subscription + // to elect an active consumer as the leader worker. 
The leader worker will be responsible + // for scheduling snapshots for FMT and doing task assignment. + consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer() + .topic(workerConfig.getClusterCoordinationTopic()) + .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION) + .subscriptionType(SubscriptionType.Failover) + .consumerEventListener(this) + .property(WORKER_IDENTIFIER, consumerName) + .consumerName(consumerName) + .subscribe(); + } + + @Override + public synchronized void becameActive(Consumer<?> consumer, int partitionId) { + if (isLeader.compareAndSet(false, true)) { + log.info("Worker {} became the leader.", consumerName); + try { + // trigger read to the end of the topic and exit + // Since the leader can just update its in memory assignments cache directly + functionAssignmentTailer.triggerReadToTheEndAndExit().get(); + functionAssignmentTailer.close(); + + // make sure scheduler is initialized because this worker + // is the leader and may need to start computing and writing assignments + schedulerManager.initialize(); + } catch (Throwable th) { + log.error("Encountered error when initializing to become leader", th); + errorNotifier.triggerError(th); + } + } + } + + @Override + public synchronized void becameInactive(Consumer<?> consumer, int partitionId) { + if (isLeader.compareAndSet(true, false)) { + log.info("Worker {} lost the leadership.", consumerName); + // when a worker has lost leadership it needs to start reading from the assignment topic again + try { + // stop scheduler manager since we are not the leader anymore + // will block if a schedule invocation is in process + schedulerManager.close(); + + // starting reading from assignment topic from the last published message of the scheduler + if (schedulerManager.getLastMessageProduced() == null) { + functionAssignmentTailer.start(); + } else { + functionAssignmentTailer.startFromMessage(schedulerManager.getLastMessageProduced()); + } + } catch (Throwable th) { + log.error("Encountered 
error in routine when worker lost leadership", th); + errorNotifier.triggerError(th); + } + } + } + + public boolean isLeader() { + return isLeader.get(); + } + + @Override + public void close() throws PulsarClientException { + if (consumer != null) { + consumer.close(); + } + } +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java index 50a2f97..749f3a3 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java @@ -54,14 +54,10 @@ import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeat * A simple implementation of leader election using a pulsar topic. */ @Slf4j -public class MembershipManager implements AutoCloseable, ConsumerEventListener { +public class MembershipManager implements AutoCloseable { - private final String consumerName; - private final ConsumerImpl<byte[]> consumer; private final WorkerConfig workerConfig; private PulsarAdmin pulsarAdmin; - private final CompletableFuture<Void> firstConsumerEventFuture; - private final AtomicBoolean isLeader = new AtomicBoolean(); static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants"; @@ -72,50 +68,9 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { @VisibleForTesting Map<Function.Instance, Long> unsignedFunctionDurations = new HashMap<>(); - MembershipManager(WorkerService service, PulsarClient client, PulsarAdmin pulsarAdmin) - throws PulsarClientException { - this.workerConfig = service.getWorkerConfig(); + MembershipManager(WorkerService workerService, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin) { + this.workerConfig = workerService.getWorkerConfig(); this.pulsarAdmin = pulsarAdmin; - consumerName = String.format( - "%s:%s:%d", 
- workerConfig.getWorkerId(), - workerConfig.getWorkerHostname(), - workerConfig.getWorkerPort() - ); - firstConsumerEventFuture = new CompletableFuture<>(); - // the membership manager is using a `coordination` topic for leader election. - // we don't produce any messages into this topic, we only use the `failover` subscription - // to elect an active consumer as the leader worker. The leader worker will be responsible - // for scheduling snapshots for FMT and doing task assignment. - consumer = (ConsumerImpl<byte[]>) client.newConsumer() - .topic(workerConfig.getClusterCoordinationTopic()) - .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION) - .subscriptionType(SubscriptionType.Failover) - .consumerEventListener(this) - .property(WORKER_IDENTIFIER, consumerName) - .subscribe(); - - isLeader.set(checkLeader(service, consumer.getConsumerName())); - } - - @Override - public void becameActive(Consumer<?> consumer, int partitionId) { - firstConsumerEventFuture.complete(null); - if (isLeader.compareAndSet(false, true)) { - log.info("Worker {} became the leader.", consumerName); - } - } - - @Override - public void becameInactive(Consumer<?> consumer, int partitionId) { - firstConsumerEventFuture.complete(null); - if (isLeader.compareAndSet(true, false)) { - log.info("Worker {} lost the leadership.", consumerName); - } - } - - public boolean isLeader() { - return isLeader.get(); } public List<WorkerInfo> getCurrentMembership() { @@ -163,8 +118,8 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { } @Override - public void close() throws PulsarClientException { - consumer.close(); + public void close() { + } public void checkFailures(FunctionMetaDataManager functionMetaDataManager, @@ -274,24 +229,4 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener { schedulerManager.schedule(); } } - - /** - * Private methods - */ - - private boolean checkLeader(WorkerService service, String consumerName) { - try { - 
TopicStats stats = service.getBrokerAdmin().topics() - .getStats(service.getWorkerConfig().getClusterCoordinationTopic()); - String activeConsumerName = stats != null - && stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null - ? stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName - : null; - return consumerName != null && consumerName.equalsIgnoreCase(activeConsumerName); - } catch (Exception e) { - log.warn("Failed to check leader {}", e.getMessage()); - } - return false; - } - } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index 9c93443..33213a8 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -18,52 +18,74 @@ */ package org.apache.pulsar.functions.worker; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.util.concurrent.DefaultThreadFactory; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import 
org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; import org.apache.pulsar.functions.proto.Function.Instance; import org.apache.pulsar.functions.utils.Actions; -import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.worker.scheduler.IScheduler; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; - -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; @Slf4j +/** + * The scheduler manager is used to compute scheduling of function instances + * Only the leader computes new schedulings and writes assignments to the assignment topic + * The lifecycle of this class is the following: + * 1. 
When worker becomes leader, this class will be initialized + * 2. When worker loses leadership, this class will be closed which also closes the worker's producer to the assignments topic + */ public class SchedulerManager implements AutoCloseable { private final WorkerConfig workerConfig; + private final ErrorNotifier errorNotifier; + private ThreadPoolExecutor executorService; + private final PulsarClient pulsarClient; @Setter private FunctionMetaDataManager functionMetaDataManager; @Setter + private LeaderService leaderService; + + @Setter private MembershipManager membershipManager; @Setter @@ -71,27 +93,36 @@ public class SchedulerManager implements AutoCloseable { private final IScheduler scheduler; - private final Producer<byte[]> producer; + private Producer<byte[]> producer; - private final ScheduledExecutorService executorService; + private ScheduledExecutorService scheduledExecutorService; private final PulsarAdmin admin; - + + @Getter + private Lock schedulerLock = new ReentrantLock(true); + + private volatile boolean isRunning = false; + AtomicBoolean isCompactionNeeded = new AtomicBoolean(false); private static final long DEFAULT_ADMIN_API_BACKOFF_SEC = 60; public static final String HEARTBEAT_TENANT = "pulsar-function"; public static final String HEARTBEAT_NAMESPACE = "heartbeat"; - public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, PulsarAdmin admin, ScheduledExecutorService executor) { + @Getter + private MessageId lastMessageProduced = null; + + public SchedulerManager(WorkerConfig workerConfig, + PulsarClient pulsarClient, + PulsarAdmin admin, + ErrorNotifier errorNotifier) { this.workerConfig = workerConfig; + this.pulsarClient = pulsarClient; this.admin = admin; this.scheduler = Reflections.createInstance(workerConfig.getSchedulerClassName(), IScheduler.class, Thread.currentThread().getContextClassLoader()); - - this.producer = createProducer(pulsarClient, workerConfig); - this.executorService = executor; + 
this.errorNotifier = errorNotifier; - scheduleCompaction(executor, workerConfig.getTopicCompactionFrequencySec()); } private static Producer<byte[]> createProducer(PulsarClient client, WorkerConfig config) { @@ -136,26 +167,58 @@ public class SchedulerManager implements AutoCloseable { return producer.get(); } + public synchronized void initialize() { + if (!isRunning) { + log.info("Initializing scheduler manager"); + executorService = new ThreadPoolExecutor(1, 5, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(5)); + executorService.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("worker-scheduler-%d").build()); + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-assignment-topic-compactor")); + scheduleCompaction(this.scheduledExecutorService, workerConfig.getTopicCompactionFrequencySec()); + + producer = createProducer(pulsarClient, workerConfig); + isRunning = true; + lastMessageProduced = null; + } + } + public Future<?> schedule() { - return executorService.submit(() -> { - synchronized (SchedulerManager.this) { - boolean isLeader = membershipManager.isLeader(); - if (isLeader) { - try { - invokeScheduler(); - } catch (Exception e) { - log.warn("Failed to invoke scheduler", e); - throw e; + if (!leaderService.isLeader()) { + return CompletableFuture.completedFuture(null); + } + + // make sure we are initialized before scheduling + initialize(); + + try { + return executorService.submit(() -> { + try { + schedulerLock.lock(); + + boolean isLeader = leaderService.isLeader(); + if (isLeader) { + try { + invokeScheduler(); + } catch (Throwable th) { + log.error("Encountered error when invoking scheduler", th); + errorNotifier.triggerError(th); + } } + } finally { + schedulerLock.unlock(); } - } - }); + }); + } catch (RejectedExecutionException e) { + // task queue is full so just ignore + log.debug("Rejected task to invoke scheduler since task queue is already full"); + return 
CompletableFuture.completedFuture(null); + } } private void scheduleCompaction(ScheduledExecutorService executor, long scheduleFrequencySec) { if (executor != null) { executor.scheduleWithFixedDelay(() -> { - if (membershipManager.isLeader() && isCompactionNeeded.get()) { + if (leaderService.isLeader() && isCompactionNeeded.get()) { compactAssignmentTopic(); isCompactionNeeded.set(false); } @@ -164,20 +227,21 @@ public class SchedulerManager implements AutoCloseable { } @VisibleForTesting - public void invokeScheduler() { + void invokeScheduler() { - Set<String> currentMembership = this.membershipManager.getCurrentMembership() + Set<String> currentMembership = membershipManager.getCurrentMembership() .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet()); - List<FunctionMetaData> allFunctions = this.functionMetaDataManager.getAllFunctionMetaData(); + List<FunctionMetaData> allFunctions = functionMetaDataManager.getAllFunctionMetaData(); Map<String, Function.Instance> allInstances = computeAllInstances(allFunctions, functionRuntimeManager.getRuntimeFactory().externallyManaged()); - Map<String, Map<String, Assignment>> workerIdToAssignments = this.functionRuntimeManager + Map<String, Map<String, Assignment>> workerIdToAssignments = functionRuntimeManager .getCurrentAssignments(); //delete assignments of functions and instances that don't exist anymore Iterator<Map.Entry<String, Map<String, Assignment>>> it = workerIdToAssignments.entrySet().iterator(); while (it.hasNext()) { Map.Entry<String, Map<String, Assignment>> workerIdToAssignmentEntry = it.next(); + String workerId = workerIdToAssignmentEntry.getKey(); Map<String, Assignment> functionMap = workerIdToAssignmentEntry.getValue(); // remove instances that don't exist anymore @@ -185,7 +249,14 @@ public class SchedulerManager implements AutoCloseable { String fullyQualifiedInstanceId = entry.getKey(); boolean deleted = !allInstances.containsKey(fullyQualifiedInstanceId); if (deleted) { - 
publishNewAssignment(entry.getValue().toBuilder().build(), true); + Assignment assignment = entry.getValue(); + MessageId messageId = publishNewAssignment(assignment.toBuilder().build(), true); + + // Directly update in memory assignment cache since I am leader + log.info("Deleting assignment: {}", assignment); + functionRuntimeManager.deleteAssignment(fullyQualifiedInstanceId); + // update message id associated with current view of assignments map + lastMessageProduced = messageId; } return deleted; }); @@ -198,11 +269,18 @@ public class SchedulerManager implements AutoCloseable { if (!assignment.getInstance().equals(instance)) { functionMap.put(fullyQualifiedInstanceId, assignment.toBuilder().setInstance(instance).build()); - publishNewAssignment(assignment.toBuilder().setInstance(instance).build().toBuilder().build(), false); + Assignment newAssignment = assignment.toBuilder().setInstance(instance).build().toBuilder().build(); + MessageId messageId = publishNewAssignment(newAssignment, false); + + // Directly update in memory assignment cache since I am leader + log.info("Updating assignment: {}", assignment); + functionRuntimeManager.processAssignment(newAssignment); + // update message id associated with current view of assignments map + lastMessageProduced = messageId; + } + if (functionMap.isEmpty()) { + it.remove(); } - } - if (functionMap.isEmpty()) { - it.remove(); } } @@ -222,10 +300,10 @@ public class SchedulerManager implements AutoCloseable { .flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream()) .collect(Collectors.toList()); - Pair<List<Function.Instance>, List<Assignment>> unassignedInstances = this.getUnassignedFunctionInstances(workerIdToAssignments, - allInstances); + Pair<List<Function.Instance>, List<Assignment>> unassignedInstances + = getUnassignedFunctionInstances(workerIdToAssignments, allInstances); - List<Assignment> assignments = this.scheduler.schedule(unassignedInstances.getLeft(), currentAssignments, 
currentMembership); + List<Assignment> assignments = scheduler.schedule(unassignedInstances.getLeft(), currentAssignments, currentMembership); assignments.addAll(unassignedInstances.getRight()); if (log.isDebugEnabled()) { @@ -235,37 +313,43 @@ public class SchedulerManager implements AutoCloseable { isCompactionNeeded.set(!assignments.isEmpty()); for(Assignment assignment : assignments) { - publishNewAssignment(assignment, false); + MessageId messageId = publishNewAssignment(assignment, false); + + // Directly update in memory assignment cache since I am leader + log.info("Adding assignment: {}", assignment); + functionRuntimeManager.processAssignment(assignment); + // update message id associated with current view of assignments map + lastMessageProduced = messageId; } } - public void compactAssignmentTopic() { + private void compactAssignmentTopic() { if (this.admin != null) { try { this.admin.topics().triggerCompaction(workerConfig.getFunctionAssignmentTopic()); } catch (PulsarAdminException e) { log.error("Failed to trigger compaction", e); - executorService.schedule(() -> compactAssignmentTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC, + scheduledExecutorService.schedule(() -> compactAssignmentTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC, TimeUnit.SECONDS); } } } - private void publishNewAssignment(Assignment assignment, boolean deleted) { + private MessageId publishNewAssignment(Assignment assignment, boolean deleted) { try { String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()); // publish empty message with instance-id key so, compactor can delete and skip delivery of this instance-id // message - producer.newMessage().key(fullyQualifiedInstanceId) - .value(deleted ? "".getBytes() : assignment.toByteArray()).sendAsync().get(); + return producer.newMessage().key(fullyQualifiedInstanceId) + .value(deleted ? 
"".getBytes() : assignment.toByteArray()).send(); } catch (Exception e) { log.error("Failed to {} assignment update {}", assignment, deleted ? "send" : "deleted", e); throw new RuntimeException(e); } } - public static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions, + private static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions, boolean externallyManagedRuntime) { Map<String, Function.Instance> functionInstances = new HashMap<>(); for (FunctionMetaData functionMetaData : allFunctions) { @@ -276,7 +360,7 @@ public class SchedulerManager implements AutoCloseable { return functionInstances; } - public static List<Function.Instance> computeInstances(FunctionMetaData functionMetaData, + static List<Function.Instance> computeInstances(FunctionMetaData functionMetaData, boolean externallyManagedRuntime) { List<Function.Instance> functionInstances = new LinkedList<>(); if (!externallyManagedRuntime) { @@ -323,15 +407,35 @@ public class SchedulerManager implements AutoCloseable { } @Override - public void close() { + public synchronized void close() { + log.info("Closing scheduler manager"); try { - this.producer.close(); - } catch (PulsarClientException e) { - log.warn("Failed to shutdown scheduler manager assignment producer", e); + // make sure we are not closing while a scheduling is being calculated + schedulerLock.lock(); + + isRunning = false; + + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdown(); + } + + if (executorService != null) { + executorService.shutdown(); + } + + if (producer != null) { + try { + producer.close(); + } catch (PulsarClientException e) { + log.warn("Failed to shutdown scheduler manager assignment producer", e); + } + } + } finally { + schedulerLock.unlock(); } } - public static String checkHeartBeatFunction(Instance funInstance) { + static String checkHeartBeatFunction(Instance funInstance) { if (funInstance.getFunctionMetaData() != null 
&& funInstance.getFunctionMetaData().getFunctionDetails() != null) { FunctionDetails funDetails = funInstance.getFunctionMetaData().getFunctionDetails(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index e3ea170..532bc32 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -70,15 +70,15 @@ public class WorkerService { private PulsarAdmin brokerAdmin; private PulsarAdmin functionAdmin; private final MetricsGenerator metricsGenerator; - private final ScheduledExecutorService executor; @VisibleForTesting private URI dlogUri; + private LeaderService leaderService; + private FunctionAssignmentTailer functionAssignmentTailer; public WorkerService(WorkerConfig workerConfig) { this.workerConfig = workerConfig; this.statsUpdater = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-stats-updater")); - this.executor = Executors.newScheduledThreadPool(10, new DefaultThreadFactory("pulsar-worker")); this.metricsGenerator = new MetricsGenerator(this.statsUpdater, workerConfig); } @@ -87,7 +87,7 @@ public class WorkerService { AuthenticationService authenticationService, AuthorizationService authorizationService, ErrorNotifier errorNotifier) throws InterruptedException { - log.info("Starting worker {}...", workerConfig.getWorkerId()); + log.info("/** Starting worker id={} **/", workerConfig.getWorkerId()); try { log.info("Worker Configs: {}", new ObjectMapper().writerWithDefaultPrettyPrinter() @@ -98,13 +98,13 @@ public class WorkerService { try { // create the dlog namespace for storing function packages - this.dlogUri = dlogUri; + dlogUri = dlogUri; DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(workerConfig); try { this.dlogNamespace 
= NamespaceBuilder.newBuilder() .conf(dlogConf) .clientId("function-worker-" + workerConfig.getWorkerId()) - .uri(this.dlogUri) + .uri(dlogUri) .build(); } catch (Exception e) { log.error("Failed to initialize dlog namespace {} for storing function packages", @@ -146,7 +146,7 @@ public class WorkerService { workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection(), workerConfig.isTlsHostnameVerificationEnable()); - this.client = WorkerUtils.getPulsarClient(this.workerConfig.getPulsarServiceUrl(), + this.client = WorkerUtils.getPulsarClient(workerConfig.getPulsarServiceUrl(), workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(), workerConfig.isUseTls(), pulsarClientTlsTrustCertsFilePath, @@ -156,16 +156,14 @@ public class WorkerService { this.functionAdmin = WorkerUtils.getPulsarAdminClient(functionWebServiceUrl); - this.client = WorkerUtils.getPulsarClient(this.workerConfig.getPulsarServiceUrl()); + this.client = WorkerUtils.getPulsarClient(workerConfig.getPulsarServiceUrl()); } - log.info("Created Pulsar client"); brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getFunctionAssignmentTopic()); brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getClusterCoordinationTopic()); brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getFunctionMetadataTopic()); //create scheduler manager - this.schedulerManager = new SchedulerManager(this.workerConfig, this.client, this.brokerAdmin, - this.executor); + this.schedulerManager = new SchedulerManager(workerConfig, client, brokerAdmin, errorNotifier); //create function meta data manager this.functionMetaDataManager = new FunctionMetaDataManager( @@ -179,52 +177,89 @@ public class WorkerService { if (!brokerAdmin.topics().getSubscriptions(coordinationTopic).contains(MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION)) { brokerAdmin.topics().createSubscription(coordinationTopic, MembershipManager.COORDINATION_TOPIC_SUBSCRIPTION, 
MessageId.earliest); } - this.membershipManager = new MembershipManager(this, this.client, this.brokerAdmin); + this.membershipManager = new MembershipManager(this, client, brokerAdmin); + // create function runtime manager this.functionRuntimeManager = new FunctionRuntimeManager( - this.workerConfig, + workerConfig, this, - this.dlogNamespace, - this.membershipManager, + dlogNamespace, + membershipManager, connectorsManager, functionsManager, functionMetaDataManager, errorNotifier); - // Setting references to managers in scheduler - this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager); - this.schedulerManager.setFunctionRuntimeManager(this.functionRuntimeManager); - this.schedulerManager.setMembershipManager(this.membershipManager); + + // initialize function assignment tailer that reads from the assignment topic + this.functionAssignmentTailer = new FunctionAssignmentTailer( + functionRuntimeManager, + client.newReader(), + workerConfig, + errorNotifier); // initialize function metadata manager - this.functionMetaDataManager.initialize(); + log.info("/** Initializing Metdata Manager **/"); + functionMetaDataManager.initialize(); // initialize function runtime manager - this.functionRuntimeManager.initialize(); + log.info("/** Initializing Runtime Manager **/"); + functionRuntimeManager.initialize(); + + this.leaderService = new LeaderService(this, + client, + functionAssignmentTailer, + schedulerManager, + errorNotifier); + + // Setting references to managers in scheduler + schedulerManager.setFunctionMetaDataManager(functionMetaDataManager); + schedulerManager.setFunctionRuntimeManager(functionRuntimeManager); + schedulerManager.setMembershipManager(membershipManager); + schedulerManager.setLeaderService(leaderService); this.authenticationService = authenticationService; this.authorizationService = authorizationService; - // Starting cluster services - log.info("Start cluster services..."); - this.clusterServiceCoordinator = new 
ClusterServiceCoordinator( - this.workerConfig.getWorkerId(), - membershipManager); - - this.clusterServiceCoordinator.addTask("membership-monitor", - this.workerConfig.getFailureCheckFreqMs(), - () -> membershipManager.checkFailures( - functionMetaDataManager, functionRuntimeManager, schedulerManager)); + // Start function assignment tailer + log.info("/** Starting Function Assignment Tailer **/"); + functionAssignmentTailer.start(); - this.clusterServiceCoordinator.start(); + log.info("/** Start Leader Service **/"); + leaderService.start(); + + // start function metadata manager + log.info("/** Starting Metdata Manager **/"); + functionMetaDataManager.start(); - // Start function runtime manager - this.functionRuntimeManager.start(); + // Starting cluster services + this.clusterServiceCoordinator = new ClusterServiceCoordinator( + workerConfig.getWorkerId(), + leaderService); + + clusterServiceCoordinator.addTask("membership-monitor", + workerConfig.getFailureCheckFreqMs(), + () -> { + // computing a new schedule and checking for failures cannot happen concurrently + // both paths of code modify internally cached assignments map in function runtime manager + try { + schedulerManager.getSchedulerLock().lock(); + membershipManager.checkFailures( + functionMetaDataManager, functionRuntimeManager, schedulerManager); + } finally { + schedulerManager.getSchedulerLock().unlock(); + } + }); + + log.info("/** Starting Cluster Service Coordinator **/"); + clusterServiceCoordinator.start(); // indicate function worker service is done initializing this.isInitialized = true; + + log.info("/** Started worker id={} **/", workerConfig.getWorkerId()); } catch (Throwable t) { log.error("Error Starting up in worker", t); throw new RuntimeException(t); @@ -239,18 +274,20 @@ public class WorkerService { log.warn("Failed to close function metadata manager", e); } } - if (null != functionRuntimeManager) { + + if (null != functionAssignmentTailer) { try { - 
functionRuntimeManager.close(); + functionAssignmentTailer.close(); } catch (Exception e) { - log.warn("Failed to close function runtime manager", e); + log.warn("Failed to close function assignment tailer", e); } } - if (null != client) { + + if (null != functionRuntimeManager) { try { - client.close(); - } catch (PulsarClientException e) { - log.warn("Failed to close pulsar client", e); + functionRuntimeManager.close(); + } catch (Exception e) { + log.warn("Failed to close function runtime manager", e); } } @@ -259,38 +296,46 @@ public class WorkerService { } if (null != membershipManager) { - try { - membershipManager.close(); - } catch (PulsarClientException e) { - log.warn("Failed to close membership manager", e); - } + membershipManager.close(); } if (null != schedulerManager) { schedulerManager.close(); } - if (null != this.brokerAdmin) { - this.brokerAdmin.close(); + if (null != leaderService) { + try { + leaderService.close(); + } catch (PulsarClientException e) { + log.warn("Failed to close leader service", e); + } + } + + if (null != client) { + try { + client.close(); + } catch (PulsarClientException e) { + log.warn("Failed to close pulsar client", e); + } } - if (null != this.functionAdmin) { - this.functionAdmin.close(); + if (null != brokerAdmin) { + brokerAdmin.close(); } - if (null != this.stateStoreAdminClient) { - this.stateStoreAdminClient.close(); + if (null != functionAdmin) { + functionAdmin.close(); } - if (null != this.dlogNamespace) { - this.dlogNamespace.close(); + if (null != stateStoreAdminClient) { + stateStoreAdminClient.close(); } - if(this.executor != null) { - this.executor.shutdown(); + if (null != dlogNamespace) { + dlogNamespace.close(); } - if (this.statsUpdater != null) { + if (statsUpdater != null) { statsUpdater.shutdownNow(); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java 
index 5cad320..fe1f11d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java @@ -33,8 +33,11 @@ import org.apache.distributedlog.metadata.DLMetadata; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.common.conf.InternalConfigurationData; import org.apache.pulsar.common.policies.data.FunctionStats; import org.apache.pulsar.functions.proto.Function; @@ -315,4 +318,17 @@ public final class WorkerUtils { return false; } + + public static Reader<byte[]> createReader(ReaderBuilder readerBuilder, + String readerName, + String topic, + MessageId startMessageId) throws PulsarClientException { + return readerBuilder + .subscriptionRolePrefix(readerName) + .readerName(readerName) + .topic(topic) + .readCompacted(true) + .startMessageId(startMessageId) + .create(); + } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java index 763465b..a9e33da 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinatorTest.java @@ -52,7 +52,7 @@ public class ClusterServiceCoordinatorTest { return new org.powermock.modules.testng.PowerMockObjectFactory(); } - private MembershipManager membershipManager; + private LeaderService 
leaderService; private ClusterServiceCoordinator coordinator; private ScheduledExecutorService mockExecutor; private MockExecutorController mockExecutorController; @@ -70,8 +70,8 @@ public class ClusterServiceCoordinatorTest { any(ThreadFactory.class)) ).thenReturn(mockExecutor); - this.membershipManager = mock(MembershipManager.class); - this.coordinator = new ClusterServiceCoordinator("test-coordinator", membershipManager); + this.leaderService = mock(LeaderService.class); + this.coordinator = new ClusterServiceCoordinator("test-coordinator", leaderService); } @@ -94,18 +94,18 @@ public class ClusterServiceCoordinatorTest { .scheduleAtFixedRate(any(Runnable.class), eq(interval), eq(interval), eq(TimeUnit.MILLISECONDS)); // when task is executed, it is the leader - when(membershipManager.isLeader()).thenReturn(true); + when(leaderService.isLeader()).thenReturn(true); mockExecutorController.advance(Duration.ofMillis(interval)); - verify(membershipManager, times(1)).isLeader(); + verify(leaderService, times(1)).isLeader(); verify(mockTask, times(1)).run(); // when task is executed, it is not the leader - when(membershipManager.isLeader()).thenReturn(false); + when(leaderService.isLeader()).thenReturn(false); mockExecutorController.advance(Duration.ofMillis(interval)); // `isLeader` is called twice, however the task is only executed once (when it was leader) - verify(membershipManager, times(2)).isLeader(); + verify(leaderService, times(2)).isLeader(); verify(mockTask, times(1)).run(); } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java new file mode 100644 index 0000000..28eb06a --- /dev/null +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java @@ -0,0 +1,422 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import io.netty.buffer.Unpooled; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; +import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; +import org.apache.pulsar.functions.utils.FunctionCommon; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static 
org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +@Slf4j +public class FunctionAssignmentTailerTest { + + @Test(timeOut = 10000) + public void testErrorNotifier() throws Exception { + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setWorkerId("worker-1"); + workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName()); + workerConfig.setFunctionRuntimeFactoryConfigs( + ObjectMapperFactory.getThreadLocal().convertValue( + new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class)); + workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); + workerConfig.setStateStorageServiceUrl("foo"); + workerConfig.setFunctionAssignmentTopicName("assignments"); + + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build(); + + Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build(); + + Function.Assignment assignment1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function1).setInstanceId(0).build()) + .build(); + Function.Assignment assignment2 = Function.Assignment.newBuilder() + 
.setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(0).build()) + .build(); + + ArrayBlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2); + PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder(); + Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(), + new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, msgMetadataBuilder)); + doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey(); + + Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(), + new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, msgMetadataBuilder)); + doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey(); + + PulsarClient pulsarClient = mock(PulsarClient.class); + + Reader<byte[]> reader = mock(Reader.class); + + when(reader.readNext(anyInt(), any())).thenAnswer(new Answer<Message<byte[]>>() { + @Override + public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable { + return messageList.poll(10, TimeUnit.SECONDS); + } + }); + + when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() { + @Override + public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable { + return new CompletableFuture<>(); + } + }); + + when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() { + @Override + public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { + return !messageList.isEmpty(); + } + }); + + ReaderBuilder readerBuilder = mock(ReaderBuilder.class); + doReturn(readerBuilder).when(pulsarClient).newReader(); + doReturn(readerBuilder).when(readerBuilder).topic(anyString()); + doReturn(readerBuilder).when(readerBuilder).readerName(anyString()); + 
doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString()); + doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean()); + + doReturn(reader).when(readerBuilder).create(); + WorkerService workerService = mock(WorkerService.class); + doReturn(pulsarClient).when(workerService).getClient(); + doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); + + ErrorNotifier errorNotifier = spy(ErrorNotifier.getDefaultImpl()); + + // test new assignment add functions + FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class); + + FunctionAssignmentTailer functionAssignmentTailer = spy(new FunctionAssignmentTailer(functionRuntimeManager, readerBuilder, workerConfig, errorNotifier)); + + functionAssignmentTailer.start(); + + // verify no errors occurred + verify(errorNotifier, times(0)).triggerError(any()); + + messageList.add(message1); + + verify(errorNotifier, times(0)).triggerError(any()); + + // trigger an error to be thrown + doThrow(new RuntimeException("test")).when(functionRuntimeManager).processAssignmentMessage(any()); + + messageList.add(message2); + + try { + errorNotifier.waitForError(); + } catch (Exception e) { + assertEquals(e.getCause().getMessage(), "test"); + } + verify(errorNotifier, times(1)).triggerError(any()); + + functionAssignmentTailer.close(); + } + + @Test(timeOut = 10000) + public void testProcessingAssignments() throws Exception { + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setWorkerId("worker-1"); + workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName()); + workerConfig.setFunctionRuntimeFactoryConfigs( + ObjectMapperFactory.getThreadLocal().convertValue( + new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class)); + 
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); + workerConfig.setStateStorageServiceUrl("foo"); + workerConfig.setFunctionAssignmentTopicName("assignments"); + + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build(); + + Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build(); + + Function.Assignment assignment1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function1).setInstanceId(0).build()) + .build(); + Function.Assignment assignment2 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(0).build()) + .build(); + + ArrayBlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2); + + MessageId messageId1 = new MessageIdImpl(1, 1, -1); + MessageId messageId2 = new MessageIdImpl(1, 2, -1); + + PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder(); + Message message1 = spy(new MessageImpl("foo", messageId1.toString(), + new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, msgMetadataBuilder)); + doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey(); + + Message message2 = spy(new MessageImpl("foo", messageId2.toString(), + new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, msgMetadataBuilder)); + doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey(); + + PulsarClient pulsarClient = mock(PulsarClient.class); + + Reader<byte[]> reader = 
mock(Reader.class); + + when(reader.readNext(anyInt(), any())).thenAnswer(new Answer<Message<byte[]>>() { + @Override + public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable { + return messageList.poll(10, TimeUnit.SECONDS); + } + }); + + when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() { + @Override + public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable { + return new CompletableFuture<>(); + } + }); + + when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() { + @Override + public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { + return !messageList.isEmpty(); + } + }); + + ReaderBuilder readerBuilder = mock(ReaderBuilder.class); + doReturn(readerBuilder).when(pulsarClient).newReader(); + doReturn(readerBuilder).when(readerBuilder).topic(anyString()); + doReturn(readerBuilder).when(readerBuilder).readerName(anyString()); + doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString()); + doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean()); + + doReturn(reader).when(readerBuilder).create(); + WorkerService workerService = mock(WorkerService.class); + doReturn(pulsarClient).when(workerService).getClient(); + doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); + + ErrorNotifier errorNotifier = spy(ErrorNotifier.getDefaultImpl()); + + // test new assignment add functions + FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class); + + FunctionAssignmentTailer functionAssignmentTailer = spy(new FunctionAssignmentTailer(functionRuntimeManager, readerBuilder, workerConfig, errorNotifier)); + + functionAssignmentTailer.start(); + + messageList.add(message1); + for (int i = 0; i < 10; i++) { + try { + 
verify(functionRuntimeManager, times(1)).processAssignmentMessage(eq(message1)); + break; + } catch (org.mockito.exceptions.verification.WantedButNotInvoked e) { + if (i == 9) { + throw e; + } + } + Thread.sleep(200); + } + + messageList.add(message2); + for (int i = 0; i < 10; i++) { + try { + verify(functionRuntimeManager, times(1)).processAssignmentMessage(eq(message2)); + break; + } catch (org.mockito.exceptions.verification.WantedButNotInvoked e) { + if (i == 9) { + throw e; + } + } + Thread.sleep(200); + } + + Assert.assertEquals(functionAssignmentTailer.getLastMessageId(), message2.getMessageId()); + functionAssignmentTailer.close(); + } + + @Test(timeOut = 10000) + public void testTriggerReadToTheEndAndExit() throws Exception { + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setWorkerId("worker-1"); + workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName()); + workerConfig.setFunctionRuntimeFactoryConfigs( + ObjectMapperFactory.getThreadLocal().convertValue( + new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class)); + workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); + workerConfig.setStateStorageServiceUrl("foo"); + workerConfig.setFunctionAssignmentTopicName("assignments"); + + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build(); + + Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build(); + + Function.Assignment assignment1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function1).setInstanceId(0).build()) + .build(); + Function.Assignment assignment2 = 
Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(0).build()) + .build(); + + ArrayBlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2); + + MessageId messageId1 = new MessageIdImpl(1, 1, -1); + MessageId messageId2 = new MessageIdImpl(1, 2, -1); + + PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder(); + Message message1 = spy(new MessageImpl("foo", messageId1.toString(), + new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, msgMetadataBuilder)); + doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey(); + + Message message2 = spy(new MessageImpl("foo", messageId2.toString(), + new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, msgMetadataBuilder)); + doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey(); + + PulsarClient pulsarClient = mock(PulsarClient.class); + + Reader<byte[]> reader = mock(Reader.class); + + when(reader.readNext(anyInt(), any())).thenAnswer(new Answer<Message<byte[]>>() { + @Override + public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable { + return messageList.poll(10, TimeUnit.MILLISECONDS); + } + }); + + when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() { + @Override + public CompletableFuture<Message<byte[]>> answer(InvocationOnMock invocationOnMock) throws Throwable { + return new CompletableFuture<>(); + } + }); + + when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() { + @Override + public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { + return !messageList.isEmpty(); + } + }); + + ReaderBuilder readerBuilder = mock(ReaderBuilder.class); + doReturn(readerBuilder).when(pulsarClient).newReader(); + 
doReturn(readerBuilder).when(readerBuilder).topic(anyString()); + doReturn(readerBuilder).when(readerBuilder).readerName(anyString()); + doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString()); + doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean()); + + doReturn(reader).when(readerBuilder).create(); + WorkerService workerService = mock(WorkerService.class); + doReturn(pulsarClient).when(workerService).getClient(); + doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); + + ErrorNotifier errorNotifier = spy(ErrorNotifier.getDefaultImpl()); + + // test new assignment add functions + FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class); + + FunctionAssignmentTailer functionAssignmentTailer = spy(new FunctionAssignmentTailer(functionRuntimeManager, readerBuilder, workerConfig, errorNotifier)); + + functionAssignmentTailer.start(); + + messageList.add(message1); + for (int i = 0; i < 10; i++) { + try { + verify(functionRuntimeManager, times(1)).processAssignmentMessage(eq(message1)); + break; + } catch (org.mockito.exceptions.verification.WantedButNotInvoked e) { + if (i == 9) { + throw e; + } + } + Thread.sleep(200); + } + + functionAssignmentTailer.triggerReadToTheEndAndExit().get(); + for (int i = 0; i < 10; i++) { + if(!functionAssignmentTailer.getThread().isAlive()) { + break; + } + + if (i == 9) { + Assert.assertFalse(functionAssignmentTailer.getThread().isAlive()); + } + Thread.sleep(200); + } + + messageList.add(message2); + Assert.assertEquals(functionAssignmentTailer.getLastMessageId(), message1.getMessageId()); + + functionAssignmentTailer.close(); + } +} diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index d486707..54b47f5 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -47,9 +47,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.any; @@ -57,7 +55,6 @@ import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; @@ -483,131 +480,6 @@ public class FunctionRuntimeManagerTest { assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"), functionRuntimeInfo); } - @Test(timeOut = 10000) - public void testErrorNotifier() throws Exception { - WorkerConfig workerConfig = new WorkerConfig(); - workerConfig.setWorkerId("worker-1"); - workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName()); - workerConfig.setFunctionRuntimeFactoryConfigs( - ObjectMapperFactory.getThreadLocal().convertValue( - new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class)); - workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); - workerConfig.setStateStorageServiceUrl("foo"); - workerConfig.setFunctionAssignmentTopicName("assignments"); - - Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( - Function.FunctionDetails.newBuilder() - 
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build(); - - Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder().setFunctionDetails( - Function.FunctionDetails.newBuilder() - .setTenant("test-tenant").setNamespace("test-namespace").setName("func-2")).build(); - - Function.Assignment assignment1 = Function.Assignment.newBuilder() - .setWorkerId("worker-1") - .setInstance(Function.Instance.newBuilder() - .setFunctionMetaData(function1).setInstanceId(0).build()) - .build(); - Function.Assignment assignment2 = Function.Assignment.newBuilder() - .setWorkerId("worker-1") - .setInstance(Function.Instance.newBuilder() - .setFunctionMetaData(function2).setInstanceId(0).build()) - .build(); - - ArrayBlockingQueue<Message<byte[]>> messageList = new ArrayBlockingQueue<>(2); - PulsarApi.MessageMetadata.Builder msgMetadataBuilder = PulsarApi.MessageMetadata.newBuilder(); - Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(), - new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null, msgMetadataBuilder)); - doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey(); - - Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(), - new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null, msgMetadataBuilder)); - doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey(); - - PulsarClient pulsarClient = mock(PulsarClient.class); - - Reader<byte[]> reader = mock(Reader.class); - - - when(reader.readNext()).thenAnswer(new Answer<Message<byte[]>>() { - @Override - public Message<byte[]> answer(InvocationOnMock invocationOnMock) throws Throwable { - return messageList.poll(10, TimeUnit.SECONDS); - } - }); - - when(reader.readNextAsync()).thenAnswer(new Answer<CompletableFuture<Message<byte[]>>>() { - @Override - public CompletableFuture<Message<byte[]>> answer(InvocationOnMock 
invocationOnMock) throws Throwable { - return new CompletableFuture<>(); - } - }); - - when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() { - @Override - public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { - return !messageList.isEmpty(); - } - }); - - ReaderBuilder readerBuilder = mock(ReaderBuilder.class); - doReturn(readerBuilder).when(pulsarClient).newReader(); - doReturn(readerBuilder).when(readerBuilder).topic(anyString()); - doReturn(readerBuilder).when(readerBuilder).readerName(anyString()); - doReturn(readerBuilder).when(readerBuilder).subscriptionRolePrefix(anyString()); - doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); - doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); - doReturn(readerBuilder).when(readerBuilder).readCompacted(anyBoolean()); - - doReturn(reader).when(readerBuilder).create(); - WorkerService workerService = mock(WorkerService.class); - doReturn(pulsarClient).when(workerService).getClient(); - doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); - - ErrorNotifier errorNotifier = spy(ErrorNotifier.getDefaultImpl()); - - // test new assignment add functions - FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( - workerConfig, - workerService, - mock(Namespace.class), - mock(MembershipManager.class), - mock(ConnectorsManager.class), - mock(FunctionsManager.class), - mock(FunctionMetaDataManager.class), - errorNotifier)); - FunctionActioner functionActioner = spy(functionRuntimeManager.getFunctionActioner()); - doNothing().when(functionActioner).startFunction(any(FunctionRuntimeInfo.class)); - doNothing().when(functionActioner).stopFunction(any(FunctionRuntimeInfo.class)); - doNothing().when(functionActioner).terminateFunction(any(FunctionRuntimeInfo.class)); - functionRuntimeManager.setFunctionActioner(functionActioner); - - functionRuntimeManager.initialize(); - - // verify no errors occured - verify(errorNotifier, 
times(0)).triggerError(any()); - - messageList.add(message1); - - functionRuntimeManager.start(); - - verify(errorNotifier, times(0)).triggerError(any()); - - // trigger an error to be thrown - doThrow(new RuntimeException("test")).when(functionRuntimeManager).processAssignment(any()); - - messageList.add(message2); - - try { - errorNotifier.waitForError(); - } catch (Exception e) { - assertEquals(e.getCause().getMessage(), "test"); - } - verify(errorNotifier, times(1)).triggerError(any()); - - functionRuntimeManager.close(); - } - @Test public void testRuntimeManagerInitialize() throws Exception { WorkerConfig workerConfig = new WorkerConfig(); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java new file mode 100644 index 0000000..445498a --- /dev/null +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.functions.worker; + +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerEventListener; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; +import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class LeaderServiceTest { + + private final WorkerConfig workerConfig; + private LeaderService leaderService; + private PulsarClientImpl mockClient; + AtomicReference<ConsumerEventListener> listenerHolder; + private ConsumerImpl mockConsumer; + private FunctionAssignmentTailer functionAssignmentTailer; + private SchedulerManager schedulerManager; + + public LeaderServiceTest() { + this.workerConfig = new WorkerConfig(); + workerConfig.setWorkerId("worker-1"); + workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName()); + workerConfig.setFunctionRuntimeFactoryConfigs( + 
ObjectMapperFactory.getThreadLocal().convertValue( + new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class)); + workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); + workerConfig.setStateStorageServiceUrl("foo"); + workerConfig.setWorkerPort(1234); + } + + @BeforeMethod + public void setup() throws PulsarClientException { + mockClient = mock(PulsarClientImpl.class); + + mockConsumer = mock(ConsumerImpl.class); + ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class); + + when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder); + when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder); + when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder); + when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder); + when(mockConsumerBuilder.consumerName(anyString())).thenReturn(mockConsumerBuilder); + + when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer); + WorkerService workerService = mock(WorkerService.class); + doReturn(workerConfig).when(workerService).getWorkerConfig(); + + listenerHolder = new AtomicReference<>(); + when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock -> { + + ConsumerEventListener listener = invocationOnMock.getArgument(0); + listenerHolder.set(listener); + + return mockConsumerBuilder; + }); + + when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder); + + schedulerManager = mock(SchedulerManager.class); + + + functionAssignmentTailer = mock(FunctionAssignmentTailer.class); + when(functionAssignmentTailer.triggerReadToTheEndAndExit()).thenReturn(CompletableFuture.completedFuture(null)); + + leaderService = spy(new LeaderService(workerService, mockClient, functionAssignmentTailer, schedulerManager, ErrorNotifier.getDefaultImpl())); + leaderService.start(); + } + + @Test + public void 
testLeaderService() throws Exception { + MessageId messageId = new MessageIdImpl(1, 2, -1); + when(schedulerManager.getLastMessageProduced()).thenReturn(messageId); + + assertFalse(leaderService.isLeader()); + verify(mockClient, times(1)).newConsumer(); + + listenerHolder.get().becameActive(mockConsumer, 0); + assertTrue(leaderService.isLeader()); + + verify(functionAssignmentTailer, times(1)).triggerReadToTheEndAndExit(); + verify(functionAssignmentTailer, times(1)).close(); + verify(schedulerManager, times((1))).initialize(); + + listenerHolder.get().becameInactive(mockConsumer, 0); + assertFalse(leaderService.isLeader()); + + verify(functionAssignmentTailer, times(1)).startFromMessage(messageId); + verify(schedulerManager, times(1)).close(); + } + + @Test + public void testLeaderServiceNoNewScheduling() throws Exception { + when(schedulerManager.getLastMessageProduced()).thenReturn(null); + + assertFalse(leaderService.isLeader()); + verify(mockClient, times(1)).newConsumer(); + + listenerHolder.get().becameActive(mockConsumer, 0); + assertTrue(leaderService.isLeader()); + + verify(functionAssignmentTailer, times(1)).triggerReadToTheEndAndExit(); + verify(functionAssignmentTailer, times(1)).close(); + verify(schedulerManager, times((1))).initialize(); + + listenerHolder.get().becameInactive(mockConsumer, 0); + assertFalse(leaderService.isLeader()); + + verify(functionAssignmentTailer, times(1)).start(); + verify(schedulerManager, times(1)).close(); + } +} diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index 7acb3f7..71fe6b2 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -28,14 +28,11 @@ import static org.mockito.Mockito.times; 
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -71,46 +68,6 @@ public class MembershipManagerTest { workerConfig.setStateStorageServiceUrl("foo"); } - @Test - public void testConsumerEventListener() throws Exception { - PulsarClientImpl mockClient = mock(PulsarClientImpl.class); - PulsarAdmin mockAdmin = mock(PulsarAdmin.class); - - ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class); - ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class); - - when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder); - when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder); - when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder); - when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder); - - when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer); - WorkerService workerService = mock(WorkerService.class); - doReturn(workerConfig).when(workerService).getWorkerConfig(); - - AtomicReference<ConsumerEventListener> listenerHolder = new AtomicReference<>(); - when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock -> { - - ConsumerEventListener listener = invocationOnMock.getArgument(0); - listenerHolder.set(listener); - - return mockConsumerBuilder; - }); - - when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder); - - MembershipManager membershipManager = spy(new MembershipManager(workerService, 
mockClient, mockAdmin)); - assertFalse(membershipManager.isLeader()); - verify(mockClient, times(1)) - .newConsumer(); - - listenerHolder.get().becameActive(mockConsumer, 0); - assertTrue(membershipManager.isLeader()); - - listenerHolder.get().becameInactive(mockConsumer, 0); - assertFalse(membershipManager.isLeader()); - } - private static PulsarClient mockPulsarClient() throws PulsarClientException { PulsarClientImpl mockClient = mock(PulsarClientImpl.class); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index fafed1f..e72b81e 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.worker; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.InvalidProtocolBufferException; import io.netty.util.concurrent.DefaultThreadFactory; import io.prometheus.client.CollectorRegistry; @@ -38,6 +39,7 @@ import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler; +import org.mockito.ArgumentMatcher; import org.mockito.Mockito; import org.mockito.invocation.Invocation; import org.testng.Assert; @@ -47,18 +49,29 @@ import org.testng.annotations.Test; import java.lang.reflect.Method; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyInt; @@ -68,6 +81,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -83,6 +97,8 @@ public class SchedulerManagerTest { private Producer producer; private TypedMessageBuilder<byte[]> message; private ScheduledExecutorService executor; + private LeaderService leaderService; + private ErrorNotifier errorNotifier; @BeforeMethod public void setup() { @@ -123,13 +139,16 @@ public class SchedulerManagerTest { this.executor = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("worker-test")); - schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient, null, executor)); + errorNotifier = spy(ErrorNotifier.getDefaultImpl()); + schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient, null, errorNotifier)); functionRuntimeManager = mock(FunctionRuntimeManager.class); 
functionMetaDataManager = mock(FunctionMetaDataManager.class); membershipManager = mock(MembershipManager.class); + leaderService = mock(LeaderService.class); schedulerManager.setFunctionMetaDataManager(functionMetaDataManager); schedulerManager.setFunctionRuntimeManager(functionRuntimeManager); schedulerManager.setMembershipManager(membershipManager); + schedulerManager.setLeaderService(leaderService); } @AfterMethod @@ -171,16 +190,17 @@ public class SchedulerManagerTest { doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); // i am not leader - doReturn(false).when(membershipManager).isLeader(); + doReturn(false).when(leaderService).isLeader(); callSchedule(); verify(producer, times(0)).sendAsync(any()); // i am leader - doReturn(true).when(membershipManager).isLeader(); + doReturn(true).when(leaderService).isLeader(); callSchedule(); List<Invocation> invocations = getMethodInvocationDetails(schedulerManager, - SchedulerManager.class.getMethod("invokeScheduler")); + SchedulerManager.class.getDeclaredMethod("invokeScheduler")); Assert.assertEquals(invocations.size(), 1); + verify(errorNotifier, times(0)).triggerError(any()); } @Test @@ -217,12 +237,13 @@ public class SchedulerManagerTest { doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); // i am leader - doReturn(true).when(membershipManager).isLeader(); + doReturn(true).when(leaderService).isLeader(); callSchedule(); - List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send")); Assert.assertEquals(invocations.size(), 0); + verify(errorNotifier, times(0)).triggerError(any()); } @Test @@ -264,11 +285,11 @@ public class SchedulerManagerTest { doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); // i am leader - doReturn(true).when(membershipManager).isLeader(); + 
doReturn(true).when(leaderService).isLeader(); callSchedule(); - List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send")); Assert.assertEquals(invocations.size(), 1); invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); @@ -283,6 +304,8 @@ public class SchedulerManagerTest { .build(); Assert.assertEquals(assignment2, assignments); + // make sure we also directly added the assignment to in memory assignment cache in function runtime manager + verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2)); } @Test @@ -334,20 +357,21 @@ public class SchedulerManagerTest { doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); // i am leader - doReturn(true).when(membershipManager).isLeader(); + doReturn(true).when(leaderService).isLeader(); callSchedule(); - List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send")); Assert.assertEquals(invocations.size(), 1); invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Assignment assignments = Assignment.parseFrom(send); - - log.info("assignments: {}", assignments); + // delete assignment message should only have key = full qualified instance id and value = null; Assert.assertEquals(0, send.length); + + // make sure we also directly deleted the assignment from the in memory assignment cache in function runtime manager + verify(functionRuntimeManager, times(1)).deleteAssignment(eq(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance()))); } @Test @@ -391,11 
+415,11 @@ public class SchedulerManagerTest { doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); // i am leader - doReturn(true).when(membershipManager).isLeader(); + doReturn(true).when(leaderService).isLeader(); callSchedule(); - List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send")); Assert.assertEquals(invocations.size(), 1); invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); @@ -443,7 +467,7 @@ public class SchedulerManagerTest { callSchedule(); - invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send")); Assert.assertEquals(invocations.size(), 4); invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); @@ -502,18 +526,26 @@ public class SchedulerManagerTest { doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); // i am leader - doReturn(true).when(membershipManager).isLeader(); + doReturn(true).when(leaderService).isLeader(); callSchedule(); - List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send")); Assert.assertEquals(invocations.size(), 3); invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; - Assignment assignments = Assignment.parseFrom(send); - log.info("assignments: {}", assignments); + for (int i = 0; i < invocations.size(); i++) { + Invocation invocation = invocations.get(i); + byte[] send 
= (byte[]) invocation.getRawArguments()[0]; + Assignment assignment = Assignment.parseFrom(send); + Assignment expectedAssignment = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(i).build()) + .build(); + Assert.assertEquals(assignment, expectedAssignment); + } Set<Assignment> allAssignments = Sets.newHashSet(); invocations.forEach(invocation -> { @@ -544,6 +576,12 @@ public class SchedulerManagerTest { assertTrue(allAssignments.contains(assignment2_2)); assertTrue(allAssignments.contains(assignment2_3)); + // make sure we also directly add the assignment to the in memory assignment cache in function runtime manager + verify(functionRuntimeManager, times(3)).processAssignment(any()); + verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_1)); + verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_2)); + verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_3)); + // updating assignments currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1); currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2); @@ -568,7 +606,7 @@ public class SchedulerManagerTest { callSchedule(); - invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send")); Assert.assertEquals(invocations.size(), 6); invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); @@ -583,6 +621,14 @@ public class SchedulerManagerTest { }); assertTrue(allAssignments2.contains(assignment2Scaled)); + + // make sure we also directly removed the assignment from the in memory assignment cache in function runtime manager + 
verify(functionRuntimeManager, times(2)).deleteAssignment(anyString()); + verify(functionRuntimeManager, times(1)).deleteAssignment(eq(FunctionCommon.getFullyQualifiedInstanceId(assignment2_2.getInstance()))); + verify(functionRuntimeManager, times(1)).deleteAssignment(eq(FunctionCommon.getFullyQualifiedInstanceId(assignment2_2.getInstance()))); + + verify(functionRuntimeManager, times(4)).processAssignment(any()); + verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Scaled)); } @Test @@ -621,11 +667,11 @@ public class SchedulerManagerTest { doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); // i am leader - doReturn(true).when(membershipManager).isLeader(); + doReturn(true).when(leaderService).isLeader(); callSchedule(); - List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send")); Assert.assertEquals(invocations.size(), 2); invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); @@ -683,7 +729,7 @@ public class SchedulerManagerTest { doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); // i am leader - doReturn(true).when(membershipManager).isLeader(); + doReturn(true).when(leaderService).isLeader(); callSchedule(); @@ -703,7 +749,7 @@ public class SchedulerManagerTest { .setFunctionMetaData(function2).setInstanceId(2).build()) .build(); - List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send")); Assert.assertEquals(invocations.size(), 3); invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); @@ -722,6 +768,12 @@ public class SchedulerManagerTest { 
assertTrue(allAssignments.contains(assignment2_2)); assertTrue(allAssignments.contains(assignment2_3)); + // make sure we also directly add the assignment to the in memory assignment cache in function runtime manager + verify(functionRuntimeManager, times(3)).processAssignment(any()); + verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_1)); + verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_2)); + verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2_3)); + // updating assignments currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1); currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2); @@ -757,7 +809,7 @@ public class SchedulerManagerTest { callSchedule(); - invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send")); Assert.assertEquals(invocations.size(), 6); invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); @@ -774,10 +826,16 @@ public class SchedulerManagerTest { assertTrue(allAssignments2.contains(assignment2Updated1)); assertTrue(allAssignments2.contains(assignment2Updated2)); assertTrue(allAssignments2.contains(assignment2Updated3)); + + // make sure we also directly updated the assignment to the in memory assignment cache in function runtime manager + verify(functionRuntimeManager, times(6)).processAssignment(any()); + verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Updated1)); + verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Updated2)); + verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Updated3)); } @Test - public void testAssignmentWorkerDoesNotExist() throws 
InterruptedException, NoSuchMethodException, TimeoutException, ExecutionException, InvalidProtocolBufferException { + public void testAssignmentWorkerDoesNotExist() throws InterruptedException, NoSuchMethodException, TimeoutException, ExecutionException { List<Function.FunctionMetaData> functionMetaDataList = new LinkedList<>(); long version = 5; Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() @@ -827,15 +885,15 @@ public class SchedulerManagerTest { doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); // i am leader - doReturn(true).when(membershipManager).isLeader(); + doReturn(true).when(leaderService).isLeader(); callSchedule(); - List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("sendAsync")); + List<Invocation> invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("send")); Assert.assertEquals(invocations.size(), 0); } - private void callSchedule() throws NoSuchMethodException, InterruptedException, + private void callSchedule() throws InterruptedException, TimeoutException, ExecutionException { Future<?> complete = schedulerManager.schedule();