Copilot commented on code in PR #25187:
URL: https://github.com/apache/pulsar/pull/25187#discussion_r2751383117
##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -492,6 +492,12 @@ The max allowed delay for delayed delivery (in
milliseconds). If the broker rece
)
private boolean metadataStoreAllowReadOnlyOperations;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "The number of threads uses for serializing and
deserializing data to and from the metadata store"
Review Comment:
The documentation comment has a grammatical error. It should read "The
number of threads used for" instead of "The number of threads uses for".
```suggestion
doc = "The number of threads used for serializing and
deserializing data to and from the metadata store"
```
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java:
##########
@@ -598,8 +610,10 @@ protected static <T> CompletableFuture<T>
alreadyClosedFailedFuture() {
@Override
public void close() throws Exception {
- executor.shutdownNow();
- executor.awaitTermination(10, TimeUnit.SECONDS);
+ serDesExecutor.shutdown();
+ schedulerExecutor.shutdown();
Review Comment:
The executor shutdown order may cause issues. The `serDesExecutor` and
`schedulerExecutor` are shutdown (non-forcefully) but there's no await on their
termination, while `eventExecutor` is forcefully shutdown with `shutdownNow()`
and awaits termination. If there are pending tasks in `serDesExecutor` or
`schedulerExecutor` that submit tasks to `eventExecutor`, those tasks will be
rejected. Consider either: 1) awaiting termination on all executors, or 2)
using `shutdownNow()` consistently across all executors, or 3) ensuring proper
ordering where `eventExecutor` is shutdown after the other executors complete.
```suggestion
schedulerExecutor.shutdown();
serDesExecutor.awaitTermination(10, TimeUnit.SECONDS);
schedulerExecutor.awaitTermination(10, TimeUnit.SECONDS);
```
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java:
##########
@@ -96,12 +105,13 @@ public void close() throws Exception {
op.getFuture().completeExceptionally(ex);
}
scheduledTask.cancel(true);
+ flushExecutor.shutdown();
Review Comment:
The `flushExecutor` is shutdown without awaiting its termination. After
canceling the scheduled task and calling `flushExecutor.shutdown()`, there may
still be a flush task in progress. Without awaiting termination, the subsequent
call to `super.close()` will shutdown the parent executors that may be needed
by the ongoing flush task. Consider adding `flushExecutor.awaitTermination()`
after the shutdown call to ensure the flush task completes before closing
parent resources.
```suggestion
flushExecutor.shutdown();
try {
if (!flushExecutor.awaitTermination(maxDelayMillis,
TimeUnit.MILLISECONDS)) {
flushExecutor.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
```
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java:
##########
@@ -67,18 +72,22 @@ protected AbstractBatchedMetadataStore(MetadataStoreConfig
conf) {
if (enabled) {
readOps = new MpscUnboundedArrayQueue<>(10_000);
writeOps = new MpscUnboundedArrayQueue<>(10_000);
- scheduledTask =
- executor.scheduleAtFixedRate(this::flush, maxDelayMillis,
maxDelayMillis, TimeUnit.MILLISECONDS);
+ final var name = StringUtils.isBlank(conf.getMetadataStoreName())
? conf.getMetadataStoreName()
Review Comment:
The logic for determining the thread name is inverted. When
`metadataStoreName` is blank, the condition returns the blank name instead of
using the class simple name as a fallback. This should be
`StringUtils.isNotBlank(conf.getMetadataStoreName())` to properly fallback to
`getClass().getSimpleName()` when the name is blank.
```suggestion
final var name =
StringUtils.isNotBlank(conf.getMetadataStoreName()) ?
conf.getMetadataStoreName()
```
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java:
##########
@@ -96,13 +99,20 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
protected MetadataNodeSizeStats nodeSizeStats;
- protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry
openTelemetry,
- MetadataNodeSizeStats nodeSizeStats) {
+ protected AbstractMetadataStore(
+ String metadataStoreName, OpenTelemetry openTelemetry,
MetadataNodeSizeStats nodeSizeStats,
+ int numSerDesThreads) {
this.nodeSizeStats = nodeSizeStats == null ? new
DummyMetadataNodeSizeStats()
: nodeSizeStats;
- this.executor = new ScheduledThreadPoolExecutor(1,
- new DefaultThreadFactory(
- StringUtils.isNotBlank(metadataStoreName) ?
metadataStoreName : getClass().getSimpleName()));
+ final var namePrefix = StringUtils.isBlank(metadataStoreName) ?
metadataStoreName : getClass().getSimpleName();
Review Comment:
The logic for determining the thread name prefix is inverted. When
`metadataStoreName` is blank, the condition returns the blank name instead of
using the class simple name as a fallback. This should be
`StringUtils.isNotBlank(metadataStoreName)` to properly fallback to
`getClass().getSimpleName()` when the name is blank.
```suggestion
final var namePrefix = StringUtils.isNotBlank(metadataStoreName) ?
metadataStoreName : getClass().getSimpleName();
```
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java:
##########
@@ -169,8 +177,8 @@ private void enqueue(MessagePassingQueue<MetadataOp> queue,
MetadataOp op) {
internalBatchOperation(Collections.singletonList(op));
return;
}
- if (queue.size() > maxOperations &&
flushInProgress.compareAndSet(false, true)) {
- executor.execute(this::flush);
+ if (queue.size() > maxOperations) {
+ flush();
Review Comment:
The `flush()` method can now be called from multiple threads: from the
scheduled task in `flushExecutor` and directly from `enqueue()` when the queue
size exceeds `maxOperations`. While the method is synchronized, calling
`flush()` directly from `enqueue()` (which can be called from any thread) may
block the caller thread unnecessarily. Consider using
`flushExecutor.execute(this::flush)` instead of calling `flush()` directly to
avoid blocking the caller thread and maintain consistent execution in the flush
executor.
```suggestion
flushExecutor.execute(this::flush);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]