merlimat closed pull request #437: Make cache put watch even when node doesn't 
exist yet
URL: https://github.com/apache/incubator-pulsar/pull/437
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java 
b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
index 87283ea3c..40a4eff8c 100644
--- a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
@@ -156,10 +156,20 @@ public String create(String path, byte[] data, List<ACL> 
acl, CreateMode createM
 
             tree.put(path, Pair.create(new String(data), 0));
 
+            final Set<Watcher> toNotify = Sets.newHashSet();
+            toNotify.addAll(watchers.get(path));
+            watchers.removeAll(path);
+            final String finalPath = path;
+            executor.execute(() -> toNotify.forEach(watcher -> watcher
+                    .process(new WatchedEvent(EventType.NodeCreated, 
KeeperState.SyncConnected, finalPath))));
+
+
+
             if (!parent.isEmpty()) {
                 final Set<Watcher> toNotifyParent = Sets.newHashSet();
                 toNotifyParent.addAll(watchers.get(parent));
 
+
                 executor.execute(() -> {
                     toNotifyParent.forEach(watcher -> watcher.process(
                             new WatchedEvent(EventType.NodeChildrenChanged, 
KeeperState.SyncConnected, parent)));
@@ -491,6 +501,11 @@ public Stat exists(String path, Watcher watcher) throws 
KeeperException, Interru
     }
 
     public void exists(String path, boolean watch, StatCallback cb, Object 
ctx) {
+        exists(path, null, cb, ctx);
+    }
+
+    @Override
+    public void exists(String path, Watcher watcher, StatCallback cb, Object 
ctx) {
         executor.execute(() -> {
             mutex.lock();
             if (getProgrammedFailStatus()) {
@@ -503,6 +518,11 @@ public void exists(String path, boolean watch, 
StatCallback cb, Object ctx) {
                 return;
             }
 
+            if (watcher != null) {
+                watchers.put(path, watcher);
+            }
+
+
             if (tree.containsKey(path)) {
                 mutex.unlock();
                 cb.processResult(0, path, ctx, new Stat());
diff --git 
a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java
index 24f6ccb69..81b3e2fbd 100644
--- 
a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -1238,7 +1238,7 @@ public void 
testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Excep
             
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
         }
     }
-    
+
     @Test
     public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws 
Exception {
         log.info("-- Starting {} test --", methodName);
@@ -1690,7 +1690,6 @@ public void 
testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause()
      *
      * @throws Exception
      */
-    @Test
     public void 
testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhileProduce() 
throws Exception {
         log.info("-- Starting {} test --", methodName);
 
@@ -1772,7 +1771,7 @@ public void 
testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
             
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
         }
     }
-    
+
     @Test
     public void testPriorityConsumer() throws Exception {
         log.info("-- Starting {} test --", methodName);
@@ -1814,10 +1813,10 @@ public void testPriorityConsumer() throws Exception {
         }
 
         /**
-         * a. consumer1 and consumer2 now has more permits (as received and 
sent more permits) 
-         * b. try to produce more messages: which will again distribute among 
consumer1 and consumer2 
+         * a. consumer1 and consumer2 now has more permits (as received and 
sent more permits)
+         * b. try to produce more messages: which will again distribute among 
consumer1 and consumer2
          * and should not dispatch to consumer4
-         * 
+         *
          */
         for (int i = 0; i < 5; i++) {
             final String message = "my-message-" + i;
@@ -1840,12 +1839,12 @@ public void testPriorityConsumer() throws Exception {
      * <pre>
      * Verifies Dispatcher dispatches messages properly with 
shared-subscription consumers with combination of blocked
      * and unblocked consumers.
-     * 
-     * 1. Dispatcher will have 5 consumers : c1, c2, c3, c4, c5. 
+     *
+     * 1. Dispatcher will have 5 consumers : c1, c2, c3, c4, c5.
      *      Out of which : c1,c2,c4,c5 will be blocked due to 
MaxUnackedMessages limit.
      * 2. So, dispatcher should moves round-robin and make sure it delivers 
unblocked consumer : c3
      * </pre>
-     * 
+     *
      * @throws Exception
      */
     @Test(timeOut=5000)
@@ -2034,5 +2033,5 @@ public void testRedeliveryFailOverConsumer() throws 
Exception {
         log.info("-- Exiting {} test --", methodName);
 
     }
-    
+
 }
\ No newline at end of file
diff --git 
a/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java
 
b/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java
index 6923ad3b8..acb669025 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/com/yahoo/pulsar/zookeeper/ZooKeeperCache.java
@@ -86,7 +86,8 @@
 
     protected AtomicReference<ZooKeeper> zkSession = new 
AtomicReference<ZooKeeper>(null);
 
-    public ZooKeeperCache(ZooKeeper zkSession, OrderedSafeExecutor executor, 
ScheduledExecutorService scheduledExecutor) {
+    public ZooKeeperCache(ZooKeeper zkSession, OrderedSafeExecutor executor,
+            ScheduledExecutorService scheduledExecutor) {
         checkNotNull(executor);
         checkNotNull(scheduledExecutor);
         this.executor = executor;
@@ -285,32 +286,57 @@ public Boolean call() throws Exception {
         CompletableFuture<Optional<Entry<T, Stat>>> future = new 
CompletableFuture<>();
         dataCache.get(path, (p, executor) -> {
             // Return a future for the z-node to be fetched from ZK
+            final CompletableFuture<Boolean> existsFuture = new 
CompletableFuture<>();
             CompletableFuture<Entry<Object, Stat>> zkFuture = new 
CompletableFuture<>();
 
             // Broker doesn't restart on global-zk session lost: so handling 
unexpected exception
             try {
-                this.zkSession.get().getData(path, watcher, (rc, path1, ctx, 
content, stat) -> {
-                    Executor exec = scheduledExecutor != null ? 
scheduledExecutor : executor;
+                // Check for existence first, then get data: this allows us to 
put the watch on unconditionally.
+                zkSession.get().exists(path, watcher, (rc, path1, ctx, stat) 
-> {
+                    final Executor exec = scheduledExecutor != null ? 
scheduledExecutor : executor;
                     if (rc == Code.OK.intValue()) {
-                        try {
-                            T obj = deserializer.deserialize(path, content);
-                            // avoid using the zk-client thread to process the 
result
-                            exec.execute(() -> zkFuture.complete(new 
SimpleImmutableEntry<Object, Stat>(obj, stat)));
-                        } catch (Exception e) {
-                            exec.execute(() -> 
zkFuture.completeExceptionally(e));
-                        }
+                        exec.execute(() -> existsFuture.complete(true));
                     } else if (rc == Code.NONODE.intValue()) {
-                        // Return null values for missing z-nodes, as this is 
not "exceptional" condition
-                        exec.execute(() -> zkFuture.complete(null));
+                        exec.execute(() -> existsFuture.complete(false));
                     } else {
-                        exec.execute(() -> 
zkFuture.completeExceptionally(KeeperException.create(rc)));
+                        exec.execute(() -> 
existsFuture.completeExceptionally(KeeperException.create(rc)));
                     }
-                }, null);                
+                }, null);
             } catch (Exception e) {
                 LOG.warn("Failed to access zkSession for {} {}", path, 
e.getMessage(), e);
-                zkFuture.completeExceptionally(e);
+                existsFuture.completeExceptionally(e);
             }
-            
+            existsFuture.thenAccept(existed -> {
+                if (existed) {
+                    try {
+                        this.zkSession.get().getData(path, null, (rc, path1, 
ctx, content, stat) -> {
+                            Executor exec = scheduledExecutor != null ? 
scheduledExecutor : executor;
+                            if (rc == Code.OK.intValue()) {
+                                try {
+                                    T obj = deserializer.deserialize(path, 
content);
+                                    // avoid using the zk-client thread to 
process the result
+                                    exec.execute(() -> zkFuture.complete(new 
SimpleImmutableEntry<Object, Stat>(obj, stat)));
+                                } catch (Exception e) {
+                                    exec.execute(() -> 
zkFuture.completeExceptionally(e));
+                                }
+                            } else if (rc == Code.NONODE.intValue()) {
+                                // Return null values for missing z-nodes, as 
this is not "exceptional" condition
+                                exec.execute(() -> zkFuture.complete(null));
+                            } else {
+                                exec.execute(() -> 
zkFuture.completeExceptionally(KeeperException.create(rc)));
+                            }
+                        }, null);
+                    } catch (Exception e) {
+                        LOG.warn("Failed to access zkSession for {} {}", path, 
e.getMessage(), e);
+                        zkFuture.completeExceptionally(e);
+                    }
+                } else {
+                    zkFuture.complete(null);
+                }
+            }).exceptionally(e -> {
+                zkFuture.completeExceptionally(e);
+                return null;
+            });
             return zkFuture;
         }).thenAccept(result -> {
             if (result != null) {
@@ -394,7 +420,7 @@ public void invalidateRoot(String root) {
             }
         }
     }
-    
+
     public void stop() {
         if (shouldShutdownExecutor) {
             this.executor.shutdown();
diff --git 
a/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java
 
b/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java
index 71e7cd309..d6684e9be 100644
--- 
a/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java
+++ 
b/pulsar-zookeeper-utils/src/test/java/com/yahoo/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -16,8 +16,8 @@
 package com.yahoo.pulsar.zookeeper;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.assertNull;
@@ -31,16 +31,16 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.bookkeeper.mledger.util.Pair;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -113,7 +113,7 @@ public String deserialize(String key, byte[] content) 
throws Exception {
     void testChildrenCache() throws Exception {
         OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
         ScheduledExecutorService scheduledExecutor = 
Executors.newSingleThreadScheduledExecutor();
-        
+
         zkClient.create("/test", new byte[0], null, null);
 
         ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor, scheduledExecutor);
@@ -170,7 +170,7 @@ void testChildrenCache() throws Exception {
     void testExistsCache() throws Exception {
         OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
         ScheduledExecutorService scheduledExecutor = 
Executors.newSingleThreadScheduledExecutor();
-        
+
         // Check existence after creation of the node
         zkClient.create("/test", new byte[0], null, null);
         Thread.sleep(20);
@@ -191,7 +191,7 @@ void testExistsCache() throws Exception {
     void testInvalidateCache() throws Exception {
         OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
         ScheduledExecutorService scheduledExecutor = 
Executors.newSingleThreadScheduledExecutor();
-        
+
         zkClient.create("/test", new byte[0], null, null);
         zkClient.create("/test/c1", new byte[0], null, null);
         zkClient.create("/test/c2", new byte[0], null, null);
@@ -329,7 +329,7 @@ public String deserialize(String key, byte[] content) 
throws Exception {
         // Update shouldn't happen after the last check
         assertEquals(notificationCount.get(), 1);
     }
-    
+
     /**
      * Verifies that blocking call on zkCache-callback will not introduce 
deadlock because zkCache completes
      * future-result with different thread than zookeeper-client thread.
@@ -376,7 +376,7 @@ public String deserialize(String key, byte[] content) 
throws Exception {
         zkExecutor.shutdown();
         scheduledExecutor.shutdown();
     }
-    
+
     /**
      * <pre>
      * Verifies that if {@link ZooKeeperCache} fails to fetch data into the 
cache then 
@@ -449,4 +449,33 @@ public String deserialize(String key, byte[] content) 
throws Exception {
         scheduledExecutor.shutdown();
 
     }
+
+    /**
+     * Test to ensure that the cache puts on watch even on nodes that do not 
yet exist.
+     */
+    @Test
+    public void testExistsWatch() throws Exception {
+        ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new 
DefaultThreadFactory("mockZk"));
+        OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
+        ScheduledExecutorService scheduledExecutor = 
Executors.newSingleThreadScheduledExecutor();
+        MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor, scheduledExecutor);
+        AtomicInteger counter = new AtomicInteger(0);
+        ZooKeeperCacheListener<Integer> listener = (path, data, stat) -> 
counter.incrementAndGet();
+        ZooKeeperDataCache<Integer> dataCache = new 
ZooKeeperDataCache<Integer>(zkCacheService) {
+            @Override
+            public Integer deserialize(String key, byte[] content) throws 
Exception {
+                return 0;
+            }
+        };
+        dataCache.registerListener(listener);
+        assert (!dataCache.get("/existsWatchTest").isPresent());
+        zkClient.create("/existsWatchTest", new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        // Give time for watch to fire.
+        Thread.sleep(100);
+        assert (counter.get() == 1);
+        zkExecutor.shutdown();
+        executor.shutdown();
+        scheduledExecutor.shutdown();
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to