This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f7d35e5ddbf [improve][meta] Fix invalid use of drain API and race 
condition in closing metadata store (#22585)
f7d35e5ddbf is described below

commit f7d35e5ddbfb96ef4eda636ba7808868dc56017f
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Fri Jun 14 04:24:07 2024 +0300

    [improve][meta] Fix invalid use of drain API and race condition in closing 
metadata store (#22585)
    
    Co-authored-by: Matteo Merli <mme...@apache.org>
---
 .../batching/AbstractBatchedMetadataStore.java     | 29 +++++++++++++++++++---
 1 file changed, 25 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 5b45530d2e2..4fa1c6aca0f 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -86,9 +86,13 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
             // Fail all the pending items
             MetadataStoreException ex =
                     new 
MetadataStoreException.AlreadyClosedException("Metadata store is getting 
closed");
-            readOps.drain(op -> op.getFuture().completeExceptionally(ex));
-            writeOps.drain(op -> op.getFuture().completeExceptionally(ex));
-
+            MetadataOp op;
+            while ((op = readOps.poll()) != null) {
+                op.getFuture().completeExceptionally(ex);
+            }
+            while ((op = writeOps.poll()) != null) {
+                op.getFuture().completeExceptionally(ex);
+            }
             scheduledTask.cancel(true);
         }
         super.close();
@@ -98,7 +102,13 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
     private void flush() {
         while (!readOps.isEmpty()) {
             List<MetadataOp> ops = new ArrayList<>();
-            readOps.drain(ops::add, maxOperations);
+            for (int i = 0; i < maxOperations; i++) {
+                MetadataOp op = readOps.poll();
+                if (op == null) {
+                    break;
+                }
+                ops.add(op);
+            }
             internalBatchOperation(ops);
         }
 
@@ -167,6 +177,11 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
     }
 
     private void enqueue(MessagePassingQueue<MetadataOp> queue, MetadataOp op) 
{
+        if (isClosed()) {
+            MetadataStoreException ex = new 
MetadataStoreException.AlreadyClosedException();
+            op.getFuture().completeExceptionally(ex);
+            return;
+        }
         if (enabled) {
             if (!queue.offer(op)) {
                 // Execute individually if we're failing to enqueue
@@ -182,6 +197,12 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
     }
 
     private void internalBatchOperation(List<MetadataOp> ops) {
+        if (isClosed()) {
+            MetadataStoreException ex =
+                    new MetadataStoreException.AlreadyClosedException();
+            ops.forEach(op -> op.getFuture().completeExceptionally(ex));
+            return;
+        }
         long now = System.currentTimeMillis();
         for (MetadataOp op : ops) {
             this.batchMetadataStoreStats.recordOpWaiting(now - op.created());

Reply via email to