[
https://issues.apache.org/jira/browse/CURATOR-653?focusedWorklogId=814686&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814686
]
ASF GitHub Bot logged work on CURATOR-653:
------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Oct/22 13:50
Start Date: 07/Oct/22 13:50
Worklog Time Spent: 10m
Work Description: XComp commented on code in PR #398:
URL: https://github.com/apache/curator/pull/398#discussion_r990006956
##########
curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java:
##########
@@ -540,10 +540,17 @@ public String getLastPathIsLeader()
@VisibleForTesting
volatile CountDownLatch debugResetWaitLatch = null;
+ @VisibleForTesting
+ volatile CountDownLatch debugRestWaitBeforeNodeDelete = null;
Review Comment:
```suggestion
volatile CountDownLatch debugResetWaitBeforeNodeDeleteLatch = null;
```
There's a typo in the name. Additionally, we might want to add `Latch` at
the end to reflect the purpose of this member analogously to the other latches.
##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws
Exception
}
}
+ @Test
+ public void testCheckLeaderShipTiming() throws Exception
+ {
+ final String latchPath = "/test";
+ Timing timing = new Timing();
+ List<LeaderLatch> latches = Lists.newArrayList();
+ List<CuratorFramework> clients = Lists.newArrayList();
+ final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ for ( int i = 0; i < 2; ++i ) {
+ try {
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(10000)
+ .sessionTimeoutMs(60000)
+ .retryPolicy(new RetryOneTime(1))
+ .connectionStateErrorPolicy(new
StandardConnectionStateErrorPolicy())
+ .build();
+ ConnectionStateListener stateListener = new
ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client,
ConnectionState newState)
+ {
+ if (newState == ConnectionState.CONNECTED) {
+ states.add(newState.name());
+ }
+ }
+ };
+
client.getConnectionStateListenable().addListener(stateListener);
+ client.start();
+ clients.add(client);
+ LeaderLatch latch = new LeaderLatch(client, latchPath,
String.valueOf(i));
+ LeaderLatchListener listener = new LeaderLatchListener() {
+ @Override
+ public void isLeader() {
+ states.add("true");
+ }
+
+ @Override
+ public void notLeader() {
+ states.add("false");
+ }
+ };
+ latch.addListener(listener);
+ latch.start();
+ latches.add(latch);
+ assertEquals(states.poll(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+ if (i == 0) {
+
assertEquals(states.poll(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS), "true");
Review Comment:
```suggestion
assertEquals("The first LeaderLatch instance should acquire leadership.", states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
```
nit: maybe add a bit more context to this poll to describe the test case.
##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws
Exception
}
}
+ @Test
+ public void testCheckLeaderShipTiming() throws Exception
+ {
+ final String latchPath = "/test";
+ Timing timing = new Timing();
+ List<LeaderLatch> latches = Lists.newArrayList();
+ List<CuratorFramework> clients = Lists.newArrayList();
+ final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ for ( int i = 0; i < 2; ++i ) {
+ try {
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(10000)
+ .sessionTimeoutMs(60000)
+ .retryPolicy(new RetryOneTime(1))
+ .connectionStateErrorPolicy(new
StandardConnectionStateErrorPolicy())
+ .build();
+ ConnectionStateListener stateListener = new
ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client,
ConnectionState newState)
+ {
+ if (newState == ConnectionState.CONNECTED) {
+ states.add(newState.name());
+ }
+ }
+ };
+
client.getConnectionStateListenable().addListener(stateListener);
+ client.start();
+ clients.add(client);
+ LeaderLatch latch = new LeaderLatch(client, latchPath,
String.valueOf(i));
+ LeaderLatchListener listener = new LeaderLatchListener() {
+ @Override
+ public void isLeader() {
+ states.add("true");
+ }
+
+ @Override
+ public void notLeader() {
+ states.add("false");
+ }
+ };
Review Comment:
Using the `LeaderLatch` ID in the event labels (as mentioned above) might
help when evaluating the queue later on in the test. But to be fair: doing the
asserts on `hasLeadership` like it's already done below (lines 303-304) serves
the same purpose. I just mention it as another idea here. :shrug:
##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws
Exception
}
}
+ @Test
+ public void testCheckLeaderShipTiming() throws Exception
+ {
+ final String latchPath = "/test";
+ Timing timing = new Timing();
+ List<LeaderLatch> latches = Lists.newArrayList();
+ List<CuratorFramework> clients = Lists.newArrayList();
+ final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ for ( int i = 0; i < 2; ++i ) {
+ try {
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(10000)
+ .sessionTimeoutMs(60000)
+ .retryPolicy(new RetryOneTime(1))
+ .connectionStateErrorPolicy(new
StandardConnectionStateErrorPolicy())
+ .build();
+ ConnectionStateListener stateListener = new
ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client,
ConnectionState newState)
+ {
+ if (newState == ConnectionState.CONNECTED) {
+ states.add(newState.name());
+ }
+ }
+ };
+
client.getConnectionStateListenable().addListener(stateListener);
+ client.start();
+ clients.add(client);
+ LeaderLatch latch = new LeaderLatch(client, latchPath,
String.valueOf(i));
+ LeaderLatchListener listener = new LeaderLatchListener() {
+ @Override
+ public void isLeader() {
+ states.add("true");
+ }
+
+ @Override
+ public void notLeader() {
+ states.add("false");
+ }
+ };
+ latch.addListener(listener);
+ latch.start();
+ latches.add(latch);
+ assertEquals(states.poll(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+ if (i == 0) {
+
assertEquals(states.poll(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS), "true");
+ }
+ }
+ catch (Exception e){
+ return;
+ }
+ }
+ timing.forWaiting().sleepABit();
+ // now latch1 is leader, latch2 is not leader. latch2 listens to the
ephemeral node created by latch1
+ LeaderLatch latch1 = latches.get(0);
+ LeaderLatch latch2 = latches.get(1);
+ assertTrue(latch1.hasLeadership());
+ assertFalse(latch2.hasLeadership());
+ try {
+ latch2.debugRestWaitBeforeNodeDelete = new CountDownLatch(1);
+ latch2.debugResetWaitLatch = new CountDownLatch(1);
+ latch1.debugResetWaitLatch = new CountDownLatch(1);
+
+ // force latch1 and latch2 reset
+ latch1.reset();
+ ForkJoinPool.commonPool().submit(() -> {
Review Comment:
Should we add a comment here on why we're calling `latch2.reset()` in a
separate thread? AFAIU, it's done so that the test thread doesn't block on
`latch2.debugRestWaitBeforeNodeDelete`. It might help readers if this is
reflected in a comment here. WDYT?
##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws
Exception
}
}
+ @Test
+ public void testCheckLeaderShipTiming() throws Exception
+ {
+ final String latchPath = "/test";
+ Timing timing = new Timing();
+ List<LeaderLatch> latches = Lists.newArrayList();
+ List<CuratorFramework> clients = Lists.newArrayList();
+ final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ for ( int i = 0; i < 2; ++i ) {
+ try {
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(10000)
+ .sessionTimeoutMs(60000)
+ .retryPolicy(new RetryOneTime(1))
+ .connectionStateErrorPolicy(new
StandardConnectionStateErrorPolicy())
+ .build();
+ ConnectionStateListener stateListener = new
ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client,
ConnectionState newState)
+ {
+ if (newState == ConnectionState.CONNECTED) {
+ states.add(newState.name());
+ }
+ }
+ };
+
client.getConnectionStateListenable().addListener(stateListener);
+ client.start();
+ clients.add(client);
+ LeaderLatch latch = new LeaderLatch(client, latchPath,
String.valueOf(i));
+ LeaderLatchListener listener = new LeaderLatchListener() {
+ @Override
+ public void isLeader() {
+ states.add("true");
+ }
+
+ @Override
+ public void notLeader() {
+ states.add("false");
+ }
+ };
+ latch.addListener(listener);
+ latch.start();
+ latches.add(latch);
+ assertEquals(states.poll(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+ if (i == 0) {
+
assertEquals(states.poll(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS), "true");
+ }
+ }
+ catch (Exception e){
+ return;
+ }
+ }
+ timing.forWaiting().sleepABit();
+ // now latch1 is leader, latch2 is not leader. latch2 listens to the
ephemeral node created by latch1
+ LeaderLatch latch1 = latches.get(0);
+ LeaderLatch latch2 = latches.get(1);
Review Comment:
```suggestion
LeaderLatch initialLeaderLatch = latches.get(0);
LeaderLatch initialNonLeaderLatch = latches.get(1);
```
nit: maybe make the variable names more descriptive to avoid confusion,
especially because we're switching the order here compared to what is
described in the PR description and the corresponding ticket.
##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws
Exception
}
}
+ @Test
+ public void testCheckLeaderShipTiming() throws Exception
+ {
+ final String latchPath = "/test";
+ Timing timing = new Timing();
+ List<LeaderLatch> latches = Lists.newArrayList();
+ List<CuratorFramework> clients = Lists.newArrayList();
+ final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ for ( int i = 0; i < 2; ++i ) {
+ try {
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(10000)
+ .sessionTimeoutMs(60000)
+ .retryPolicy(new RetryOneTime(1))
+ .connectionStateErrorPolicy(new
StandardConnectionStateErrorPolicy())
+ .build();
+ ConnectionStateListener stateListener = new
ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client,
ConnectionState newState)
+ {
+ if (newState == ConnectionState.CONNECTED) {
+ states.add(newState.name());
+ }
+ }
+ };
+
client.getConnectionStateListenable().addListener(stateListener);
+ client.start();
+ clients.add(client);
+ LeaderLatch latch = new LeaderLatch(client, latchPath,
String.valueOf(i));
+ LeaderLatchListener listener = new LeaderLatchListener() {
+ @Override
+ public void isLeader() {
+ states.add("true");
+ }
+
+ @Override
+ public void notLeader() {
+ states.add("false");
+ }
+ };
+ latch.addListener(listener);
+ latch.start();
+ latches.add(latch);
+ assertEquals(states.poll(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+ if (i == 0) {
+
assertEquals(states.poll(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS), "true");
+ }
+ }
+ catch (Exception e){
+ return;
+ }
+ }
+ timing.forWaiting().sleepABit();
Review Comment:
What's the purpose of waiting here? :thinking:
##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws
Exception
}
}
+ @Test
+ public void testCheckLeaderShipTiming() throws Exception
+ {
+ final String latchPath = "/test";
+ Timing timing = new Timing();
+ List<LeaderLatch> latches = Lists.newArrayList();
+ List<CuratorFramework> clients = Lists.newArrayList();
+ final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ for ( int i = 0; i < 2; ++i ) {
+ try {
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(10000)
+ .sessionTimeoutMs(60000)
+ .retryPolicy(new RetryOneTime(1))
+ .connectionStateErrorPolicy(new
StandardConnectionStateErrorPolicy())
+ .build();
+ ConnectionStateListener stateListener = new
ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client,
ConnectionState newState)
+ {
+ if (newState == ConnectionState.CONNECTED) {
+ states.add(newState.name());
+ }
+ }
+ };
+
client.getConnectionStateListenable().addListener(stateListener);
+ client.start();
+ clients.add(client);
+ LeaderLatch latch = new LeaderLatch(client, latchPath,
String.valueOf(i));
+ LeaderLatchListener listener = new LeaderLatchListener() {
+ @Override
+ public void isLeader() {
+ states.add("true");
+ }
+
+ @Override
+ public void notLeader() {
+ states.add("false");
+ }
+ };
+ latch.addListener(listener);
+ latch.start();
+ latches.add(latch);
+ assertEquals(states.poll(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+ if (i == 0) {
+
assertEquals(states.poll(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS), "true");
+ }
+ }
+ catch (Exception e){
+ return;
+ }
+ }
+ timing.forWaiting().sleepABit();
+ // now latch1 is leader, latch2 is not leader. latch2 listens to the
ephemeral node created by latch1
Review Comment:
nit: Moving comments into assert messages improves the test output while still
serving as a kind of comment. This comment could be added to the `assertTrue`
and `assertFalse` in lines 284-285 below to describe the currently expected state.
##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws
Exception
}
}
+ @Test
+ public void testCheckLeaderShipTiming() throws Exception
+ {
+ final String latchPath = "/test";
+ Timing timing = new Timing();
+ List<LeaderLatch> latches = Lists.newArrayList();
+ List<CuratorFramework> clients = Lists.newArrayList();
+ final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ for ( int i = 0; i < 2; ++i ) {
+ try {
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(10000)
+ .sessionTimeoutMs(60000)
+ .retryPolicy(new RetryOneTime(1))
+ .connectionStateErrorPolicy(new
StandardConnectionStateErrorPolicy())
+ .build();
+ ConnectionStateListener stateListener = new
ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client,
ConnectionState newState)
+ {
+ if (newState == ConnectionState.CONNECTED) {
+ states.add(newState.name());
+ }
+ }
+ };
+
client.getConnectionStateListenable().addListener(stateListener);
+ client.start();
+ clients.add(client);
+ LeaderLatch latch = new LeaderLatch(client, latchPath,
String.valueOf(i));
+ LeaderLatchListener listener = new LeaderLatchListener() {
+ @Override
+ public void isLeader() {
+ states.add("true");
+ }
+
+ @Override
+ public void notLeader() {
+ states.add("false");
+ }
+ };
+ latch.addListener(listener);
+ latch.start();
+ latches.add(latch);
+ assertEquals(states.poll(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+ if (i == 0) {
+
assertEquals(states.poll(timing.forWaiting().milliseconds(),
TimeUnit.MILLISECONDS), "true");
+ }
+ }
+ catch (Exception e){
Review Comment:
Why are we hiding thrown exceptions here? Shouldn't we expose it as part of
the test run if something went wrong? :thinking: The test would succeed if the
Exception is thrown in this block and caught here.
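A minimal sketch of the effect, independent of Curator and JUnit: `failingSetup` is a hypothetical setup step that always throws, and `passes` is a tiny stand-in for a test runner that treats a body as successful when it returns without throwing.

```java
public class SwallowedExceptionSketch {
    // Hypothetical setup step that always fails (stands in for client/latch startup).
    static void failingSetup() {
        throw new IllegalStateException("setup failed");
    }

    // Pattern under review: the failure is caught and the test body returns normally.
    static void swallowingTest() {
        try {
            failingSetup();
        } catch (Exception e) {
            return;
        }
    }

    // Alternative: let the exception escape so the runner can report it.
    static void propagatingTest() {
        failingSetup();
    }

    // Stand-in for a test runner: a body "passes" if it returns without throwing.
    static boolean passes(Runnable testBody) {
        try {
            testBody.run();
            return true;
        } catch (RuntimeException | AssertionError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("swallowing passes: " + passes(SwallowedExceptionSketch::swallowingTest));
        System.out.println("propagating passes: " + passes(SwallowedExceptionSketch::propagatingTest));
    }
}
```

Since the test method already declares `throws Exception`, dropping the `try`/`catch` would be enough to surface the failure.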
##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws
Exception
}
}
+ @Test
+ public void testCheckLeaderShipTiming() throws Exception
+ {
+ final String latchPath = "/test";
+ Timing timing = new Timing();
+ List<LeaderLatch> latches = Lists.newArrayList();
+ List<CuratorFramework> clients = Lists.newArrayList();
+ final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ for ( int i = 0; i < 2; ++i ) {
+ try {
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .connectionTimeoutMs(10000)
+ .sessionTimeoutMs(60000)
+ .retryPolicy(new RetryOneTime(1))
+ .connectionStateErrorPolicy(new
StandardConnectionStateErrorPolicy())
+ .build();
+ ConnectionStateListener stateListener = new
ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client,
ConnectionState newState)
+ {
+ if (newState == ConnectionState.CONNECTED) {
+ states.add(newState.name());
Review Comment:
nit: Adding the `LeaderLatch` ID (we could use the for-loop index `i`) here
might make it easier to understand the state of the queue further down in the
test.
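A rough sketch of what labelled events could look like. The `"<latchId>:<event>"` format and the `label` helper are assumptions made for illustration, and the listener callbacks are replaced by plain `Runnable` stand-ins so the snippet runs on its own.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LabelledEventsSketch {
    // Hypothetical label format "<latchId>:<event>"; any unambiguous format would do.
    static String label(int latchId, String event) {
        return latchId + ":" + event;
    }

    static BlockingQueue<String> recordEvents() {
        BlockingQueue<String> states = new LinkedBlockingQueue<>();
        for (int i = 0; i < 2; ++i) {
            final int latchId = i;
            // Stand-in for the ConnectionStateListener callback of client i.
            Runnable onConnected = () -> states.add(label(latchId, "CONNECTED"));
            onConnected.run();
        }
        // Stand-in for LeaderLatchListener.isLeader() firing on latch 0.
        states.add(label(0, "true"));
        return states;
    }

    public static void main(String[] args) {
        // Each entry now says which latch produced it.
        System.out.println(recordEvents());
    }
}
```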
Issue Time Tracking
-------------------
Worklog Id: (was: 814686)
Time Spent: 40m (was: 0.5h)
> Double leader for LeaderLatch
> -----------------------------
>
> Key: CURATOR-653
> URL: https://issues.apache.org/jira/browse/CURATOR-653
> Project: Apache Curator
> Issue Type: Task
> Components: Recipes
> Reporter: Zili Chen
> Assignee: Zili Chen
> Priority: Major
> Fix For: 5.4.0
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Reported by @woaishixiaoxiao:
> When I use LeaderLatch to elect a leader, I observe a double-leader
> phenomenon.
> The timeline is as follows:
> 1. The zk cluster switches its leader node because of zxid overflow. The
> cluster is unavailable to the outside world.
> 2. Client A (not leader before the zxid overflow) and client B (leader before
> the zxid overflow) enter the suspended state. Client B sets its leader status
> to false.
> 3. The zk cluster completes the leader node election and the cluster is back
> to normal.
> 4. Client A enters the reconnected state and calls the reset function, setting
> its leader status to false.
> 5. Client B enters the reconnected state and calls the reset function, setting
> its leader status to false. It deletes its old path.
> 6. Client A receives the preNodeDeleteEvent, then calls getChildren on
> zkServer. It finds that it has the smallest sequence number and sets itself as
> leader.
> 7. Client B creates a new temporary node, then calls getChildren on zkServer.
> It finds that it is not the node with the smallest sequence number and
> listens for the delete event of the previous node.
> 8. Client A deletes its old path.
> 9. Client B receives the preNodeDeleteEvent, then calls getChildren on
> zkServer. It finds that it has the smallest sequence number and sets itself as
> leader.
> 10. Client A creates a new temporary node, then calls getChildren on
> zkServer. It finds that it is not the node with the smallest sequence number
> and listens for the delete event of the previous node, but it does not set
> itself to the non-leader state. Because of step 6, client A is still in the
> leader state.
> 11. Now client A and client B are both leader at the same time.
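
The timeline above can be replayed with a small in-memory model. This is a simplified sketch and not Curator's implementation: the `TreeSet` stands in for the children of the latch path, and `checkLeadership` is a hypothetical reduction of the behaviour described in the report, where a client that is not the smallest node starts watching its predecessor but leaves its leader flag untouched.

```java
import java.util.TreeSet;

public class DoubleLeaderTimeline {
    static final class Client {
        Integer node;      // sequence number of this client's node, or null if none
        boolean leader;
    }

    // Stand-in for the children of the latch path, ordered by sequence number.
    private final TreeSet<Integer> znodes = new TreeSet<>();
    private int nextSeq = 0;

    private void create(Client c) {
        c.node = nextSeq++;
        znodes.add(c.node);
    }

    private void delete(Client c) {
        znodes.remove(c.node);
        c.node = null;
    }

    // Assumed behaviour per the report: only the "I am smallest" branch touches the flag.
    private void checkLeadership(Client c) {
        if (c.node != null && !znodes.isEmpty() && c.node.equals(znodes.first())) {
            c.leader = true;
        }
        // else: watch the predecessor node; the leader flag is left as it was
    }

    static String replay() {
        DoubleLeaderTimeline zk = new DoubleLeaderTimeline();
        Client a = new Client();
        Client b = new Client();
        zk.create(b);                 // B owns node 0 and leads before the outage
        b.leader = true;
        zk.create(a);                 // A owns node 1 and watches B's node

        b.leader = false;             // step 2: suspended, B gives up leadership
        a.leader = false;             // step 4: A starts reset
        b.leader = false;             // step 5: B starts reset ...
        zk.delete(b);                 //         ... and deletes its old node
        zk.checkLeadership(a);        // step 6: A's old node is smallest, A leads
        zk.create(b);                 // step 7: B creates a new node ...
        zk.checkLeadership(b);        //         ... which is not the smallest
        zk.delete(a);                 // step 8: A deletes its old node
        zk.checkLeadership(b);        // step 9: B is now smallest, B leads
        zk.create(a);                 // step 10: A creates a new node ...
        zk.checkLeadership(a);        //          ... not smallest, flag stays true
        return "A=" + a.leader + ",B=" + b.leader;   // step 11
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints "A=true,B=true"
    }
}
```

In this model the overlap comes from step 6, where the check runs against client A's old node in the middle of its reset.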
--
This message was sent by Atlassian Jira
(v8.20.10#820010)