merlimat closed pull request #437: Make cache put watch even when node doesn't exist yet URL: https://github.com/apache/incubator-pulsar/pull/437
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java index 87283ea3c..40a4eff8c 100644 --- a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java +++ b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java @@ -156,10 +156,20 @@ public String create(String path, byte[] data, List<ACL> acl, CreateMode createM tree.put(path, Pair.create(new String(data), 0)); + final Set<Watcher> toNotify = Sets.newHashSet(); + toNotify.addAll(watchers.get(path)); + watchers.removeAll(path); + final String finalPath = path; + executor.execute(() -> toNotify.forEach(watcher -> watcher + .process(new WatchedEvent(EventType.NodeCreated, KeeperState.SyncConnected, finalPath)))); + + + if (!parent.isEmpty()) { final Set<Watcher> toNotifyParent = Sets.newHashSet(); toNotifyParent.addAll(watchers.get(parent)); + executor.execute(() -> { toNotifyParent.forEach(watcher -> watcher.process( new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent))); @@ -491,6 +501,11 @@ public Stat exists(String path, Watcher watcher) throws KeeperException, Interru } public void exists(String path, boolean watch, StatCallback cb, Object ctx) { + exists(path, null, cb, ctx); + } + + @Override + public void exists(String path, Watcher watcher, StatCallback cb, Object ctx) { executor.execute(() -> { mutex.lock(); if (getProgrammedFailStatus()) { @@ -503,6 +518,11 @@ public void exists(String path, boolean watch, StatCallback cb, Object ctx) { return; } + if (watcher != null) { + watchers.put(path, watcher); + } + + if (tree.containsKey(path)) { mutex.unlock(); cb.processResult(0, path, ctx, new 
Stat()); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java index 24f6ccb69..81b3e2fbd 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java @@ -1238,7 +1238,7 @@ public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Excep pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages); } } - + @Test public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exception { log.info("-- Starting {} test --", methodName); @@ -1690,7 +1690,6 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause() * * @throws Exception */ - @Test public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhileProduce() throws Exception { log.info("-- Starting {} test --", methodName); @@ -1772,7 +1771,7 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages); } } - + @Test public void testPriorityConsumer() throws Exception { log.info("-- Starting {} test --", methodName); @@ -1814,10 +1813,10 @@ public void testPriorityConsumer() throws Exception { } /** - * a. consumer1 and consumer2 now has more permits (as received and sent more permits) - * b. try to produce more messages: which will again distribute among consumer1 and consumer2 + * a. consumer1 and consumer2 now has more permits (as received and sent more permits) + * b. 
try to produce more messages: which will again distribute among consumer1 and consumer2 * and should not dispatch to consumer4 - * + * */ for (int i = 0; i < 5; i++) { final String message = "my-message-" + i; @@ -1840,12 +1839,12 @@ public void testPriorityConsumer() throws Exception { * <pre> * Verifies Dispatcher dispatches messages properly with shared-subscription consumers with combination of blocked * and unblocked consumers. - * - * 1. Dispatcher will have 5 consumers : c1, c2, c3, c4, c5. + * + * 1. Dispatcher will have 5 consumers : c1, c2, c3, c4, c5. * Out of which : c1,c2,c4,c5 will be blocked due to MaxUnackedMessages limit. * 2. So, dispatcher should moves round-robin and make sure it delivers unblocked consumer : c3 * </pre> - * + * * @throws Exception */ @Test(timeOut=5000) @@ -2034,5 +2033,5 @@ public void testRedeliveryFailOverConsumer() throws Exception { log.info("-- Exiting {} test --", methodName); } - + } \ No newline at end of file diff --git a/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java index 6923ad3b8..acb669025 100644 --- a/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java @@ -86,7 +86,8 @@ protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null); - public ZooKeeperCache(ZooKeeper zkSession, OrderedSafeExecutor executor, ScheduledExecutorService scheduledExecutor) { + public ZooKeeperCache(ZooKeeper zkSession, OrderedSafeExecutor executor, + ScheduledExecutorService scheduledExecutor) { checkNotNull(executor); checkNotNull(scheduledExecutor); this.executor = executor; @@ -285,32 +286,57 @@ public Boolean call() throws Exception { CompletableFuture<Optional<Entry<T, Stat>>> future = new CompletableFuture<>(); dataCache.get(path, (p, executor) -> { // Return a 
future for the z-node to be fetched from ZK + final CompletableFuture<Boolean> existsFuture = new CompletableFuture<>(); CompletableFuture<Entry<Object, Stat>> zkFuture = new CompletableFuture<>(); // Broker doesn't restart on global-zk session lost: so handling unexpected exception try { - this.zkSession.get().getData(path, watcher, (rc, path1, ctx, content, stat) -> { - Executor exec = scheduledExecutor != null ? scheduledExecutor : executor; + // Check for existence first, then get data: this allows us to put the watch on unconditionally. + zkSession.get().exists(path, watcher, (rc, path1, ctx, stat) -> { + final Executor exec = scheduledExecutor != null ? scheduledExecutor : executor; if (rc == Code.OK.intValue()) { - try { - T obj = deserializer.deserialize(path, content); - // avoid using the zk-client thread to process the result - exec.execute(() -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat))); - } catch (Exception e) { - exec.execute(() -> zkFuture.completeExceptionally(e)); - } + exec.execute(() -> existsFuture.complete(true)); } else if (rc == Code.NONODE.intValue()) { - // Return null values for missing z-nodes, as this is not "exceptional" condition - exec.execute(() -> zkFuture.complete(null)); + exec.execute(() -> existsFuture.complete(false)); } else { - exec.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc))); + exec.execute(() -> existsFuture.completeExceptionally(KeeperException.create(rc))); } - }, null); + }, null); } catch (Exception e) { LOG.warn("Failed to access zkSession for {} {}", path, e.getMessage(), e); - zkFuture.completeExceptionally(e); + existsFuture.completeExceptionally(e); } - + existsFuture.thenAccept(existed -> { + if (existed) { + try { + this.zkSession.get().getData(path, null, (rc, path1, ctx, content, stat) -> { + Executor exec = scheduledExecutor != null ? 
scheduledExecutor : executor; + if (rc == Code.OK.intValue()) { + try { + T obj = deserializer.deserialize(path, content); + // avoid using the zk-client thread to process the result + exec.execute(() -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat))); + } catch (Exception e) { + exec.execute(() -> zkFuture.completeExceptionally(e)); + } + } else if (rc == Code.NONODE.intValue()) { + // Return null values for missing z-nodes, as this is not "exceptional" condition + exec.execute(() -> zkFuture.complete(null)); + } else { + exec.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc))); + } + }, null); + } catch (Exception e) { + LOG.warn("Failed to access zkSession for {} {}", path, e.getMessage(), e); + zkFuture.completeExceptionally(e); + } + } else { + zkFuture.complete(null); + } + }).exceptionally(e -> { + zkFuture.completeExceptionally(e); + return null; + }); return zkFuture; }).thenAccept(result -> { if (result != null) { @@ -394,7 +420,7 @@ public void invalidateRoot(String root) { } } } - + public void stop() { if (shouldShutdownExecutor) { this.executor.shutdown(); diff --git a/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java index 71e7cd309..d6684e9be 100644 --- a/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java +++ b/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java @@ -16,8 +16,8 @@ package com.yahoo.pulsar.zookeeper; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; @@ -31,16 +31,16 @@ import java.util.concurrent.Executors; import 
java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.bookkeeper.mledger.util.Pair; import org.apache.bookkeeper.util.OrderedSafeExecutor; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event; import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -113,7 +113,7 @@ public String deserialize(String key, byte[] content) throws Exception { void testChildrenCache() throws Exception { OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test"); ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); - + zkClient.create("/test", new byte[0], null, null); ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); @@ -170,7 +170,7 @@ void testChildrenCache() throws Exception { void testExistsCache() throws Exception { OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test"); ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); - + // Check existence after creation of the node zkClient.create("/test", new byte[0], null, null); Thread.sleep(20); @@ -191,7 +191,7 @@ void testExistsCache() throws Exception { void testInvalidateCache() throws Exception { OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test"); ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); - + zkClient.create("/test", new byte[0], null, null); zkClient.create("/test/c1", new byte[0], null, null); zkClient.create("/test/c2", new byte[0], null, null); @@ -329,7 +329,7 @@ 
public String deserialize(String key, byte[] content) throws Exception { // Update shouldn't happen after the last check assertEquals(notificationCount.get(), 1); } - + /** * Verifies that blocking call on zkCache-callback will not introduce deadlock because zkCache completes * future-result with different thread than zookeeper-client thread. @@ -376,7 +376,7 @@ public String deserialize(String key, byte[] content) throws Exception { zkExecutor.shutdown(); scheduledExecutor.shutdown(); } - + /** * <pre> * Verifies that if {@link ZooKeeperCache} fails to fetch data into the cache then @@ -449,4 +449,33 @@ public String deserialize(String key, byte[] content) throws Exception { scheduledExecutor.shutdown(); } + + /** + * Test to ensure that the cache puts on watch even on nodes that do not yet exist. + */ + @Test + public void testExistsWatch() throws Exception { + ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk")); + OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test"); + ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor); + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor, scheduledExecutor); + AtomicInteger counter = new AtomicInteger(0); + ZooKeeperCacheListener<Integer> listener = (path, data, stat) -> counter.incrementAndGet(); + ZooKeeperDataCache<Integer> dataCache = new ZooKeeperDataCache<Integer>(zkCacheService) { + @Override + public Integer deserialize(String key, byte[] content) throws Exception { + return 0; + } + }; + dataCache.registerListener(listener); + assert (!dataCache.get("/existsWatchTest").isPresent()); + zkClient.create("/existsWatchTest", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + // Give time for watch to fire. 
+ Thread.sleep(100); + assert (counter.get() == 1); + zkExecutor.shutdown(); + executor.shutdown(); + scheduledExecutor.shutdown(); + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services