This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 35d96124208 [fix] [meta]Switch to the metadata store thread after zk operation (#20303)
35d96124208 is described below
commit 35d961242089a0448cb425da637a2afbdaca0005
Author: fengyubiao <[email protected]>
AuthorDate: Tue May 30 18:07:31 2023 +0800
[fix] [meta]Switch to the metadata store thread after zk operation (#20303)
---
.../metadata/impl/AbstractMetadataStore.java | 13 ++++
.../pulsar/metadata/impl/ZKMetadataStore.java | 46 +++++++-------
.../apache/pulsar/metadata/MetadataStoreTest.java | 70 ++++++++++++++++++++++
3 files changed, 106 insertions(+), 23 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 6fcf8eb6b49..4cadf2397a7 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -534,6 +535,18 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
}
+ /**
+ * Run the task in the executor thread and fail the future if the executor is shutting down.
+ */
+ @VisibleForTesting
+ public void execute(Runnable task, Supplier<List<CompletableFuture<?>>> futures) {
+ try {
+ executor.execute(task);
+ } catch (final Throwable t) {
+ futures.get().forEach(f -> f.completeExceptionally(t));
+ }
+ }
+
protected static String parent(String path) {
int idx = path.lastIndexOf('/');
if (idx <= 0) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index a6d8eb8344c..079ae3e2ae5 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -204,29 +204,29 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
}
// Trigger all the futures in the batch
- for (int i = 0; i < ops.size(); i++) {
- OpResult opr = results.get(i);
- MetadataOp op = ops.get(i);
-
- switch (op.getType()) {
- case PUT:
- handlePutResult(op.asPut(), opr);
- break;
- case DELETE:
- handleDeleteResult(op.asDelete(), opr);
- break;
- case GET:
- handleGetResult(op.asGet(), opr);
- break;
- case GET_CHILDREN:
- handleGetChildrenResult(op.asGetChildren(), opr);
- break;
-
- default:
- op.getFuture().completeExceptionally(new MetadataStoreException(
- "Operation type not supported in multi: " + op.getType()));
- }
- }
+ execute(() -> {
+ for (int i = 0; i < ops.size(); i++) {
+ OpResult opr = results.get(i);
+ MetadataOp op = ops.get(i);
+ switch (op.getType()) {
+ case PUT:
+ handlePutResult(op.asPut(), opr);
+ break;
+ case DELETE:
+ handleDeleteResult(op.asDelete(), opr);
+ break;
+ case GET:
+ handleGetResult(op.asGet(), opr);
+ break;
+ case GET_CHILDREN:
+ handleGetChildrenResult(op.asGetChildren(), opr);
+ break;
+ default:
+ op.getFuture().completeExceptionally(new MetadataStoreException(
+ "Operation type not supported in multi: " + op.getType()));
+ }
+ }
+ }, () -> ops.stream().map(MetadataOp::getFuture).collect(Collectors.toList()));
}, null);
} catch (Throwable t) {
ops.forEach(o -> o.getFuture().completeExceptionally(new MetadataStoreException(t)));
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 949b4a9b2ba..c87e9bda436 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -50,8 +51,10 @@ import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@@ -425,6 +428,73 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
assertFalse(store.exists(prefix).join());
}
+ @DataProvider(name = "conditionOfSwitchThread")
+ public Object[][] conditionOfSwitchThread(){
+ return new Object[][]{
+ {false, false},
+ {false, true},
+ {true, false},
+ {true, true}
+ };
+ }
+
+ @Test(dataProvider = "conditionOfSwitchThread")
+ public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean enabledBatch) throws Exception {
+ final String prefix = newKey();
+ final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");
+ MetadataStoreConfig.MetadataStoreConfigBuilder builder =
+ MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
+ builder.fsyncEnable(false);
+ builder.batchingEnabled(enabledBatch);
+ if (!hasSynchronizer) {
+ builder.synchronizer(null);
+ }
+ MetadataStoreConfig config = builder.build();
+ @Cleanup
+ ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config);
+
+ final Runnable verify = () -> {
+ String currentThreadName = Thread.currentThread().getName();
+ String errorMessage = String.format("Expect to switch to thread %s, but currently it is thread %s",
+ metadataStoreName, currentThreadName);
+ assertTrue(Thread.currentThread().getName().startsWith(metadataStoreName), errorMessage);
+ };
+
+ // put with node which has parent(but the parent node is not exists).
+ store.put(prefix + "/a1/b1/c1", "value".getBytes(), Optional.of(-1L)).thenApply((ignore) -> {
+ verify.run();
+ return null;
+ }).join();
+ // put.
+ store.put(prefix + "/b1", "value".getBytes(), Optional.of(-1L)).thenApply((ignore) -> {
+ verify.run();
+ return null;
+ }).join();
+ // get.
+ store.get(prefix + "/b1").thenApply((ignore) -> {
+ verify.run();
+ return null;
+ }).join();
+ // get the node which is not exists.
+ store.get(prefix + "/non").thenApply((ignore) -> {
+ verify.run();
+ return null;
+ }).join();
+ // delete.
+ store.delete(prefix + "/b1", Optional.empty()).thenApply((ignore) -> {
+ verify.run();
+ return null;
+ }).join();
+ // delete the node which is not exists.
+ store.delete(prefix + "/non", Optional.empty()).thenApply((ignore) -> {
+ verify.run();
+ return null;
+ }).exceptionally(ex -> {
+ verify.run();
+ return null;
+ }).join();
+ }
+
@Test(dataProvider = "impl")
public void testPersistent(String provider, Supplier<String> urlSupplier) throws Exception {
String metadataUrl = urlSupplier.get();