LogSegmentMetadataStore should only notify when the list of log segments is 
updated

Currently it notifies the listeners not only when there is a change but also 
when the session expires. That breaks the readahead loop and forces readers to 
wait until the store is able to connect to ZooKeeper again.

With this change, it only notifies when the list of log segments is 
updated. If it disconnects from ZooKeeper, the listener won't be notified and 
will keep reading from the log segments it already knows.

RB_ID=842998


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/72a786e7
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/72a786e7
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/72a786e7

Branch: refs/heads/merge/DL-98
Commit: 72a786e78540fb5a3f8e27f9d71d3616d06d1548
Parents: f008f75
Author: Sijie Guo <sij...@twitter.com>
Authored: Mon Jul 11 10:10:55 2016 -0700
Committer: Sijie Guo <sij...@twitter.com>
Committed: Mon Dec 12 16:52:18 2016 -0800

----------------------------------------------------------------------
 .../impl/ZKLogSegmentMetadataStore.java         | 94 ++++++++++++++++----
 .../impl/TestZKLogSegmentMetadataStore.java     | 22 ++---
 2 files changed, 84 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/72a786e7/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
index c0796a1..cb53b23 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -17,6 +17,7 @@
  */
 package com.twitter.distributedlog.impl;
 
+import com.google.common.collect.ImmutableList;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.ZooKeeperClient;
@@ -45,10 +46,14 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -64,7 +69,9 @@ public class ZKLogSegmentMetadataStore implements 
LogSegmentMetadataStore, Watch
 
     private static final Logger logger = 
LoggerFactory.getLogger(ZKLogSegmentMetadataStore.class);
 
