Copilot commented on code in PR #25187:
URL: https://github.com/apache/pulsar/pull/25187#discussion_r2751383117


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -492,6 +492,12 @@ The max allowed delay for delayed delivery (in 
milliseconds). If the broker rece
     )
     private boolean metadataStoreAllowReadOnlyOperations;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "The number of threads uses for serializing and 
deserializing data to and from the metadata store"

Review Comment:
   The documentation comment has a grammatical error. It should read "The 
number of threads used for" instead of "The number of threads uses for".
   ```suggestion
               doc = "The number of threads used for serializing and 
deserializing data to and from the metadata store"
   ```



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java:
##########
@@ -598,8 +610,10 @@ protected static <T> CompletableFuture<T> 
alreadyClosedFailedFuture() {
 
     @Override
     public void close() throws Exception {
-        executor.shutdownNow();
-        executor.awaitTermination(10, TimeUnit.SECONDS);
+        serDesExecutor.shutdown();
+        schedulerExecutor.shutdown();

Review Comment:
   The executor shutdown order may cause issues. The `serDesExecutor` and 
`schedulerExecutor` are shutdown (non-forcefully) but there's no await on their 
termination, while `eventExecutor` is forcefully shutdown with `shutdownNow()` 
and awaits termination. If there are pending tasks in `serDesExecutor` or 
`schedulerExecutor` that submit tasks to `eventExecutor`, those tasks will be 
rejected. Consider either: 1) awaiting termination on all executors, or 2) 
using `shutdownNow()` consistently across all executors, or 3) ensuring proper 
ordering where `eventExecutor` is shutdown after the other executors complete.
   ```suggestion
           schedulerExecutor.shutdown();
           serDesExecutor.awaitTermination(10, TimeUnit.SECONDS);
           schedulerExecutor.awaitTermination(10, TimeUnit.SECONDS);
   ```



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java:
##########
@@ -96,12 +105,13 @@ public void close() throws Exception {
                 op.getFuture().completeExceptionally(ex);
             }
             scheduledTask.cancel(true);
+            flushExecutor.shutdown();

Review Comment:
   The `flushExecutor` is shutdown without awaiting its termination. After 
canceling the scheduled task and calling `flushExecutor.shutdown()`, there may 
still be a flush task in progress. Without awaiting termination, the subsequent 
call to `super.close()` will shutdown the parent executors that may be needed 
by the ongoing flush task. Consider adding `flushExecutor.awaitTermination()` 
after the shutdown call to ensure the flush task completes before closing 
parent resources.
   ```suggestion
               flushExecutor.shutdown();
               try {
                   if (!flushExecutor.awaitTermination(maxDelayMillis, 
TimeUnit.MILLISECONDS)) {
                       flushExecutor.shutdownNow();
                   }
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
   ```



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java:
##########
@@ -67,18 +72,22 @@ protected AbstractBatchedMetadataStore(MetadataStoreConfig 
conf) {
         if (enabled) {
             readOps = new MpscUnboundedArrayQueue<>(10_000);
             writeOps = new MpscUnboundedArrayQueue<>(10_000);
-            scheduledTask =
-                    executor.scheduleAtFixedRate(this::flush, maxDelayMillis, 
maxDelayMillis, TimeUnit.MILLISECONDS);
+            final var name = StringUtils.isBlank(conf.getMetadataStoreName()) 
? conf.getMetadataStoreName()

Review Comment:
   The logic for determining the thread name is inverted. When 
`metadataStoreName` is blank, the condition returns the blank name instead of 
using the class simple name as a fallback. This should be 
`StringUtils.isNotBlank(conf.getMetadataStoreName())` to properly fallback to 
`getClass().getSimpleName()` when the name is blank.
   ```suggestion
               final var name = 
StringUtils.isNotBlank(conf.getMetadataStoreName()) ? 
conf.getMetadataStoreName()
   ```



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java:
##########
@@ -96,13 +99,20 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     protected MetadataNodeSizeStats nodeSizeStats;
 
-    protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry 
openTelemetry,
-                MetadataNodeSizeStats nodeSizeStats) {
+    protected AbstractMetadataStore(
+            String metadataStoreName, OpenTelemetry openTelemetry, 
MetadataNodeSizeStats nodeSizeStats,
+            int numSerDesThreads) {
         this.nodeSizeStats = nodeSizeStats == null ? new 
DummyMetadataNodeSizeStats()
                 : nodeSizeStats;
-        this.executor = new ScheduledThreadPoolExecutor(1,
-                new DefaultThreadFactory(
-                        StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName()));
+        final var namePrefix = StringUtils.isBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName();

Review Comment:
   The logic for determining the thread name prefix is inverted. When 
`metadataStoreName` is blank, the condition returns the blank name instead of 
using the class simple name as a fallback. This should be 
`StringUtils.isNotBlank(metadataStoreName)` to properly fallback to 
`getClass().getSimpleName()` when the name is blank.
   ```suggestion
           final var namePrefix = StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName();
   ```



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java:
##########
@@ -169,8 +177,8 @@ private void enqueue(MessagePassingQueue<MetadataOp> queue, 
MetadataOp op) {
                 internalBatchOperation(Collections.singletonList(op));
                 return;
             }
-            if (queue.size() > maxOperations && 
flushInProgress.compareAndSet(false, true)) {
-                executor.execute(this::flush);
+            if (queue.size() > maxOperations) {
+                flush();

Review Comment:
   The `flush()` method can now be called from multiple threads: from the 
scheduled task in `flushExecutor` and directly from `enqueue()` when the queue 
size exceeds `maxOperations`. While the method is synchronized, calling 
`flush()` directly from `enqueue()` (which can be called from any thread) may 
block the caller thread unnecessarily. Consider using 
`flushExecutor.execute(this::flush)` instead of calling `flush()` directly to 
avoid blocking the caller thread and maintain consistent execution in the flush 
executor.
   ```suggestion
                   flushExecutor.execute(this::flush);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to