This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b852bcb547 [metadata] fix MetadataStore#put may get unexpected 
exception (#14903)
3b852bcb547 is described below

commit 3b852bcb54780ae26cd339e5e30bbe83de3a30cd
Author: WangJialing <65590138+wangjialing...@users.noreply.github.com>
AuthorDate: Wed Jul 13 08:02:46 2022 +0800

    [metadata] fix MetadataStore#put may get unexpected exception (#14903)
    
    * fix MetadataStore#put may get unexpected exception
    
    * fix checkstyle
    
    * add test
    
    * update test code
---
 .../pulsar/metadata/impl/ZKMetadataStore.java       |  8 +++++++-
 .../apache/pulsar/metadata/MetadataStoreTest.java   | 21 +++++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 6697934d56b..ad723f28f89 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -386,7 +386,13 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                                 put(opPut.getPath(), opPut.getData(), 
Optional.of(-1L)).thenAccept(
                                                 s -> future.complete(s))
                                         .exceptionally(ex -> {
-                                            
future.completeExceptionally(MetadataStoreException.wrap(ex.getCause()));
+                                            if (ex.getCause() instanceof 
BadVersionException) {
+                                                // The z-node exist now, let's 
overwrite it
+                                                internalStorePut(opPut);
+                                            } else {
+                                                future.completeExceptionally(
+                                                        
MetadataStoreException.wrap(ex.getCause()));
+                                            }
                                             return null;
                                         });
                             }
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index f4e5bf779ba..b8a091fef25 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -38,6 +38,7 @@ import java.util.function.Supplier;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -84,6 +85,26 @@ public class MetadataStoreTest extends BaseMetadataStoreTest 
{
         }
     }
 
+    @Test(dataProvider = "impl")
+    public void concurrentPutTest(String provider, Supplier<String> 
urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), 
MetadataStoreConfig.builder().build());
+
+        String data = "data";
+        String path = "/non-existing-key";
+        int concurrent = 50;
+        List<CompletableFuture<Stat>> futureList = new ArrayList<>();
+        for (int i = 0; i < concurrent; i++) {
+            futureList.add(store.put(path, data.getBytes(), 
Optional.empty()).exceptionally(ex -> {
+                fail("fail to execute concurrent put", ex);
+                return null;
+            }));
+        }
+        FutureUtil.waitForAll(futureList).join();
+
+        assertEquals(store.get(path).join().get().getValue(), data.getBytes());
+    }
+
     @Test(dataProvider = "impl")
     public void insertionTestWithExpectedVersion(String provider, 
Supplier<String> urlSupplier) throws Exception {
         @Cleanup

Reply via email to