This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 304685b Allow function rebalance to be run periodically (#7449) 304685b is described below commit 304685bfd20912ee058e15ecafcf34d39b470a47 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Mon Jul 6 18:34:42 2020 -0700 Allow function rebalance to be run periodically (#7449) Co-authored-by: Jerry Peng <jer...@splunk.com> --- conf/functions_worker.yml | 2 ++ .../pulsar/functions/worker/WorkerConfig.java | 5 +++++ .../pulsar/functions/worker/SchedulerManager.java | 21 ++++++++++++++++++--- .../pulsar/functions/worker/WorkerService.java | 14 ++++++++++++++ .../functions/worker/rest/api/WorkerImpl.java | 7 ++++--- 5 files changed, 43 insertions(+), 6 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 7ab160e..1b0b9f8 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -93,6 +93,8 @@ schedulerClassName: "org.apache.pulsar.functions.worker.scheduler.RoundRobinSche functionAssignmentTopicName: "assignments" failureCheckFreqMs: 30000 rescheduleTimeoutMs: 60000 +# frequency at which to check if cluster needs rebalancing (set to -1 to disable) +rebalanceCheckFreqSec: -1 initialBrokerReconnectMaxRetries: 60 assignmentWriteMaxRetries: 60 instanceLivenessCheckFreqMs: 30000 diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 4b0ddd1..631ae6f 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -241,6 +241,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { ) private long rescheduleTimeoutMs; @FieldContext( + category = CATEGORY_FUNC_RUNTIME_MNG, + doc = "The frequency to check whether the cluster needs rebalancing" + ) + private long rebalanceCheckFreqSec; + @FieldContext( category = CATEGORY_FUNC_RUNTIME_MNG, doc = "The max number of retries for initial broker reconnects when function metadata manager" + " tries to create producer on metadata topics" diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index 4341310..374512a 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -113,6 +113,8 @@ public class SchedulerManager implements AutoCloseable { private MessageId lastMessageProduced = null; private MessageId metadataTopicLastMessage = MessageId.earliest; + private Future<?> currentRebalanceFuture; + private AtomicBoolean rebalanceInProgess = new AtomicBoolean(false); public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, @@ -222,9 +224,18 @@ public class SchedulerManager implements AutoCloseable { return scheduleInternal(() -> invokeScheduler(), "Encountered error when invoking scheduler"); } - public Future<?> rebalance() { + private Future<?> rebalance() { return scheduleInternal(() -> invokeRebalance(), "Encountered error when invoking rebalance"); } + + public Future<?> rebalanceIfNotInprogress() { + if (rebalanceInProgess.compareAndSet(false, true)) { + currentRebalanceFuture = rebalance(); + return currentRebalanceFuture; + } else { + throw new RebalanceInProgressException(); + } + } @VisibleForTesting void invokeScheduler() { @@ -273,7 +284,7 @@ public class SchedulerManager implements AutoCloseable { MessageId messageId = publishNewAssignment(newAssignment, false); // Directly update in memory assignment cache since I am leader - log.info("Updating assignment: {}", assignment); + log.info("Updating assignment: {}", newAssignment); functionRuntimeManager.processAssignment(newAssignment); // update message id associated with current view of assignments map lastMessageProduced = messageId; @@ -358,7 +369,8 @@ public class SchedulerManager implements AutoCloseable { // update message id associated with current view of assignments map lastMessageProduced = messageId; } - log.info("Total number of new assignments computed for rebalance: {}", rebalancedAssignments.size()); + log.info("Rebalance - Total number of new assignments computed: {}", rebalancedAssignments.size()); + rebalanceInProgess.set(false); } private void scheduleCompaction(ScheduledExecutorService executor, long scheduleFrequencySec) { @@ -511,4 +523,7 @@ public class SchedulerManager implements AutoCloseable { } return null; } + + public static class RebalanceInProgressException extends RuntimeException { + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 7754a82..5b493bc 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -257,6 +257,20 @@ public class WorkerService { } }); + if (workerConfig.getRebalanceCheckFreqSec() > 0) { + clusterServiceCoordinator.addTask("rebalance-periodic-check", + workerConfig.getRebalanceCheckFreqSec() * 1000, + () -> { + try { + schedulerManager.rebalanceIfNotInprogress().get(); + } catch (SchedulerManager.RebalanceInProgressException e) { + log.info("Scheduled for rebalance but rebalance is already in progress. Ignoring."); + } catch (Exception e) { + log.warn("Encountered error when running scheduled rebalance", e); + } + }); + } + log.info("/** Starting Cluster Service Coordinator **/"); clusterServiceCoordinator.start(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java index f58f57b..9e24f38 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java @@ -29,6 +29,7 @@ import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.worker.FunctionRuntimeInfo; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.MembershipManager; +import org.apache.pulsar.functions.worker.SchedulerManager; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.WorkerUtils; @@ -217,9 +218,9 @@ public class WorkerImpl { } if (worker().getLeaderService().isLeader()) { - if (currentRebalanceFuture == null || currentRebalanceFuture.isDone()) { - currentRebalanceFuture = this.worker().getSchedulerManager().rebalance(); - } else { + try { + worker().getSchedulerManager().rebalanceIfNotInprogress(); + } catch (SchedulerManager.RebalanceInProgressException e) { throw new RestException(Status.BAD_REQUEST, "Rebalance already in progress"); } } else {