This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 3b852bcb547 [metadata] fix MetadataStore#put may get unexpected exception (#14903) 3b852bcb547 is described below commit 3b852bcb54780ae26cd339e5e30bbe83de3a30cd Author: WangJialing <65590138+wangjialing...@users.noreply.github.com> AuthorDate: Wed Jul 13 08:02:46 2022 +0800 [metadata] fix MetadataStore#put may get unexpected exception (#14903) * fix MetadataStore#put may get unexpected exception * fix checkstyle * add test * update test code --- .../pulsar/metadata/impl/ZKMetadataStore.java | 8 +++++++- .../apache/pulsar/metadata/MetadataStoreTest.java | 21 +++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 6697934d56b..ad723f28f89 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -386,7 +386,13 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore put(opPut.getPath(), opPut.getData(), Optional.of(-1L)).thenAccept( s -> future.complete(s)) .exceptionally(ex -> { - future.completeExceptionally(MetadataStoreException.wrap(ex.getCause())); + if (ex.getCause() instanceof BadVersionException) { + // The z-node exist now, let's overwrite it + internalStorePut(opPut); + } else { + future.completeExceptionally( + MetadataStoreException.wrap(ex.getCause())); + } return null; }); } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java index f4e5bf779ba..b8a091fef25 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java +++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java @@ -38,6 +38,7 @@ import java.util.function.Supplier; import lombok.Cleanup; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; @@ -84,6 +85,26 @@ public class MetadataStoreTest extends BaseMetadataStoreTest { } } + @Test(dataProvider = "impl") + public void concurrentPutTest(String provider, Supplier<String> urlSupplier) throws Exception { + @Cleanup + MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); + + String data = "data"; + String path = "/non-existing-key"; + int concurrent = 50; + List<CompletableFuture<Stat>> futureList = new ArrayList<>(); + for (int i = 0; i < concurrent; i++) { + futureList.add(store.put(path, data.getBytes(), Optional.empty()).exceptionally(ex -> { + fail("fail to execute concurrent put", ex); + return null; + })); + } + FutureUtil.waitForAll(futureList).join(); + + assertEquals(store.get(path).join().get().getValue(), data.getBytes()); + } + @Test(dataProvider = "impl") public void insertionTestWithExpectedVersion(String provider, Supplier<String> urlSupplier) throws Exception { @Cleanup