This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 35d96124208 [fix] [meta]Switch to the metadata store thread after zk 
operation (#20303)
35d96124208 is described below

commit 35d961242089a0448cb425da637a2afbdaca0005
Author: fengyubiao <[email protected]>
AuthorDate: Tue May 30 18:07:31 2023 +0800

    [fix] [meta]Switch to the metadata store thread after zk operation (#20303)
---
 .../metadata/impl/AbstractMetadataStore.java       | 13 ++++
 .../pulsar/metadata/impl/ZKMetadataStore.java      | 46 +++++++-------
 .../apache/pulsar/metadata/MetadataStoreTest.java  | 70 ++++++++++++++++++++++
 3 files changed, 106 insertions(+), 23 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 6fcf8eb6b49..4cadf2397a7 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -534,6 +535,18 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         }
     }
 
+    /**
+     * Run the task in the executor thread and fail the future if the executor 
is shutting down.
+     */
+    @VisibleForTesting
+    public void execute(Runnable task, Supplier<List<CompletableFuture<?>>> 
futures) {
+        try {
+            executor.execute(task);
+        } catch (final Throwable t) {
+            futures.get().forEach(f -> f.completeExceptionally(t));
+        }
+    }
+
     protected static String parent(String path) {
         int idx = path.lastIndexOf('/');
         if (idx <= 0) {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index a6d8eb8344c..079ae3e2ae5 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -204,29 +204,29 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                 }
 
                 // Trigger all the futures in the batch
-                for (int i = 0; i < ops.size(); i++) {
-                    OpResult opr = results.get(i);
-                    MetadataOp op = ops.get(i);
-
-                    switch (op.getType()) {
-                        case PUT:
-                            handlePutResult(op.asPut(), opr);
-                            break;
-                        case DELETE:
-                            handleDeleteResult(op.asDelete(), opr);
-                            break;
-                        case GET:
-                            handleGetResult(op.asGet(), opr);
-                            break;
-                        case GET_CHILDREN:
-                            handleGetChildrenResult(op.asGetChildren(), opr);
-                            break;
-
-                        default:
-                            op.getFuture().completeExceptionally(new 
MetadataStoreException(
-                                    "Operation type not supported in multi: " 
+ op.getType()));
-                    }
-                }
+                execute(() -> {
+                    for (int i = 0; i < ops.size(); i++) {
+                        OpResult opr = results.get(i);
+                        MetadataOp op = ops.get(i);
+                            switch (op.getType()) {
+                                case PUT:
+                                    handlePutResult(op.asPut(), opr);
+                                    break;
+                                case DELETE:
+                                    handleDeleteResult(op.asDelete(), opr);
+                                    break;
+                                case GET:
+                                    handleGetResult(op.asGet(), opr);
+                                    break;
+                                case GET_CHILDREN:
+                                    
handleGetChildrenResult(op.asGetChildren(), opr);
+                                    break;
+                                default:
+                                    op.getFuture().completeExceptionally(new 
MetadataStoreException(
+                                            "Operation type not supported in 
multi: " + op.getType()));
+                            }
+                        }
+                }, () -> 
ops.stream().map(MetadataOp::getFuture).collect(Collectors.toList()));
             }, null);
         } catch (Throwable t) {
             ops.forEach(o -> o.getFuture().completeExceptionally(new 
MetadataStoreException(t)));
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 949b4a9b2ba..c87e9bda436 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -50,8 +51,10 @@ import org.apache.pulsar.metadata.api.MetadataStoreFactory;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.assertj.core.util.Lists;
 import org.awaitility.Awaitility;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -425,6 +428,73 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
         assertFalse(store.exists(prefix).join());
     }
 
+    @DataProvider(name = "conditionOfSwitchThread")
+    public Object[][] conditionOfSwitchThread(){
+        return new Object[][]{
+            {false, false},
+            {false, true},
+            {true, false},
+            {true, true}
+        };
+    }
+
+    @Test(dataProvider = "conditionOfSwitchThread")
+    public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, 
boolean enabledBatch) throws Exception {
+        final String prefix = newKey();
+        final String metadataStoreName = 
UUID.randomUUID().toString().replaceAll("-", "");
+        MetadataStoreConfig.MetadataStoreConfigBuilder builder =
+                
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
+        builder.fsyncEnable(false);
+        builder.batchingEnabled(enabledBatch);
+        if (!hasSynchronizer) {
+            builder.synchronizer(null);
+        }
+        MetadataStoreConfig config = builder.build();
+        @Cleanup
+        ZKMetadataStore store = (ZKMetadataStore) 
MetadataStoreFactory.create(zks.getConnectionString(), config);
+
+        final Runnable verify = () -> {
+            String currentThreadName = Thread.currentThread().getName();
+            String errorMessage = String.format("Expect to switch to thread 
%s, but currently it is thread %s",
+                    metadataStoreName, currentThreadName);
+            
assertTrue(Thread.currentThread().getName().startsWith(metadataStoreName), 
errorMessage);
+        };
+
+        // put with node which has parent(but the parent node is not exists).
+        store.put(prefix + "/a1/b1/c1", "value".getBytes(), 
Optional.of(-1L)).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // put.
+        store.put(prefix + "/b1", "value".getBytes(), 
Optional.of(-1L)).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // get.
+        store.get(prefix + "/b1").thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // get the node which is not exists.
+        store.get(prefix + "/non").thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // delete.
+        store.delete(prefix + "/b1", Optional.empty()).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).join();
+        // delete the node which is not exists.
+        store.delete(prefix + "/non", Optional.empty()).thenApply((ignore) -> {
+            verify.run();
+            return null;
+        }).exceptionally(ex -> {
+            verify.run();
+            return null;
+        }).join();
+    }
+
     @Test(dataProvider = "impl")
     public void testPersistent(String provider, Supplier<String> urlSupplier) 
throws Exception {
         String metadataUrl = urlSupplier.get();

Reply via email to