This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 38bba54  Unit test to verify metadata cache consistency across 
brokers. (#11202)
38bba54 is described below

commit 38bba54a167ed3fba419bc2b7da36ce41e2ece21
Author: Bharani Chadalavada <bharanic....@gmail.com>
AuthorDate: Mon Jul 26 15:47:53 2021 -0700

    Unit test to verify metadata cache consistency across brokers. (#11202)
    
    Co-authored-by: Bharani Chadalavada <bchadalav...@splunk.com>
---
 .../apache/pulsar/metadata/MetadataCacheTest.java  | 53 +++++++++++++++++++++-
 1 file changed, 52 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index f2c25c4..68f772c 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -33,13 +33,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import lombok.AllArgsConstructor;
 import lombok.Cleanup;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.CacheGetResult;
@@ -51,11 +54,14 @@ import 
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsExcept
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.awaitility.Awaitility;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+@Slf4j
 public class MetadataCacheTest extends BaseMetadataStoreTest {
 
     @Data
@@ -102,7 +108,7 @@ public class MetadataCacheTest extends 
BaseMetadataStoreTest {
     }
 
     @Test(dataProvider = "zk")
-    public void crossStoreUpdates(String provider, String url) throws 
Exception {
+    public void crossStoreAddDelete(String provider, String url) throws 
Exception {
         @Cleanup
         MetadataStore store1 = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
 
@@ -165,6 +171,51 @@ public class MetadataCacheTest extends 
BaseMetadataStoreTest {
         });
     }
 
+    @Test(dataProvider = "zk")
+    public void crossStoreUpdates(String provider, String url) throws 
Exception {
+        String testName = "cross store updates";
+        @Cleanup
+        MetadataStore store1 = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+
+        @Cleanup
+        MetadataStore store2 = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+
+        MetadataCacheImpl<MyClass> objCache1 = (MetadataCacheImpl<MyClass>) 
store1.getMetadataCache(MyClass.class);
+
+        MetadataCacheImpl<MyClass> objCache2 = (MetadataCacheImpl<MyClass>) 
store2.getMetadataCache(MyClass.class);
+        AtomicReference<MyClass> storeObj = new AtomicReference<MyClass>();
+        store2.registerListener(n -> {
+            if (n.getType() == NotificationType.Modified) {
+                CompletableFuture.runAsync(() -> {
+                    try {
+                        MyClass obj = objCache2.get(n.getPath()).get().get();
+                        storeObj.set(obj);
+                    } catch (Exception e) {
+                        log.error("Got exception {}", e.getMessage());
+                    }
+                });
+            };
+        });
+
+        String key1 = "/test-key1";
+        assertEquals(objCache1.getIfCached(key1), Optional.empty());
+        assertEquals(objCache2.getIfCached(key1), Optional.empty());
+
+        MyClass value1 = new MyClass(testName, 1);
+        objCache1.create(key1, value1).join();
+
+        Awaitility.await().ignoreNoExceptions().untilAsserted(() -> {
+            assertEquals(objCache1.getIfCached(key1), Optional.of(value1));
+            assertEquals(objCache2.get(key1).join(), Optional.of(value1));
+            assertEquals(objCache2.getIfCached(key1), Optional.of(value1));
+        });
+
+        MyClass value2 = new MyClass(testName, 2);
+        objCache1.readModifyUpdate(key1, (oldData) -> {return value2;}).join();
+
+        Awaitility.await().ignoreNoExceptions().untilAsserted(() 
->assertEquals(storeObj.get(), value2));
+    }
+
     @Test(dataProvider = "impl")
     public void insertionDeletionWitGenericType(String provider, String url) 
throws Exception {
         @Cleanup

Reply via email to