This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 56cd23c301d Revert "[improve][meta] PIP-453: Improve the metadata store threading model (#25187)"
56cd23c301d is described below
commit 56cd23c301dd1a9ae6249c51439da5e142037185
Author: coderzc <[email protected]>
AuthorDate: Fri Feb 13 10:49:58 2026 +0800
Revert "[improve][meta] PIP-453: Improve the metadata store threading model (#25187)"
This reverts commit 786864c04d772f36f6da9363406173af6c5ad1f2.
---
conf/broker.conf | 2 -
conf/standalone.conf | 3 -
pip/pip-453.md | 82 ---------
.../IsolatedBookieEnsemblePlacementPolicy.java | 3 -
.../apache/pulsar/broker/ServiceConfiguration.java | 6 -
.../IsolatedBookieEnsemblePlacementPolicyTest.java | 34 ++--
.../org/apache/pulsar/broker/PulsarService.java | 2 -
.../apache/pulsar/broker/PulsarServiceTest.java | 65 --------
.../stats/OpenTelemetryMetadataStoreStatsTest.java | 12 ++
.../pulsar/metadata/api/MetadataStoreConfig.java | 3 -
.../metadata/cache/impl/MetadataCacheImpl.java | 185 +++++++++------------
.../metadata/impl/AbstractMetadataStore.java | 77 ++++-----
.../pulsar/metadata/impl/EtcdMetadataStore.java | 19 ++-
.../metadata/impl/LocalMemoryMetadataStore.java | 2 +-
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 2 +-
.../pulsar/metadata/impl/ZKMetadataStore.java | 19 ++-
.../batching/AbstractBatchedMetadataStore.java | 39 ++---
.../metadata/impl/oxia/OxiaMetadataStore.java | 4 +-
.../impl/stats/BatchMetadataStoreStats.java | 43 ++++-
.../impl/MetadataStoreFactoryImplTest.java | 2 +-
20 files changed, 211 insertions(+), 393 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index b2e74569f14..8b6ec12d7f2 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -954,8 +954,6 @@ metadataStoreBatchingMaxOperations=1000
# Maximum size of a batch
metadataStoreBatchingMaxSizeKb=128
-# The number of threads used for serializing and deserializing data to and from the metadata store
-metadataStoreSerDesThreads=1
### --- Authentication --- ###
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 571cc0fbbe8..e5adf9a9637 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -430,9 +430,6 @@ metadataStoreBatchingMaxOperations=1000
# Maximum size of a batch
metadataStoreBatchingMaxSizeKb=128
-# The number of threads used for serializing and deserializing data to and from the metadata store
-metadataStoreSerDesThreads=1
-
### --- TLS --- ###
# Deprecated - Use webServicePortTls and brokerServicePortTls instead
tlsEnabled=false
diff --git a/pip/pip-453.md b/pip/pip-453.md
deleted file mode 100644
index f9109798ba7..00000000000
--- a/pip/pip-453.md
+++ /dev/null
@@ -1,82 +0,0 @@
-# PIP-453: Improve the metadata store threading model
-
-# Background knowledge
-
-The `pulsar-metadata` module provides two abstractions for interacting with metadata stores:
-- `MetadataStore`: the wrapper on the actual underlying metadata store (e.g. ZooKeeper), which has caches for value and children of a given key.
-- `MetadataCache<T>`: a typed cache layer on top of `MetadataStore`, which performs serialization and deserialization of data between `T` and `byte[]`.
-
-The `MetadataStore` instance is unique in each broker, and is shared by multiple `MetadataCache<T>` instances.
-
-However, a single thread whose name starts with the metadata store name (e.g. `ZK-MetadataStore`) is used by implementations of them. This thread is used in the following tasks:
-1. Executing callbacks of APIs like `put`.
-2. Executing notification handlers, including `AbstractMetadataStore#accept`, which calls `accept` methods of all `MetadataCache` instances and all listeners registered by `MetadataStore#registerListener`.
-3. For ZooKeeper and Etcd, which support batching requests, it's used to schedule flushing tasks at a fixed rate, which is determined by the `metadataStoreBatchingMaxDelayMillis` config (default: 5 ms).
-4. Scheduling some other tasks, e.g. retrying failed operations.
-
-It should be noted that `MetadataCache` executes the compute sensitive tasks like serialization in the `MetadataStore` callback. When the number of metadata operations grows, this thread is easy to be overwhelmed. It also affects the topic loading, which involves many metadata operations, this thread can be overwhelmed and block other tasks. For example, in a production environment, it's observed that the `pulsar_batch_metadata_store_queue_wait_time` metric is high (100 ms), which should [...]
-
-# Motivation
-
-The single thread model is inefficient when there are many metadata operations. For example, when a broker is down and the topics owned by this broker will be transferred to the new owner broker. Since the new owner broker might never owned them before, even the `MetadataCache` caches are cold, which results in many metadata operations. However, the CPU-bound tasks like serialization and deserialization are executed in the `MetadataStore` thread, which makes it easy to be overwhelmed. Th [...]
-
-In a production environment, there is a case when the metadata operation rate increased suddenly, the `pulsar_batch_metadata_store_queue_wait_time_ms_bucket` metric increased to ~100 ms, which is a part of the total latency of a metadata operation. As a result, the total P99 get latency (`pulsar_metadata_store_ops_latency_ms_bucket{type="get"}`) increased to 2 seconds.
-
-The 3rd task in the previous section is scheduled via `scheduleAtFixedRate`, which means if the task is not executed in time (5 ms by default), the task will be executed immediately again in a short time, which also burdens the single metadata store thread.
-
-# Goals
-
-## In Scope
-
-Improve the existing thread model to handle various tasks on metadata store, which could avoid a single thread being overwhelmed when there are many metadata operations.
-
-## Out of Scope
-
-Actually the batching mechanism introduced by [#13043](https://github.com/apache/pulsar/pull/13043) is harmful. The `flush` method, which is responsible to send a batch of metadata operations to broker, is called in the metadata store thread rather than the caller thread. The trade-off of the higher throughput is the lower latency. The benefit is limited because in most time the metadata operation rate is not so high. See this [test report](https://github.com/BewareMyPower/zookeeper-benc [...]
-
-This proposal doesn't intend to change the existing batching mechanism or disable it by default. It only improves the threading model to avoid the single thread being overwhelmed.
-
-Additionally, some code paths execute the compute intensive tasks in the metadata store thread directly (e.g. `store.get(path).thenApply(/* ... */)`), this proposal does not aim at changing them to asynchronous methods (e.g. `thenApplyAsync`).
-
-# High Level Design
-
-Create 4 sets of threads:
-- `<name>-event`: the original metadata store thread, which is now only responsible to handle notifications. This executor won't be a `ScheduledExecutorService` anymore.
-- `<name>-scheduler`: a single thread, which is used to schedule tasks like flushing and retrying failed operations.
-- `<name>-batch-flusher`: a single thread, which is used to schedule the flushing task at a fixed rate. It won't be created if `metadataStoreBatchingEnabled` is false.
-- `<name>-worker`: a fixed thread pool shared by all `MetadataCache` instances to execute compute intensive tasks like serialization and deserialization. The same path will be handled by the same thread to keep the processing order on the same path.
-
-Regarding the callbacks, don't switch to a different thread. This change is not breaking because the underlying metadata store usually executes the callback in a single thread (e.g. `<name>-EventThread` in ZooKeeper) like the single thread in the current implementation. The caller should be responsible to manage worker threads on the metadata operation result if the callback is compute intensive.
-
-The only concern is that introducing a new thread to execute callbacks allows waiting for the future of metadata store APIs in the callback. After this change, the following use case could be a dead lock:
-
-```java
-metadataStore.get(path).thenApply(__ -> metadataStore.get(otherPath).join());;
-```
-
-# Detailed Design
-
-## Public-facing Changes
-
-### Configuration
-
-Add a configurations to specify the number of worker threads for `MetadataCache`:
-
-```java
- @FieldContext(
- category = CATEGORY_SERVER,
- doc = "The number of threads uses for serializing and deserializing data to and from the metadata store"
- )
- private int metadataStoreSerDesThreads = 1;
-```
-
-Use 1 as the default value since the serialization and deserialization tasks are not frequent. This separated thread pool is mainly added to avoid blocking the metadata store callback thread.
-
-### Metrics
-
-The `pulsar_batch_metadata_store_executor_queue_size` metric will be removed because the `<name>-batch-flusher` thread won't execute other tasks except for flushing.
-
-# Links
-
-* Mailing List discussion thread: https://lists.apache.org/thread/0cfdyvj96gw1sp1mo2zghl0lmsms5w1d
-* Mailing List voting thread: https://lists.apache.org/thread/cktj2k8myw076yggn63k8yxs5357yd61
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index 4ef1c594be4..878bbc4d654 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -58,8 +57,6 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac
// the secondary group.
private ImmutablePair<Set<String>, Set<String>> defaultIsolationGroups;
- @Getter
- @VisibleForTesting
private MetadataCache<BookiesRackConfiguration> bookieMappingCache;
private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*";
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 93670bb2377..44b62709fab 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -498,12 +498,6 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean metadataStoreAllowReadOnlyOperations;
- @FieldContext(
- category = CATEGORY_SERVER,
- doc = "The number of threads used for serializing and deserializing data to and from the metadata store"
- )
- private int metadataStoreSerDesThreads = 1;
-
@Deprecated
@FieldContext(
category = CATEGORY_SERVER,
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index 936b04386ff..68f92ab416d 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -22,15 +22,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import java.nio.charset.StandardCharsets;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -291,7 +288,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
secondaryBookieGroup.put(BOOKIE4,
BookieInfo.builder().rack("rack0").build());
bookieMapping.put("group2", secondaryBookieGroup);
- updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
+ Optional.empty()).join();
ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(),
null).getResult();
@@ -342,7 +340,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
+ "\": {\"rack\": \"rack0\", \"hostname\":
\"bookie3.example.com\"}, \"" + BOOKIE4
+ "\": {\"rack\": \"rack2\", \"hostname\":
\"bookie4.example.com\"}}}";
- updateBookieInfo(isolationPolicy, data.getBytes(StandardCharsets.UTF_8));
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(StandardCharsets.UTF_8),
+ Optional.empty()).join();
List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2,
Collections.emptyMap(),
new HashSet<>()).getResult();
@@ -400,7 +399,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
bookieMapping.put("group1", mainBookieGroup);
bookieMapping.put("group2", secondaryBookieGroup);
- updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
+ Optional.empty()).join();
ensemble = isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(),
new HashSet<>()).getResult();
@@ -784,7 +784,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
bookieMapping.put(isolationGroup2, group2);
bookieMapping.put(isolationGroup3, group3);
- updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
+ Optional.empty()).join();
groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3));
groups.setRight(Sets.newHashSet(""));
@@ -807,7 +808,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
bookieMapping.put(isolationGroup1, group1);
bookieMapping.put(isolationGroup2, group2);
- updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
+ Optional.empty()).join();
groups.setLeft(Sets.newHashSet(isolationGroup1));
groups.setRight(Sets.newHashSet(isolationGroup2));
@@ -829,24 +831,12 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
bookieMapping.put(isolationGroup1, group1);
bookieMapping.put(isolationGroup2, group2);
- updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
+ Optional.empty()).join();
groups.setLeft(Sets.newHashSet(isolationGroup1));
groups.setRight(Sets.newHashSet(isolationGroup2));
blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2,
groups);
assertTrue(blacklist.isEmpty());
}
-
- // The policy gets the bookie info asynchronously before each query or update, when putting the bookie info into
- // the metadata store, the cache needs some time to receive the notification and update accordingly.
- private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy isolationPolicy, byte[] bookieInfo) {
- final var cache = isolationPolicy.getBookieMappingCache();
- assertNotNull(cache); // the policy must have been initialized
-
- final var key = BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH;
- final var previousBookieInfo = cache.getIfCached(key);
- store.put(key, bookieInfo, Optional.empty()).join();
- Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() ->
- assertNotEquals(cache.getIfCached(key), previousBookieInfo));
- }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index eaebd7fa58c..e61fbfac566 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -434,7 +434,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
.synchronizer(synchronizer)
.openTelemetry(openTelemetry)
.nodeSizeStats(new DefaultMetadataNodeSizeStats())
- .numSerDesThreads(config.getMetadataStoreSerDesThreads())
.build());
}
@@ -1306,7 +1305,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.openTelemetry(openTelemetry)
.nodeSizeStats(new DefaultMetadataNodeSizeStats())
- .numSerDesThreads(config.getMetadataStoreSerDesThreads())
.build());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 6195e9cdae5..6c04889d8f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -25,15 +25,10 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertSame;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -43,10 +38,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.metadata.api.MetadataCacheConfig;
-import org.apache.pulsar.metadata.api.MetadataSerde;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.api.Stat;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -348,60 +339,4 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest {
assertTrue(e instanceof PulsarClientException.TimeoutException);
}
}
-
- @Test
- public void testMetadataSerDesThreads() throws Exception {
- final var numSerDesThreads = 5;
- final var config = new ServiceConfiguration();
- config.setMetadataStoreSerDesThreads(numSerDesThreads);
- config.setClusterName("test");
- config.setMetadataStoreUrl("memory:local");
- config.setConfigurationMetadataStoreUrl("memory:local");
-
- @Cleanup final var pulsar = new PulsarService(config);
- pulsar.start();
-
- BiConsumer<MetadataStore, String> verifier = (store, prefix) -> {
- final var serDes = new CustomMetadataSerDes();
- final var cache = store.getMetadataCache(prefix, serDes, MetadataCacheConfig.builder().build());
- for (int i = 0; i < 100 && serDes.threadNameToSerializedPaths.size() < numSerDesThreads; i++) {
- cache.create(prefix + i, "value-" + i).join();
- final var value = cache.get(prefix + i).join();
- assertEquals(value.orElseThrow(), "value-" + i);
- final var newValue = cache.readModifyUpdate(prefix + i, s -> s + "-updated").join();
- assertEquals(newValue, "value-" + i + "-updated");
- // Verify the serialization and deserialization are handled by the same thread
- assertEquals(serDes.threadNameToSerializedPaths, serDes.threadNameToDeserializedPaths);
- }
- log.info("SerDes thread mapping: {}", serDes.threadNameToSerializedPaths);
- assertEquals(serDes.threadNameToSerializedPaths.keySet().size(), numSerDesThreads);
- // Verify a path cannot be handled by multiple threads
- final var paths = serDes.threadNameToSerializedPaths.values().stream()
- .flatMap(Set::stream).sorted().toList();
- assertEquals(paths.stream().distinct().toList(), paths);
- };
-
- verifier.accept(pulsar.getLocalMetadataStore(), "/test-local/");
- verifier.accept(pulsar.getConfigurationMetadataStore(), "/test-config/");
- }
-
- private static class CustomMetadataSerDes implements MetadataSerde<String> {
-
- final Map<String, Set<String>> threadNameToSerializedPaths = new ConcurrentHashMap<>();
- final Map<String, Set<String>> threadNameToDeserializedPaths = new ConcurrentHashMap<>();
-
- @Override
- public byte[] serialize(String path, String value) throws IOException{
- threadNameToSerializedPaths.computeIfAbsent(Thread.currentThread().getName(),
- __ -> ConcurrentHashMap.newKeySet()).add(path);
- return value.getBytes();
- }
-
- @Override
- public String deserialize(String path, byte[] data, Stat stat) throws IOException {
- threadNameToDeserializedPaths.computeIfAbsent(Thread.currentThread().getName(),
- __ -> ConcurrentHashMap.newKeySet()).add(path);
- return new String(data);
- }
- }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
index 390aa1e49e2..9e8bde20b88 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.stats;
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes;
+import java.util.concurrent.ExecutorService;
import lombok.Cleanup;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -28,6 +29,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -51,6 +53,14 @@ public class OpenTelemetryMetadataStoreStatsTest extends BrokerTestBase {
var newStats = new MetadataStoreStats(
localMetadataStoreName,
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
FieldUtils.writeField(localMetadataStore, "metadataStoreStats",
newStats, true);
+
+ var currentBatchedStats = (BatchMetadataStoreStats) FieldUtils.readField(localMetadataStore,
+ "batchMetadataStoreStats", true);
+ currentBatchedStats.close();
+ var currentExecutor = (ExecutorService) FieldUtils.readField(currentBatchedStats, "executor", true);
+ var newBatchedStats = new BatchMetadataStoreStats(localMetadataStoreName, currentExecutor,
+ pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
+ FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats", newBatchedStats, true);
}
@AfterMethod(alwaysRun = true)
@@ -79,5 +89,7 @@ public class OpenTelemetryMetadataStoreStatsTest extends BrokerTestBase {
var metrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
assertMetricLongSumValue(metrics,
MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME,
attributes, value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(metrics, BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes,
+ value -> assertThat(value).isPositive());
}
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index fcde0dce840..ef50dc87691 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -104,7 +104,4 @@ public class MetadataStoreConfig {
* The estimator to estimate the payload length of metadata node, which used to limit the batch size requested.
*/
private MetadataNodeSizeStats nodeSizeStats;
-
- @Builder.Default
- private final int numSerDesThreads = 1;
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index ca165f0464e..b1f0572547c 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -39,10 +39,10 @@ import java.util.function.Supplier;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.common.stats.CacheMetricsCollector;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.metadata.api.CacheGetResult;
+import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -62,26 +62,23 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
private final MetadataStore store;
private final MetadataStoreExtended storeExtended;
private final MetadataSerde<T> serde;
- private final OrderedExecutor executor;
- private final ScheduledExecutorService schedulerExecutor;
+ private final ScheduledExecutorService executor;
private final MetadataCacheConfig<T> cacheConfig;
private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>> objCache;
public MetadataCacheImpl(String cacheName, MetadataStore store, TypeReference<T> typeRef,
- MetadataCacheConfig<T> cacheConfig, OrderedExecutor executor,
- ScheduledExecutorService schedulerExecutor) {
- this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor, schedulerExecutor);
+ MetadataCacheConfig<T> cacheConfig, ScheduledExecutorService executor) {
+ this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor);
}
public MetadataCacheImpl(String cacheName, MetadataStore store, JavaType type, MetadataCacheConfig<T> cacheConfig,
- OrderedExecutor executor, ScheduledExecutorService schedulerExecutor) {
- this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor, schedulerExecutor);
+ ScheduledExecutorService executor) {
+ this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor);
}
public MetadataCacheImpl(String cacheName, MetadataStore store, MetadataSerde<T> serde,
- MetadataCacheConfig<T> cacheConfig, OrderedExecutor executor,
- ScheduledExecutorService schedulerExecutor) {
+ MetadataCacheConfig<T> cacheConfig, ScheduledExecutorService executor) {
this.store = store;
if (store instanceof MetadataStoreExtended) {
this.storeExtended = (MetadataStoreExtended) store;
@@ -91,7 +88,6 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
this.serde = serde;
this.cacheConfig = cacheConfig;
this.executor = executor;
- this.schedulerExecutor = schedulerExecutor;
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
if (cacheConfig.getRefreshAfterWriteMillis() > 0) {
@@ -105,9 +101,6 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
.buildAsync(new AsyncCacheLoader<String, Optional<CacheGetResult<T>>>() {
@Override
public CompletableFuture<Optional<CacheGetResult<T>>> asyncLoad(String key, Executor executor) {
- if (log.isDebugEnabled()) {
- log.debug("Loading key {} into metadata cache {}", key, cacheName);
- }
return readValueFromStore(key);
}
@@ -117,16 +110,12 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
Optional<CacheGetResult<T>> oldValue,
Executor executor) {
if (store instanceof AbstractMetadataStore &&
((AbstractMetadataStore) store).isConnected()) {
- if (log.isDebugEnabled()) {
- log.debug("Reloading key {} into metadata cache {}", key, cacheName);
- }
- final var future = readValueFromStore(key);
- future.thenAccept(val -> {
+ return readValueFromStore(key).thenApply(val -> {
if (cacheConfig.getAsyncReloadConsumer() != null) {
cacheConfig.getAsyncReloadConsumer().accept(key, val);
}
+ return val;
});
- return future;
} else {
// Do not try to refresh the cache item if we know that we're not connected to the
// metadata store
@@ -139,46 +128,22 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
}
private CompletableFuture<Optional<CacheGetResult<T>>> readValueFromStore(String path) {
- final var future = new CompletableFuture<Optional<CacheGetResult<T>>>();
- store.get(path).thenComposeAsync(optRes -> {
- // There could be multiple pending reads for the same path, for example, when a path is created,
- // 1. The `accept` method will call `refresh`
- // 2. The `put` method will call `refresh` after the metadata store put operation is done
- // Both will call this method and the same result will be read. In this case, we only need to deserialize
- // the value once.
- if (!optRes.isPresent()) {
- if (log.isDebugEnabled()) {
- log.debug("Key {} not found in metadata store", path);
- }
- return FutureUtils.value(Optional.<CacheGetResult<T>>empty());
- }
- final var res = optRes.get();
- final var cachedFuture = objCache.getIfPresent(path);
- if (cachedFuture != null && cachedFuture != future) {
- if (log.isDebugEnabled()) {
- log.debug("A new read on key {} is in progress or completed, ignore this one", path);
- }
- return cachedFuture;
- }
- try {
- T obj = serde.deserialize(path, res.getValue(), res.getStat());
- if (log.isDebugEnabled()) {
- log.debug("Deserialized value for key {} (version: {}): {}", path, res.getStat().getVersion(),
- obj);
- }
- return FutureUtils.value(Optional.of(new CacheGetResult<>(obj, res.getStat())));
- } catch (Throwable t) {
- return FutureUtils.exception(new ContentDeserializationException(
- "Failed to deserialize payload for key '" + path + "'", t));
- }
- }, executor.chooseThread(path)).whenComplete((result, e) -> {
- if (e != null) {
- future.completeExceptionally(e.getCause());
- } else {
- future.complete(result);
- }
- });
- return future;
+ return store.get(path)
+ .thenCompose(optRes -> {
+ if (!optRes.isPresent()) {
+ return FutureUtils.value(Optional.empty());
+ }
+
+ try {
+ GetResult res = optRes.get();
+ T obj = serde.deserialize(path, res.getValue(), res.getStat());
+ return FutureUtils
+ .value(Optional.of(new CacheGetResult<>(obj, res.getStat())));
+ } catch (Throwable t) {
+ return FutureUtils.exception(new ContentDeserializationException(
+ "Failed to deserialize payload for key '" + path + "'", t));
+ }
+ });
}
@Override
@@ -204,9 +169,8 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
@Override
public CompletableFuture<T> readModifyUpdateOrCreate(String path, Function<Optional<T>, T> modifyFunction) {
- final var executor = this.executor.chooseThread(path);
return executeWithRetry(() -> objCache.get(path)
- .thenComposeAsync(optEntry -> {
+ .thenCompose(optEntry -> {
Optional<T> currentValue;
long expectedVersion;
@@ -238,14 +202,13 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(__ -> {
refresh(path);
}).thenApply(__ -> newValueObj);
- }, executor), path);
+ }), path);
}
@Override
public CompletableFuture<T> readModifyUpdate(String path, Function<T, T> modifyFunction) {
- final var executor = this.executor.chooseThread(path);
return executeWithRetry(() -> objCache.get(path)
- .thenComposeAsync(optEntry -> {
+ .thenCompose(optEntry -> {
if (!optEntry.isPresent()) {
return FutureUtils.exception(new NotFoundException(""));
}
@@ -268,57 +231,59 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(__ -> {
refresh(path);
}).thenApply(__ -> newValueObj);
- }, executor), path);
- }
-
- private CompletableFuture<byte[]> serialize(String path, T value) {
- final var future = new CompletableFuture<byte[]>();
- executor.executeOrdered(path, () -> {
- try {
- future.complete(serde.serialize(path, value));
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
- });
- return future;
+ }), path);
}
@Override
public CompletableFuture<Void> create(String path, T value) {
- final var future = new CompletableFuture<Void>();
- serialize(path, value).thenCompose(content -> store.put(path, content, Optional.of(-1L)))
- // Make sure we have the value cached before the operation is completed
- // In addition to caching the value, we need to add a watch on the path,
- // so when/if it changes on any other node, we are notified and we can
- // update the cache
- .thenCompose(__ -> objCache.get(path))
- .whenComplete((__, ex) -> {
- if (ex == null) {
- future.complete(null);
- } else if (ex.getCause() instanceof BadVersionException) {
- // Use already exists exception to provide more self-explanatory error message
- future.completeExceptionally(new AlreadyExistsException(ex.getCause()));
- } else {
- future.completeExceptionally(ex.getCause());
- }
- });
+ byte[] content;
+ try {
+ content = serde.serialize(path, value);
+ } catch (Throwable t) {
+ return FutureUtils.exception(t);
+ }
+
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ store.put(path, content, Optional.of(-1L))
+ .thenAccept(stat -> {
+ // Make sure we have the value cached before the operation is completed
+ // In addition to caching the value, we need to add a watch on the path,
+ // so when/if it changes on any other node, we are notified and we can
+ // update the cache
+ objCache.get(path).whenComplete((stat2, ex) -> {
+ if (ex == null) {
+ future.complete(null);
+ } else {
+ log.error("Exception while getting path {}", path, ex);
+ future.completeExceptionally(ex.getCause());
+ }
+ });
+ }).exceptionally(ex -> {
+ if (ex.getCause() instanceof BadVersionException) {
+ // Use already exists exception to provide more self-explanatory error message
+ future.completeExceptionally(new AlreadyExistsException(ex.getCause()));
+ } else {
+ future.completeExceptionally(ex.getCause());
+ }
+ return null;
+ });
+
return future;
}
@Override
public CompletableFuture<Void> put(String path, T value, EnumSet<CreateOption> options) {
- return serialize(path, value).thenCompose(bytes -> {
- if (storeExtended != null) {
- return storeExtended.put(path, bytes, Optional.empty(), options);
- } else {
- return store.put(path, bytes, Optional.empty());
- }
- }).thenAccept(__ -> {
- if (log.isDebugEnabled()) {
- log.debug("Refreshing path {} after put operation", path);
- }
- refresh(path);
- });
+ final byte[] bytes;
+ try {
+ bytes = serde.serialize(path, value);
+ } catch (IOException e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ if (storeExtended != null) {
+ return storeExtended.put(path, bytes, Optional.empty(), options).thenAccept(__ -> refresh(path));
+ } else {
+ return store.put(path, bytes, Optional.empty()).thenAccept(__ -> refresh(path));
+ }
}
@Override
@@ -358,9 +323,6 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
switch (t.getType()) {
case Created:
case Modified:
- if (log.isDebugEnabled()) {
- log.debug("Refreshing path {} for {} notification", path, t.getType());
- }
refresh(path);
break;
@@ -392,7 +354,8 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
final var next = backoff.next();
log.info("Update key {} conflicts. Retrying in {} ms. Mandatory stop: {}. Elapsed time: {} ms", key,
next, backoff.isMandatoryStopMade(), elapsed);
- schedulerExecutor.schedule(() -> execute(op, key, result, backoff), next, TimeUnit.MILLISECONDS);
+ executor.schedule(() -> execute(op, key, result, backoff), next,
+ TimeUnit.MILLISECONDS);
return null;
}
result.completeExceptionally(ex.getCause());
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index d118a792e2f..b0e4b43f700 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -26,7 +26,6 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
import java.time.Instant;
@@ -39,17 +38,16 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.stats.CacheMetricsCollector;
import org.apache.pulsar.common.util.FutureUtil;
@@ -78,9 +76,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
protected final String metadataStoreName;
- private final OrderedExecutor serDesExecutor;
- private final ExecutorService eventExecutor;
- private final ScheduledExecutorService schedulerExecutor;
+ protected final ScheduledExecutorService executor;
private final AsyncLoadingCache<String, List<String>> childrenCache;
private final AsyncLoadingCache<String, Boolean> existsCache;
private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>();
@@ -97,21 +93,13 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
protected MetadataNodeSizeStats nodeSizeStats;
- protected AbstractMetadataStore(
- String metadataStoreName, OpenTelemetry openTelemetry, MetadataNodeSizeStats nodeSizeStats,
- int numSerDesThreads) {
+ protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTelemetry,
+ MetadataNodeSizeStats nodeSizeStats) {
this.nodeSizeStats = nodeSizeStats == null ? new DummyMetadataNodeSizeStats()
: nodeSizeStats;
- final var namePrefix = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName
- : getClass().getSimpleName();
- this.eventExecutor = Executors.newSingleThreadExecutor(
- new DefaultThreadFactory(namePrefix + "-event"));
- this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory(namePrefix + "-scheduler"));
- this.serDesExecutor = OrderedExecutor.newBuilder()
- .numThreads(numSerDesThreads)
- .name(namePrefix + "-worker")
- .build();
+ this.executor = new ScheduledThreadPoolExecutor(1,
+ new DefaultThreadFactory(
+ StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName()));
registerListener(this);
this.childrenCache = Caffeine.newBuilder()
@@ -261,8 +249,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
JavaType typeRef = TypeFactory.defaultInstance().constructSimpleType(clazz, null);
String cacheName = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName();
MetadataCacheImpl<T> metadataCache =
- new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.serDesExecutor,
- this.schedulerExecutor);
+ new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.executor);
metadataCaches.add(metadataCache);
return metadataCache;
}
@@ -271,8 +258,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
String cacheName = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName();
MetadataCacheImpl<T> metadataCache =
- new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.serDesExecutor,
- this.schedulerExecutor);
+ new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.executor);
metadataCaches.add(metadataCache);
return metadataCache;
}
@@ -282,7 +268,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
MetadataCacheConfig cacheConfig) {
MetadataCacheImpl<T> metadataCache =
new MetadataCacheImpl<>(cacheName, this, serde, cacheConfig,
- this.serDesExecutor, this.schedulerExecutor);
+ this.executor);
metadataCaches.add(metadataCache);
return metadataCache;
}
@@ -362,7 +348,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
});
return null;
- }, eventExecutor);
+ }, executor);
} catch (RejectedExecutionException e) {
return FutureUtil.failedFuture(e);
}
@@ -545,7 +531,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
// Notice listeners.
try {
- eventExecutor.execute(() -> {
+ executor.execute(() -> {
sessionListeners.forEach(l -> {
try {
l.accept(event);
@@ -570,9 +556,8 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
@Override
public void close() throws Exception {
- MoreExecutors.shutdownAndAwaitTermination(serDesExecutor, 10, TimeUnit.SECONDS);
- MoreExecutors.shutdownAndAwaitTermination(schedulerExecutor, 10, TimeUnit.SECONDS);
- MoreExecutors.shutdownAndAwaitTermination(eventExecutor, 10, TimeUnit.SECONDS);
+ executor.shutdownNow();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
this.metadataStoreStats.close();
}
@@ -589,30 +574,30 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
}
}
- protected final <T> void processEvent(Consumer<T> eventProcessor, T event) {
+ /**
+ * Run the task in the executor thread and fail the future if the executor is shutting down.
+ */
+ @VisibleForTesting
+ public void execute(Runnable task, CompletableFuture<?> future) {
try {
- eventExecutor.execute(() -> eventProcessor.accept(event));
- } catch (RejectedExecutionException e) {
- log.warn("Rejected processing event {}", event);
+ executor.execute(task);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
}
}
- protected final void scheduleDelayedTask(long delay, TimeUnit unit, Runnable task) {
- schedulerExecutor.schedule(task, delay, unit);
- }
-
- protected final void safeExecuteCallback(Runnable task, Consumer<Throwable> exceptionHandler) {
+ /**
+ * Run the task in the executor thread and fail the future if the executor is shutting down.
+ */
+ @VisibleForTesting
+ public void execute(Runnable task, Supplier<List<CompletableFuture<?>>> futures) {
try {
- eventExecutor.execute(task);
- } catch (Throwable t) {
- exceptionHandler.accept(t);
+ executor.execute(task);
+ } catch (final Throwable t) {
+ futures.get().forEach(f -> f.completeExceptionally(t));
}
}
- protected final void safeExecuteCallback(Runnable task, CompletableFuture<?> future) {
- safeExecuteCallback(task, future::completeExceptionally);
- }
-
protected static String parent(String path) {
int idx = path.lastIndexOf('/');
if (idx <= 0) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
index e1311fccfe0..3937fd712dc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
@@ -188,7 +188,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
@Override
protected CompletableFuture<Boolean> existsFromStore(String path) {
return kv.get(ByteSequence.from(path, StandardCharsets.UTF_8), EXISTS_GET_OPTION)
- .thenApply(gr -> gr.getCount() == 1);
+ .thenApplyAsync(gr -> gr.getCount() == 1, executor);
}
@Override
@@ -204,8 +204,9 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
}
return super.storePut(parent, new byte[0], Optional.empty(), EnumSet.noneOf(CreateOption.class))
// Then create the unique key with the version added in the path
- .thenCompose(
- stat -> super.storePut(path + stat.getVersion(), data, optExpectedVersion, options));
+ .thenComposeAsync(
+ stat -> super.storePut(path + stat.getVersion(), data, optExpectedVersion, options),
+ executor);
}
}
@@ -312,7 +313,9 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
}
} else {
log.warn("Failed to commit: {}", cause.getMessage());
- ops.forEach(o -> o.getFuture().completeExceptionally(ex));
+ executor.execute(() -> {
+ ops.forEach(o -> o.getFuture().completeExceptionally(ex));
+ });
}
return null;
});
@@ -323,7 +326,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
private void handleBatchOperationResult(TxnResponse txnResponse, List<MetadataOp> ops) {
- safeExecuteCallbacks(() -> {
+ executor.execute(() -> {
if (!txnResponse.isSucceeded()) {
if (ops.size() > 1) {
// Retry individually
@@ -401,7 +404,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
}
}
}
- }, ops);
+ });
}
private synchronized CompletableFuture<Void> createLease(boolean retryOnFailure) {
@@ -441,7 +444,9 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
if (retryOnFailure) {
future.exceptionally(ex -> {
log.warn("Failed to create Etcd lease. Retrying later", ex);
- scheduleDelayedTask(1, TimeUnit.SECONDS, () -> createLease(true));
+ executor.schedule(() -> {
+ createLease(true);
+ }, 1, TimeUnit.SECONDS);
return null;
});
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 627304b2edc..079cb3130e0 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -79,7 +79,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
throws MetadataStoreException {
super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry(),
- metadataStoreConfig.getNodeSizeStats(), metadataStoreConfig.getNumSerDesThreads());
+ metadataStoreConfig.getNodeSizeStats());
String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
// Local means a private data set
// update synchronizer and register sync listener
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 08e5478ffcc..74bddda7454 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -210,7 +210,7 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
throws MetadataStoreException {
super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry(),
- metadataStoreConfig.getNodeSizeStats(), metadataStoreConfig.getNumSerDesThreads());
+ metadataStoreConfig.getNodeSizeStats());
this.metadataUrl = metadataURL;
try {
RocksDB.loadLibrary();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index f56d6c6941f..5bf7e2272f0 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -119,7 +119,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
private void processSessionWatcher(WatchedEvent event) {
if (sessionWatcher != null) {
- processEvent(sessionWatcher::process, event);
+ executor.execute(() -> sessionWatcher.process(event));
}
}
@@ -245,8 +245,9 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
countsByType, totalSize, opsForLog);
// Retry with the individual operations
- scheduleDelayedTask(100, TimeUnit.MILLISECONDS,
- () -> ops.forEach(o -> batchOperation(Collections.singletonList(o))));
+ executor.schedule(() -> {
+ ops.forEach(o -> batchOperation(Collections.singletonList(o)));
+ }, 100, TimeUnit.MILLISECONDS);
} else {
MetadataStoreException e = getException(code, path);
ops.forEach(o -> o.getFuture().completeExceptionally(e));
@@ -255,7 +256,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
}
// Trigger all the futures in the batch
- safeExecuteCallbacks(() -> {
+ execute(() -> {
for (int i = 0; i < ops.size(); i++) {
OpResult opr = results.get(i);
MetadataOp op = ops.get(i);
@@ -277,7 +278,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
"Operation type not supported in multi: " + op.getType()));
}
}
- }, ops);
+ }, () -> ops.stream().map(MetadataOp::getFuture).collect(Collectors.toList()));
}, null);
} catch (Throwable t) {
ops.forEach(o -> o.getFuture().completeExceptionally(new MetadataStoreException(t)));
@@ -394,7 +395,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
try {
zkc.exists(path, null, (rc, path1, ctx, stat) -> {
- safeExecuteCallback(() -> {
+ execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(true);
@@ -420,7 +421,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
try {
zkc.delete(op.getPath(), expectedVersion, (rc, path1, ctx) -> {
- safeExecuteCallback(() -> {
+ execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(null);
@@ -445,7 +446,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
CreateMode createMode = getCreateMode(opPut.getOptions());
asyncCreateFullPathOptimistic(zkc, opPut.getPath(), opPut.getData(), createMode,
(rc, path1, ctx, name) -> {
- safeExecuteCallback(() -> {
+ execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(new Stat(name, 0, 0, 0, createMode.isEphemeral(), true));
@@ -459,7 +460,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
});
} else {
zkc.setData(opPut.getPath(), opPut.getData(), expectedVersion, (rc, path1, ctx, stat) -> {
- safeExecuteCallback(() -> {
+ execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(getStat(path1, stat));
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 30989a41bd1..a9319a50fec 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -18,20 +18,16 @@
*/
package org.apache.pulsar.metadata.impl.batching;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -42,7 +38,6 @@ import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscUnboundedArrayQueue;
-import org.jspecify.annotations.Nullable;
@Slf4j
public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore {
@@ -51,6 +46,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
private final MessagePassingQueue<MetadataOp> readOps;
private final MessagePassingQueue<MetadataOp> writeOps;
+ private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
+
private final boolean enabled;
private final int maxDelayMillis;
protected final int maxOperations;
@@ -58,12 +55,9 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
private MetadataEventSynchronizer synchronizer;
private final BatchMetadataStoreStats batchMetadataStoreStats;
protected MetadataStoreBatchStrategy metadataStoreBatchStrategy;
- @Nullable
- private final ScheduledExecutorService flushExecutor;
protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
- super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), conf.getNodeSizeStats(),
- conf.getNumSerDesThreads());
+ super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), conf.getNodeSizeStats());
this.enabled = conf.isBatchingEnabled();
this.maxDelayMillis = conf.getBatchingMaxDelayMillis();
@@ -73,22 +67,18 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
if (enabled) {
readOps = new MpscUnboundedArrayQueue<>(10_000);
writeOps = new MpscUnboundedArrayQueue<>(10_000);
- final var name = StringUtils.isNotBlank(conf.getMetadataStoreName()) ? conf.getMetadataStoreName()
- : getClass().getSimpleName();
- flushExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(
- name + "-batch-flusher"));
- scheduledTask = flushExecutor.scheduleAtFixedRate(this::flush, maxDelayMillis, maxDelayMillis,
- TimeUnit.MILLISECONDS);
+ scheduledTask =
+ executor.scheduleAtFixedRate(this::flush, maxDelayMillis, maxDelayMillis, TimeUnit.MILLISECONDS);
} else {
scheduledTask = null;
readOps = null;
writeOps = null;
- flushExecutor = null;
}
// update synchronizer and register sync listener
updateMetadataEventSynchronizer(conf.getSynchronizer());
- this.batchMetadataStoreStats = new BatchMetadataStoreStats(metadataStoreName);
+ this.batchMetadataStoreStats =
+ new BatchMetadataStoreStats(metadataStoreName, executor, conf.getOpenTelemetry());
this.metadataStoreBatchStrategy = new DefaultMetadataStoreBatchStrategy(maxOperations, maxSize);
}
@@ -106,13 +96,12 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
op.getFuture().completeExceptionally(ex);
}
scheduledTask.cancel(true);
- MoreExecutors.shutdownAndAwaitTermination(flushExecutor, 10, TimeUnit.SECONDS);
}
super.close();
this.batchMetadataStoreStats.close();
}
- private synchronized void flush() {
+ private void flush() {
List<MetadataOp> currentBatch;
if (!readOps.isEmpty()) {
while (CollectionUtils.isNotEmpty(currentBatch = metadataStoreBatchStrategy.nextBatch(readOps))) {
@@ -124,6 +113,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
internalBatchOperation(currentBatch);
}
}
+
+ flushInProgress.set(false);
}
@Override
@@ -178,8 +169,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
internalBatchOperation(Collections.singletonList(op));
return;
}
- if (queue.size() > maxOperations) {
- flush();
+ if (queue.size() > maxOperations && flushInProgress.compareAndSet(false, true)) {
+ executor.execute(this::flush);
}
} else {
internalBatchOperation(Collections.singletonList(op));
@@ -203,8 +194,4 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
}
protected abstract void batchOperation(List<MetadataOp> ops);
-
- protected final void safeExecuteCallbacks(Runnable runnable, List<MetadataOp> ops) {
- safeExecuteCallback(runnable, t -> ops.forEach(op -> op.getFuture().completeExceptionally(t)));
- }
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 407a927bda4..d055dd7da55 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -61,7 +61,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
private Optional<MetadataEventSynchronizer> synchronizer;
public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) {
- super("oxia-metadata", OpenTelemetry.noop(), null, 1);
+ super("oxia-metadata", OpenTelemetry.noop(), null);
this.client = oxia;
this.identity = identity;
this.synchronizer = Optional.empty();
@@ -75,7 +75,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
boolean enableSessionWatcher)
throws Exception {
super("oxia-metadata", Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry(),
- metadataStoreConfig.getNodeSizeStats(), metadataStoreConfig.getNumSerDesThreads());
+ metadataStoreConfig.getNodeSizeStats());
var linger = metadataStoreConfig.getBatchingMaxDelayMillis();
if (!metadataStoreConfig.isBatchingEnabled()) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
index 82cc15d8aaf..9549a8df8f9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
@@ -18,13 +18,23 @@
*/
package org.apache.pulsar.metadata.impl.stats;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
public final class BatchMetadataStoreStats implements AutoCloseable {
private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50, 100, 200, 500, 1000};
private static final String NAME = "name";
+ private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge
+ .build("pulsar_batch_metadata_store_executor_queue_size", "-")
+ .labelNames(NAME)
+ .register();
private static final Histogram OPS_WAITING = Histogram
.build("pulsar_batch_metadata_store_queue_wait_time", "-")
.unit("ms")
@@ -44,17 +54,46 @@ public final class BatchMetadataStoreStats implements AutoCloseable {
.register();
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final ThreadPoolExecutor executor;
private final String metadataStoreName;
private final Histogram.Child batchOpsWaitingChild;
private final Histogram.Child batchExecuteTimeChild;
private final Histogram.Child opsPerBatchChild;
- public BatchMetadataStoreStats(String metadataStoreName) {
+ public static final String EXECUTOR_QUEUE_SIZE_METRIC_NAME = "pulsar.broker.metadata.store.executor.queue.size";
+ private final ObservableLongUpDownCounter batchMetadataStoreSizeCounter;
+
+ public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executor, OpenTelemetry openTelemetry) {
+ if (executor instanceof ThreadPoolExecutor tx) {
+ this.executor = tx;
+ } else {
+ this.executor = null;
+ }
this.metadataStoreName = metadataStoreName;
+
+ EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() {
+ @Override
+ public double get() {
+ return getQueueSize();
+ }
+ }, metadataStoreName);
+
this.batchOpsWaitingChild = OPS_WAITING.labels(metadataStoreName);
this.batchExecuteTimeChild = BATCH_EXECUTE_TIME.labels(metadataStoreName);
this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName);
+
+ var meter = openTelemetry.getMeter("org.apache.pulsar");
+ var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, metadataStoreName);
+ this.batchMetadataStoreSizeCounter = meter
+ .upDownCounterBuilder(EXECUTOR_QUEUE_SIZE_METRIC_NAME)
+ .setDescription("The number of batch operations in the metadata store executor queue")
+ .setUnit("{operation}")
+ .buildWithCallback(measurement -> measurement.record(getQueueSize(), attributes));
+ }
+
+ private int getQueueSize() {
+ return executor == null ? 0 : executor.getQueue().size();
}
public void recordOpWaiting(long millis) {
@@ -72,9 +111,11 @@ public final class BatchMetadataStoreStats implements AutoCloseable {
@Override
public void close() throws Exception {
if (closed.compareAndSet(false, true)) {
+ EXECUTOR_QUEUE_SIZE.remove(this.metadataStoreName);
OPS_WAITING.remove(this.metadataStoreName);
BATCH_EXECUTE_TIME.remove(this.metadataStoreName);
OPS_PER_BATCH.remove(metadataStoreName);
+ batchMetadataStoreSizeCounter.close();
}
}
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index 0ae0b022a35..d42b2228346 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -96,7 +96,7 @@ public class MetadataStoreFactoryImplTest {
public static class MyMetadataStore extends AbstractMetadataStore {
protected MyMetadataStore() {
- super("custom", OpenTelemetry.noop(), null, 1);
+ super("custom", OpenTelemetry.noop(), null);
}
@Override