-    private static class ReadLogSegmentsTask implements Runnable, 
FutureEventListener<List<String>> {
+    private static final List<String> EMPTY_LIST = ImmutableList.of();
+
+    private static class ReadLogSegmentsTask implements Runnable, 
FutureEventListener<Versioned<List<String>>> {
 
         private final String logSegmentsPath;
         private final ZKLogSegmentMetadataStore store;
@@ -78,15 +85,16 @@ public class ZKLogSegmentMetadataStore implements 
LogSegmentMetadataStore, Watch
         }
 
         @Override
-        public void onSuccess(final List<String> segments) {
+        public void onSuccess(final Versioned<List<String>> segments) {
             // reset the back off after a successful operation
             currentZKBackOffMs = store.minZKBackoffMs;
-            final Set<LogSegmentNamesListener> listenerSet = 
store.listeners.get(logSegmentsPath);
+            final Map<LogSegmentNamesListener, 
VersionedLogSegmentNamesListener> listenerSet =
+                    store.listeners.get(logSegmentsPath);
             if (null != listenerSet) {
                 store.submitTask(logSegmentsPath, new Runnable() {
                     @Override
                     public void run() {
-                        for (LogSegmentNamesListener listener : listenerSet) {
+                        for (VersionedLogSegmentNamesListener listener : 
listenerSet.values()) {
                             listener.onSegmentsUpdated(segments);
                         }
                     }
@@ -120,6 +128,48 @@ public class ZKLogSegmentMetadataStore implements 
LogSegmentMetadataStore, Watch
         }
     }
 
+    /**
+     * A log segment names listener that keeps tracking the version of list of 
log segments that it has been notified.
+     * It only notify the newer log segments.
+     */
+    static class VersionedLogSegmentNamesListener {
+
+        private final LogSegmentNamesListener listener;
+        private Versioned<List<String>> lastNotifiedLogSegments;
+
+        VersionedLogSegmentNamesListener(LogSegmentNamesListener listener) {
+            this.listener = listener;
+            this.lastNotifiedLogSegments = new 
Versioned<List<String>>(EMPTY_LIST, Version.NEW);
+        }
+
+        synchronized void onSegmentsUpdated(Versioned<List<String>> 
logSegments) {
+            if (lastNotifiedLogSegments.getVersion() == Version.NEW ||
+                    
lastNotifiedLogSegments.getVersion().compare(logSegments.getVersion()) == 
Version.Occurred.BEFORE) {
+                lastNotifiedLogSegments = logSegments;
+                listener.onSegmentsUpdated(logSegments.getValue());
+            }
+        }
+
+        @Override
+        public int hashCode() {
+            return listener.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof VersionedLogSegmentNamesListener)) {
+                return false;
+            }
+            VersionedLogSegmentNamesListener other = 
(VersionedLogSegmentNamesListener) obj;
+            return listener.equals(other.listener);
+        }
+
+        @Override
+        public String toString() {
+            return listener.toString();
+        }
+    }
+
     final DistributedLogConfiguration conf;
     // settings
     final int minZKBackoffMs;
@@ -128,7 +178,7 @@ public class ZKLogSegmentMetadataStore implements 
LogSegmentMetadataStore, Watch
 
     final ZooKeeperClient zkc;
     // log segment listeners
-    final ConcurrentMap<String, Set<LogSegmentNamesListener>> listeners;
+    final ConcurrentMap<String, Map<LogSegmentNamesListener, 
VersionedLogSegmentNamesListener>> listeners;
     // scheduler
     final OrderedScheduler scheduler;
     final ReentrantReadWriteLock closeLock;
@@ -139,7 +189,8 @@ public class ZKLogSegmentMetadataStore implements 
LogSegmentMetadataStore, Watch
                                      OrderedScheduler scheduler) {
         this.conf = conf;
         this.zkc = zkc;
-        this.listeners = new ConcurrentHashMap<String, 
Set<LogSegmentNamesListener>>();
+        this.listeners =
+                new ConcurrentHashMap<String, Map<LogSegmentNamesListener, 
VersionedLogSegmentNamesListener>>();
         this.scheduler = scheduler;
         this.closeLock = new ReentrantReadWriteLock();
         // settings
@@ -275,11 +326,16 @@ public class ZKLogSegmentMetadataStore implements 
LogSegmentMetadataStore, Watch
 
     @Override
     public Future<List<String>> getLogSegmentNames(String logSegmentsPath) {
-        return getLogSegmentNames(logSegmentsPath, null);
+        return getLogSegmentNames(logSegmentsPath, null).map(new 
AbstractFunction1<Versioned<List<String>>, List<String>>() {
+            @Override
+            public List<String> apply(Versioned<List<String>> list) {
+                return list.getValue();
+            }
+        });
     }
 
-    Future<List<String>> getLogSegmentNames(String logSegmentsPath, Watcher 
watcher) {
-        Promise<List<String>> result = new Promise<List<String>>();
+    Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, 
Watcher watcher) {
+        Promise<Versioned<List<String>>> result = new 
Promise<Versioned<List<String>>>();
         try {
             zkc.get().getChildren(logSegmentsPath, watcher, this, result);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
@@ -293,9 +349,11 @@ public class ZKLogSegmentMetadataStore implements 
LogSegmentMetadataStore, Watch
     @Override
     @SuppressWarnings("unchecked")
     public void processResult(int rc, String path, Object ctx, List<String> 
children, Stat stat) {
-        Promise<List<String>> result = ((Promise<List<String>>) ctx);
+        Promise<Versioned<List<String>>> result = 
((Promise<Versioned<List<String>>>) ctx);
         if (KeeperException.Code.OK.intValue() == rc) {
-            result.setValue(children);
+            /** cversion: the number of changes to the children of this znode 
**/
+            ZkVersion zkVersion = new ZkVersion(stat.getCversion());
+            result.setValue(new Versioned(children, zkVersion));
         } else {
             
result.setException(KeeperException.create(KeeperException.Code.get(rc)));
         }
@@ -312,10 +370,13 @@ public class ZKLogSegmentMetadataStore implements 
LogSegmentMetadataStore, Watch
             if (closed) {
                 return;
             }
-            Set<LogSegmentNamesListener> listenerSet = 
listeners.get(logSegmentsPath);
+            Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> 
listenerSet =
+                    listeners.get(logSegmentsPath);
             if (null == listenerSet) {
-                Set<LogSegmentNamesListener> newListenerSet = new 
HashSet<LogSegmentNamesListener>();
-                Set<LogSegmentNamesListener> oldListenerSet = 
listeners.putIfAbsent(logSegmentsPath, newListenerSet);
+                Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> 
newListenerSet =
+                        new HashMap<LogSegmentNamesListener, 
VersionedLogSegmentNamesListener>();
+                Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> 
oldListenerSet =
+                        listeners.putIfAbsent(logSegmentsPath, newListenerSet);
                 if (null != oldListenerSet) {
                     listenerSet = oldListenerSet;
                 } else {
@@ -323,7 +384,7 @@ public class ZKLogSegmentMetadataStore implements 
LogSegmentMetadataStore, Watch
                 }
             }
             synchronized (listenerSet) {
-                listenerSet.add(listener);
+                listenerSet.put(listener, new 
VersionedLogSegmentNamesListener(listener));
                 if (!listeners.containsKey(logSegmentsPath)) {
                     // listener set has been removed, add it back
                     listeners.put(logSegmentsPath, listenerSet);
@@ -343,7 +404,8 @@ public class ZKLogSegmentMetadataStore implements 
LogSegmentMetadataStore, Watch
             if (closed) {
                 return;
             }
-            Set<LogSegmentNamesListener> listenerSet = 
listeners.get(logSegmentsPath);
+            Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> 
listenerSet =
+                    listeners.get(logSegmentsPath);
             if (null == listenerSet) {
                 return;
             }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/72a786e7/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index e4c774b..f8fd3eb 100644
--- 
a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ 
b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -367,7 +367,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         lsmStore.registerLogSegmentListener(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", 
lsmStore.listeners.containsKey(rootPath));
-        assertTrue("Should contain listener", 
lsmStore.listeners.get(rootPath).contains(listener));
+        assertTrue("Should contain listener", 
lsmStore.listeners.get(rootPath).containsKey(listener));
         while (numNotifications.get() < 1) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
@@ -429,7 +429,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         lsmStore.registerLogSegmentListener(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", 
lsmStore.listeners.containsKey(rootPath));
-        assertTrue("Should contain listener", 
lsmStore.listeners.get(rootPath).contains(listener));
+        assertTrue("Should contain listener", 
lsmStore.listeners.get(rootPath).containsKey(listener));
         while (numNotifications.get() < 1) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
@@ -496,7 +496,7 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         lsmStore.registerLogSegmentListener(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", 
lsmStore.listeners.containsKey(rootPath));
-        assertTrue("Should contain listener", 
lsmStore.listeners.get(rootPath).contains(listener));
+        assertTrue("Should contain listener", 
lsmStore.listeners.get(rootPath).containsKey(listener));
         while (numNotifications.get() < 1) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
@@ -510,16 +510,6 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         ZooKeeperClientUtils.expireSession(zkc,
                 DLUtils.getZKServersFromDLUri(uri), 
conf.getZKSessionTimeoutMilliseconds());
 
-        while (numNotifications.get() < 2) {
-            TimeUnit.MILLISECONDS.sleep(10);
-        }
-        assertEquals("Should receive second segment list update",
-                2, numNotifications.get());
-        List<String> secondSegmentList = segmentLists.get(1);
-        Collections.sort(secondSegmentList);
-        assertEquals("List of segments should be same",
-                children, secondSegmentList);
-
         logger.info("Create another {} segments.", numSegments);
 
         // create another log segment, it should trigger segment list updated
@@ -532,12 +522,12 @@ public class TestZKLogSegmentMetadataStore extends 
TestDistributedLogBase {
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
         Collections.sort(newChildren);
         logger.info("All log segments become {}", newChildren);
-        while (numNotifications.get() < 3) {
+        while (numNotifications.get() < 2) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
         assertEquals("Should receive third segment list update",
-                3, numNotifications.get());
-        List<String> thirdSegmentList = segmentLists.get(2);
+                2, numNotifications.get());
+        List<String> thirdSegmentList = segmentLists.get(1);
         Collections.sort(thirdSegmentList);
         assertEquals("List of segments should be updated",
                 2 * numSegments, thirdSegmentList.size());

Reply via email to