This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f7d35e5ddbf [improve][meta] Fix invalid use of drain API and race condition in closing metadata store (#22585)
f7d35e5ddbf is described below

commit f7d35e5ddbfb96ef4eda636ba7808868dc56017f
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Fri Jun 14 04:24:07 2024 +0300

    [improve][meta] Fix invalid use of drain API and race condition in closing metadata store (#22585)

    Co-authored-by: Matteo Merli <mme...@apache.org>
---
 .../batching/AbstractBatchedMetadataStore.java | 29 +++++++++++++++++++---
 1 file changed, 25 insertions(+), 4 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 5b45530d2e2..4fa1c6aca0f 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -86,9 +86,13 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
             // Fail all the pending items
             MetadataStoreException ex =
                     new MetadataStoreException.AlreadyClosedException("Metadata store is getting closed");
-            readOps.drain(op -> op.getFuture().completeExceptionally(ex));
-            writeOps.drain(op -> op.getFuture().completeExceptionally(ex));
-
+            MetadataOp op;
+            while ((op = readOps.poll()) != null) {
+                op.getFuture().completeExceptionally(ex);
+            }
+            while ((op = writeOps.poll()) != null) {
+                op.getFuture().completeExceptionally(ex);
+            }
             scheduledTask.cancel(true);
         }
         super.close();
@@ -98,7 +102,13 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
     private void flush() {
         while (!readOps.isEmpty()) {
             List<MetadataOp> ops = new ArrayList<>();
-            readOps.drain(ops::add, maxOperations);
+            for (int i = 0; i < maxOperations; i++) {
+                MetadataOp op = readOps.poll();
+                if (op == null) {
+                    break;
+                }
+                ops.add(op);
+            }
             internalBatchOperation(ops);
         }
 
@@ -167,6 +177,11 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
     }
 
     private void enqueue(MessagePassingQueue<MetadataOp> queue, MetadataOp op) {
+        if (isClosed()) {
+            MetadataStoreException ex = new MetadataStoreException.AlreadyClosedException();
+            op.getFuture().completeExceptionally(ex);
+            return;
+        }
         if (enabled) {
             if (!queue.offer(op)) {
                 // Execute individually if we're failing to enqueue
@@ -182,6 +197,12 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
     }
 
     private void internalBatchOperation(List<MetadataOp> ops) {
+        if (isClosed()) {
+            MetadataStoreException ex =
+                    new MetadataStoreException.AlreadyClosedException();
+            ops.forEach(op -> op.getFuture().completeExceptionally(ex));
+            return;
+        }
         long now = System.currentTimeMillis();
         for (MetadataOp op : ops) {
             this.batchMetadataStoreStats.recordOpWaiting(now - op.created());