This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c51346fa3f5 [improve][pip] PIP-453: Improve the metadata store
threading model (#25173)
c51346fa3f5 is described below
commit c51346fa3f5ec9cdd04ad03ba5d6b05b6c9a4f35
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Jan 26 20:01:44 2026 +0800
[improve][pip] PIP-453: Improve the metadata store threading model (#25173)
---
pip/pip-453.md | 98 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 98 insertions(+)
diff --git a/pip/pip-453.md b/pip/pip-453.md
new file mode 100644
index 00000000000..a42736b9dda
--- /dev/null
+++ b/pip/pip-453.md
@@ -0,0 +1,98 @@
+# PIP-453: Improve the metadata store threading model
+
+# Background knowledge
+
+The `pulsar-metadata` module provides two abstractions for interacting with
metadata stores:
+- `MetadataStore`: the wrapper on the actual underlying metadata store (e.g.
ZooKeeper), which has caches for value and children of a given key.
+- `MetadataCache<T>`: a typed cache layer on top of `MetadataStore`, which
performs serialization and deserialization of data between `T` and `byte[]`.
+
+The `MetadataStore` instance is unique in each broker, and is shared by
multiple `MetadataCache<T>` instances.
+
+However, the implementations of both share a single thread whose name starts with the metadata store name (e.g. `ZK-MetadataStore`). This thread is used for the following tasks (see the sketch after this list):
+1. Executing callbacks of APIs like `put`.
+2. Executing notification handlers, including `AbstractMetadataStore#accept`,
which calls `accept` methods of all `MetadataCache` instances and all listeners
registered by `MetadataStore#registerListener`.
+3. For ZooKeeper and Etcd, which support batching requests, it's used to
schedule flushing tasks at a fixed rate, which is determined by the
`metadataStoreBatchingMaxDelayMillis` config (default: 5 ms).
+4. Scheduling some other tasks, e.g. retrying failed operations.
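+
+As an illustration, here is a hedged sketch (not the actual internals; it assumes `metadataStore`, `path`, `value`, and a `log` field are in scope) of how the first two tasks funnel through that single thread:
+
+```java
+// Both the put callback and the notification handler below are dispatched to
+// the single metadata store thread (e.g. ZK-MetadataStore) today.
+metadataStore.put(path, value, Optional.empty())
+        .thenAccept(stat -> log.info("stored at version {}", stat.getVersion()));
+metadataStore.registerListener(notification ->
+        log.info("{} changed: {}", notification.getPath(), notification.getType()));
+```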
+
+It should be noted that `MetadataCache` executes compute-intensive tasks like serialization inside the `MetadataStore` callback. When the number of metadata operations grows, this thread is easily overwhelmed and blocks other tasks; topic loading, which involves many metadata operations, is particularly affected. For example, in a production environment, it's observed that the `pulsar_batch_metadata_store_queue_wait_time` metric is high (100 ms), which should [...]
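+
+For instance, a hedged sketch of that hot path (simplified from the actual `MetadataCacheImpl`; `store`, `deserialize`, and the type `T` stand in for the real fields and serde call):
+
+```java
+// The thenApply stage runs inside the store callback, i.e. on the single
+// metadata store thread, so the CPU-bound deserialization occupies it.
+CompletableFuture<Optional<T>> get(String path) {
+    return store.get(path).thenApply(optRes ->
+            optRes.map(res -> deserialize(res.getValue())));
+}
+```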
+
+# Motivation
+
+The single-thread model is inefficient when there are many metadata operations. For example, when a broker goes down, the topics owned by that broker are transferred to a new owner broker. Since the new owner broker might never have owned them before, the `MetadataCache` caches are cold, which results in many metadata operations. However, the CPU-bound tasks like serialization and deserialization are executed in the `MetadataStore` thread, which is easy to overwhelm. Th [...]
+
+In a production environment, there was a case where the metadata operation rate increased suddenly and the `pulsar_batch_metadata_store_queue_wait_time_ms_bucket` metric increased to ~100 ms, which is part of the total latency of a metadata operation. As a result, the total P99 get latency (`pulsar_metadata_store_ops_latency_ms_bucket{type="get"}`) increased to 2 seconds.
+
+The 3rd task in the previous section is scheduled via `scheduleAtFixedRate`, which means that if an execution is delayed beyond its period (5 ms by default), the subsequent executions are fired back to back to catch up, which further burdens the single metadata store thread.
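+
+A self-contained demonstration of this catch-up behavior (a plain `ScheduledExecutorService`, not Pulsar code):
+
+```java
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class FixedRateCatchUp {
+    public static void main(String[] args) throws InterruptedException {
+        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+        long start = System.nanoTime();
+        // Schedule a "flush" every 5 ms, like the batching flush task
+        executor.scheduleAtFixedRate(() ->
+                System.out.printf("flush at %d ms%n", (System.nanoTime() - start) / 1_000_000),
+                0, 5, TimeUnit.MILLISECONDS);
+        // Block the single thread for 50 ms with other work: the ~10 missed
+        // runs are then fired back to back instead of 5 ms apart
+        executor.submit(() -> {
+            try {
+                Thread.sleep(50);
+            } catch (InterruptedException ignored) {
+            }
+        });
+        Thread.sleep(100);
+        executor.shutdownNow();
+    }
+}
+```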
+
+# Goals
+
+## In Scope
+
+Improve the existing threading model for handling the various tasks on the metadata store, so that no single thread is overwhelmed when there are many metadata operations.
+
+## Out of Scope
+
+Actually, the batching mechanism introduced by [#13043](https://github.com/apache/pulsar/pull/13043) is harmful. The `flush` method, which is responsible for sending a batch of metadata operations to the underlying metadata store, is called in the metadata store thread rather than the caller thread. The trade-off for the higher throughput is higher latency, and the benefit is limited because the metadata operation rate is not that high most of the time. See this [test report](https://github.com/BewareMyPower/zookeeper-benc [...]
+
+This proposal doesn't intend to change the existing batching mechanism or
disable it by default. It only improves the threading model to avoid the single
thread being overwhelmed.
+
+Additionally, some code paths execute compute-intensive tasks in the metadata store thread directly (e.g. `store.get(path).thenApply(/* ... */)`); this proposal does not aim to change them to asynchronous variants (e.g. `thenApplyAsync`).
+
+# High Level Design
+
+Create three sets of threads:
+- `<name>-event`: the original metadata store thread, which is now responsible only for handling notifications. This executor won't be a `ScheduledExecutorService` anymore.
+- `<name>-batch-flusher`: a single thread, which is used to schedule the
flushing task at a fixed rate. It won't be created if
`metadataStoreBatchingEnabled` is false.
+- `<name>-worker`: a fixed thread pool shared by all `MetadataCache` instances to execute compute-intensive tasks like serialization and deserialization. The same path is always handled by the same thread to preserve the processing order on that path (e.g. by hashing the path to a worker, as sketched after this list).
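+
+A hedged sketch (not the proposed implementation, just the general technique) of how a fixed pool can keep per-path ordering by hashing the path to a single-threaded worker:
+
+```java
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class PathOrderedWorkers {
+    private final ExecutorService[] workers;
+
+    public PathOrderedWorkers(int numThreads) {
+        workers = new ExecutorService[numThreads];
+        for (int i = 0; i < numThreads; i++) {
+            workers[i] = Executors.newSingleThreadExecutor();
+        }
+    }
+
+    // Tasks for the same path always land on the same worker, so their
+    // processing order on that path is preserved
+    public ExecutorService forPath(String path) {
+        return workers[Math.floorMod(path.hashCode(), workers.length)];
+    }
+}
+```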
+
+Regarding the callbacks, they are not switched to a different thread. This change is not breaking because the underlying metadata store usually executes the callback in a single thread (e.g. `<name>-EventThread` in ZooKeeper), just like the single thread in the current implementation. The caller is responsible for managing worker threads on the metadata operation result if the callback is compute-intensive.
+
+The only concern is that a dedicated callback thread would make it safe to wait on the futures of metadata store APIs inside a callback; since this proposal keeps callbacks on the underlying store's single callback thread, the following usage could still deadlock after this change, because `join()` blocks the very thread that has to complete the inner future:
+
+```java
+metadataStore.get(path).thenApply(__ -> metadataStore.get(otherPath).join());
+```
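+
+A non-blocking composition of the two reads avoids the issue, since nothing blocks the shared thread:
+
+```java
+metadataStore.get(path).thenCompose(__ -> metadataStore.get(otherPath));
+```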
+
+Other tasks, like the retry on failure, are executed in the JVM's common `ForkJoinPool` via `CompletableFuture` APIs. For example:
+
+```diff
+--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
++++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+@@ -245,9 +245,8 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
+                         countsByType, totalSize, opsForLog);
+
+                 // Retry with the individual operations
+-                executor.schedule(() -> {
+-                    ops.forEach(o -> batchOperation(Collections.singletonList(o)));
+-                }, 100, TimeUnit.MILLISECONDS);
++                CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS).execute(() ->
++                        ops.forEach(o -> batchOperation(Collections.singletonList(o))));
+             } else {
+                 MetadataStoreException e = getException(code, path);
+                 ops.forEach(o -> o.getFuture().completeExceptionally(e));
+```
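+
+For illustration, a minimal standalone program (not Pulsar code) showing that `CompletableFuture.delayedExecutor` runs the task on the common `ForkJoinPool` after the delay, so the retry no longer occupies a metadata store thread:
+
+```java
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+public class DelayedExecutorDemo {
+    public static void main(String[] args) throws InterruptedException {
+        CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS).execute(() ->
+                // Prints a ForkJoinPool.commonPool-worker thread name
+                System.out.println("retried on " + Thread.currentThread().getName()));
+        Thread.sleep(300); // common pool threads are daemons; keep the JVM alive
+    }
+}
+```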
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Configuration
+
+Add a configuration to specify the number of worker threads for `MetadataCache`:
+
+```java
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "The number of threads used for serializing and deserializing data to and from the metadata store"
+    )
+    private int metadataStoreSerDesThreads = Runtime.getRuntime().availableProcessors();
+```
+
+### Metrics
+
+The `pulsar_batch_metadata_store_executor_queue_size` metric will be removed because the `<name>-batch-flusher` thread won't execute any tasks other than flushing.
+
+# Links
+
+* Mailing List discussion thread:
https://lists.apache.org/thread/0cfdyvj96gw1sp1mo2zghl0lmsms5w1d
+* Mailing List voting thread:
https://lists.apache.org/thread/cktj2k8myw076yggn63k8yxs5357yd61