This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 38bba54 Unit test to verify metadata cache consistency across brokers. (#11202) 38bba54 is described below commit 38bba54a167ed3fba419bc2b7da36ce41e2ece21 Author: Bharani Chadalavada <bharanic....@gmail.com> AuthorDate: Mon Jul 26 15:47:53 2021 -0700 Unit test to verify metadata cache consistency across brokers. (#11202) Co-authored-by: Bharani Chadalavada <bchadalav...@splunk.com> --- .../apache/pulsar/metadata/MetadataCacheTest.java | 53 +++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java index f2c25c4..68f772c 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java @@ -33,13 +33,16 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import lombok.AllArgsConstructor; import lombok.Cleanup; import lombok.Data; import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.metadata.api.CacheGetResult; @@ -51,11 +54,14 @@ import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsExcept import org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.apache.pulsar.metadata.api.MetadataStoreFactory; +import org.apache.pulsar.metadata.api.NotificationType; import 
org.apache.pulsar.metadata.api.Stat; +import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl; import org.awaitility.Awaitility; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j public class MetadataCacheTest extends BaseMetadataStoreTest { @Data @@ -102,7 +108,7 @@ public class MetadataCacheTest extends BaseMetadataStoreTest { } @Test(dataProvider = "zk") - public void crossStoreUpdates(String provider, String url) throws Exception { + public void crossStoreAddDelete(String provider, String url) throws Exception { @Cleanup MetadataStore store1 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); @@ -165,6 +171,51 @@ public class MetadataCacheTest extends BaseMetadataStoreTest { }); } + @Test(dataProvider = "zk") + public void crossStoreUpdates(String provider, String url) throws Exception { + String testName = "cross store updates"; + @Cleanup + MetadataStore store1 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + + @Cleanup + MetadataStore store2 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + + MetadataCacheImpl<MyClass> objCache1 = (MetadataCacheImpl<MyClass>) store1.getMetadataCache(MyClass.class); + + MetadataCacheImpl<MyClass> objCache2 = (MetadataCacheImpl<MyClass>) store2.getMetadataCache(MyClass.class); + AtomicReference<MyClass> storeObj = new AtomicReference<MyClass>(); + store2.registerListener(n -> { + if (n.getType() == NotificationType.Modified) { + CompletableFuture.runAsync(() -> { + try { + MyClass obj = objCache2.get(n.getPath()).get().get(); + storeObj.set(obj); + } catch (Exception e) { + log.error("Got exception {}", e.getMessage()); + } + }); + }; + }); + + String key1 = "/test-key1"; + assertEquals(objCache1.getIfCached(key1), Optional.empty()); + assertEquals(objCache2.getIfCached(key1), Optional.empty()); + + MyClass value1 = new MyClass(testName, 1); + objCache1.create(key1, value1).join(); + + 
Awaitility.await().ignoreNoExceptions().untilAsserted(() -> { + assertEquals(objCache1.getIfCached(key1), Optional.of(value1)); + assertEquals(objCache2.get(key1).join(), Optional.of(value1)); + assertEquals(objCache2.getIfCached(key1), Optional.of(value1)); + }); + + MyClass value2 = new MyClass(testName, 2); + objCache1.readModifyUpdate(key1, (oldData) -> {return value2;}).join(); + + Awaitility.await().ignoreNoExceptions().untilAsserted(() ->assertEquals(storeObj.get(), value2)); + } + @Test(dataProvider = "impl") public void insertionDeletionWitGenericType(String provider, String url) throws Exception { @Cleanup