[ https://issues.apache.org/jira/browse/CURATOR-653?focusedWorklogId=814686&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814686 ]
ASF GitHub Bot logged work on CURATOR-653:
------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Oct/22 13:50
            Start Date: 07/Oct/22 13:50
    Worklog Time Spent: 10m
      Work Description: XComp commented on code in PR #398:
URL: https://github.com/apache/curator/pull/398#discussion_r990006956


##########
curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java:
##########
@@ -540,10 +540,17 @@ public String getLastPathIsLeader()
     @VisibleForTesting
     volatile CountDownLatch debugResetWaitLatch = null;
+    @VisibleForTesting
+    volatile CountDownLatch debugRestWaitBeforeNodeDelete = null;

Review Comment:
   ```suggestion
       volatile CountDownLatch debugResetWaitBeforeNodeDeleteLatch = null;
   ```
   There's a typo in the name. Additionally, we might want to add `Latch` at the end to reflect the purpose of this member analogously to the other latches.


##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception
         }
     }
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, ConnectionState newState)
+                    {
+                        if
(newState == ConnectionState.CONNECTED) { + states.add(newState.name()); + } + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + clients.add(client); + LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i)); + LeaderLatchListener listener = new LeaderLatchListener() { + @Override + public void isLeader() { + states.add("true"); + } + + @Override + public void notLeader() { + states.add("false"); + } + }; + latch.addListener(listener); + latch.start(); + latches.add(latch); + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + if (i == 0) { + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); Review Comment: ```suggestion assertEquals("The first LeaderLatch instance should acquire leadership.", states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); ``` nit: maybe adding a bit more context to this polling here to describe the test case ########## curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java: ########## @@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception } } + @Test + public void testCheckLeaderShipTiming() throws Exception + { + final String latchPath = "/test"; + Timing timing = new Timing(); + List<LeaderLatch> latches = Lists.newArrayList(); + List<CuratorFramework> clients = Lists.newArrayList(); + final BlockingQueue<String> states = Queues.newLinkedBlockingQueue(); + ExecutorService executorService = Executors.newFixedThreadPool(2); + for ( int i = 0; i < 2; ++i ) { + try { + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(10000) + .sessionTimeoutMs(60000) + .retryPolicy(new RetryOneTime(1)) + .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) + .build(); + ConnectionStateListener 
stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if (newState == ConnectionState.CONNECTED) { + states.add(newState.name()); + } + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + clients.add(client); + LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i)); + LeaderLatchListener listener = new LeaderLatchListener() { + @Override + public void isLeader() { + states.add("true"); + } + + @Override + public void notLeader() { + states.add("false"); + } + }; Review Comment: Using the `LeaderLatch` ID in the event labels (as mentioned above) might help when evaluating the queue later on in the test. But to be fair: doing the asserts on `hasLeadership` like it's already done below (lines 303-304) serves the same purpose. I just mention it as another idea here. :shrug: ########## curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java: ########## @@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception } } + @Test + public void testCheckLeaderShipTiming() throws Exception + { + final String latchPath = "/test"; + Timing timing = new Timing(); + List<LeaderLatch> latches = Lists.newArrayList(); + List<CuratorFramework> clients = Lists.newArrayList(); + final BlockingQueue<String> states = Queues.newLinkedBlockingQueue(); + ExecutorService executorService = Executors.newFixedThreadPool(2); + for ( int i = 0; i < 2; ++i ) { + try { + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(10000) + .sessionTimeoutMs(60000) + .retryPolicy(new RetryOneTime(1)) + .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) + .build(); + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, 
ConnectionState newState) + { + if (newState == ConnectionState.CONNECTED) { + states.add(newState.name()); + } + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + clients.add(client); + LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i)); + LeaderLatchListener listener = new LeaderLatchListener() { + @Override + public void isLeader() { + states.add("true"); + } + + @Override + public void notLeader() { + states.add("false"); + } + }; + latch.addListener(listener); + latch.start(); + latches.add(latch); + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + if (i == 0) { + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + } + } + catch (Exception e){ + return; + } + } + timing.forWaiting().sleepABit(); + // now latch1 is leader, latch2 is not leader. latch2 listens to the ephemeral node created by latch1 + LeaderLatch latch1 = latches.get(0); + LeaderLatch latch2 = latches.get(1); + assertTrue(latch1.hasLeadership()); + assertFalse(latch2.hasLeadership()); + try { + latch2.debugRestWaitBeforeNodeDelete = new CountDownLatch(1); + latch2.debugResetWaitLatch = new CountDownLatch(1); + latch1.debugResetWaitLatch = new CountDownLatch(1); + + // force latch1 and latch2 reset + latch1.reset(); + ForkJoinPool.commonPool().submit(() -> { Review Comment: Should we add a comment here on why we're calling `latch2.reset()` in a separate thread? AFAIU, it's done to not make the test's thread block due to `latch2.debugRestWaitBeforeNodeDelete`. It might help readers if this is reflected in a comment here. WDYT? 
########## curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java: ########## @@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception } } + @Test + public void testCheckLeaderShipTiming() throws Exception + { + final String latchPath = "/test"; + Timing timing = new Timing(); + List<LeaderLatch> latches = Lists.newArrayList(); + List<CuratorFramework> clients = Lists.newArrayList(); + final BlockingQueue<String> states = Queues.newLinkedBlockingQueue(); + ExecutorService executorService = Executors.newFixedThreadPool(2); + for ( int i = 0; i < 2; ++i ) { + try { + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(10000) + .sessionTimeoutMs(60000) + .retryPolicy(new RetryOneTime(1)) + .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) + .build(); + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if (newState == ConnectionState.CONNECTED) { + states.add(newState.name()); + } + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + clients.add(client); + LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i)); + LeaderLatchListener listener = new LeaderLatchListener() { + @Override + public void isLeader() { + states.add("true"); + } + + @Override + public void notLeader() { + states.add("false"); + } + }; + latch.addListener(listener); + latch.start(); + latches.add(latch); + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + if (i == 0) { + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + } + } + catch (Exception e){ + return; + } + } + timing.forWaiting().sleepABit(); + // now latch1 is leader, latch2 is not 
leader. latch2 listens to the ephemeral node created by latch1 + LeaderLatch latch1 = latches.get(0); + LeaderLatch latch2 = latches.get(1); Review Comment: ```suggestion LeaderLatch initialLeaderLatch = latches.get(0); LeaderLatch initialNonLeaderLatch = latches.get(1); ``` nit: maybe, making the variable more descriptive to avoid confusion. Especially because we're switching the order here in comparison to what is described in the PR description and the corresponding ticket. ########## curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java: ########## @@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception } } + @Test + public void testCheckLeaderShipTiming() throws Exception + { + final String latchPath = "/test"; + Timing timing = new Timing(); + List<LeaderLatch> latches = Lists.newArrayList(); + List<CuratorFramework> clients = Lists.newArrayList(); + final BlockingQueue<String> states = Queues.newLinkedBlockingQueue(); + ExecutorService executorService = Executors.newFixedThreadPool(2); + for ( int i = 0; i < 2; ++i ) { + try { + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(10000) + .sessionTimeoutMs(60000) + .retryPolicy(new RetryOneTime(1)) + .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) + .build(); + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if (newState == ConnectionState.CONNECTED) { + states.add(newState.name()); + } + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + clients.add(client); + LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i)); + LeaderLatchListener listener = new LeaderLatchListener() { + @Override + public void isLeader() { + states.add("true"); + } + + @Override + 
public void notLeader() { + states.add("false"); + } + }; + latch.addListener(listener); + latch.start(); + latches.add(latch); + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + if (i == 0) { + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + } + } + catch (Exception e){ + return; + } + } + timing.forWaiting().sleepABit(); Review Comment: What's the purpose of waiting here? :thinking: ########## curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java: ########## @@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception } } + @Test + public void testCheckLeaderShipTiming() throws Exception + { + final String latchPath = "/test"; + Timing timing = new Timing(); + List<LeaderLatch> latches = Lists.newArrayList(); + List<CuratorFramework> clients = Lists.newArrayList(); + final BlockingQueue<String> states = Queues.newLinkedBlockingQueue(); + ExecutorService executorService = Executors.newFixedThreadPool(2); + for ( int i = 0; i < 2; ++i ) { + try { + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(10000) + .sessionTimeoutMs(60000) + .retryPolicy(new RetryOneTime(1)) + .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) + .build(); + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if (newState == ConnectionState.CONNECTED) { + states.add(newState.name()); + } + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + clients.add(client); + LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i)); + LeaderLatchListener listener = new LeaderLatchListener() { + @Override + public void isLeader() { + 
states.add("true"); + } + + @Override + public void notLeader() { + states.add("false"); + } + }; + latch.addListener(listener); + latch.start(); + latches.add(latch); + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + if (i == 0) { + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + } + } + catch (Exception e){ + return; + } + } + timing.forWaiting().sleepABit(); + // now latch1 is leader, latch2 is not leader. latch2 listens to the ephemeral node created by latch1 Review Comment: nit: Moving comments into assert messages improves the test output and still works as some kind of comment. This comment could be added to the `assertTrue` and `assertFalse` in line 284-285 below describing the currently expected state. ########## curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java: ########## @@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception } } + @Test + public void testCheckLeaderShipTiming() throws Exception + { + final String latchPath = "/test"; + Timing timing = new Timing(); + List<LeaderLatch> latches = Lists.newArrayList(); + List<CuratorFramework> clients = Lists.newArrayList(); + final BlockingQueue<String> states = Queues.newLinkedBlockingQueue(); + ExecutorService executorService = Executors.newFixedThreadPool(2); + for ( int i = 0; i < 2; ++i ) { + try { + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(10000) + .sessionTimeoutMs(60000) + .retryPolicy(new RetryOneTime(1)) + .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) + .build(); + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if (newState == ConnectionState.CONNECTED) { + 
states.add(newState.name()); + } + } + }; + client.getConnectionStateListenable().addListener(stateListener); + client.start(); + clients.add(client); + LeaderLatch latch = new LeaderLatch(client, latchPath, String.valueOf(i)); + LeaderLatchListener listener = new LeaderLatchListener() { + @Override + public void isLeader() { + states.add("true"); + } + + @Override + public void notLeader() { + states.add("false"); + } + }; + latch.addListener(listener); + latch.start(); + latches.add(latch); + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name()); + if (i == 0) { + assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true"); + } + } + catch (Exception e){ Review Comment: Why are we hiding thrown exceptions here? Shouldn't we expose it as part of the test run if something went wrong? :thinking: The test would succeed if the Exception is thrown in this block and caught here. ########## curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java: ########## @@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception } } + @Test + public void testCheckLeaderShipTiming() throws Exception + { + final String latchPath = "/test"; + Timing timing = new Timing(); + List<LeaderLatch> latches = Lists.newArrayList(); + List<CuratorFramework> clients = Lists.newArrayList(); + final BlockingQueue<String> states = Queues.newLinkedBlockingQueue(); + ExecutorService executorService = Executors.newFixedThreadPool(2); + for ( int i = 0; i < 2; ++i ) { + try { + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(10000) + .sessionTimeoutMs(60000) + .retryPolicy(new RetryOneTime(1)) + .connectionStateErrorPolicy(new StandardConnectionStateErrorPolicy()) + .build(); + ConnectionStateListener stateListener = new ConnectionStateListener() + { + @Override + 
public void stateChanged(CuratorFramework client, ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());

Review Comment:
   nit: Adding the `LeaderLatch` ID (we could use the for loop's `i`) here might help further down in the test when interpreting the state of the queue.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 814686)
    Time Spent: 40m  (was: 0.5h)

> Double leader for LeaderLatch
> -----------------------------
>
>                 Key: CURATOR-653
>                 URL: https://issues.apache.org/jira/browse/CURATOR-653
>             Project: Apache Curator
>          Issue Type: Task
>          Components: Recipes
>            Reporter: Zili Chen
>            Assignee: Zili Chen
>            Priority: Major
>             Fix For: 5.4.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Reported by @woaishixiaoxiao:
> When I use the LeaderLatch to select a leader, there is a double-leader
> phenomenon.
> The timeline is as follows:
> 1. The zk cluster switches its leader node because of zxid overflow. The
> cluster is unavailable to the outside world.
> 2. Client A (not leader before the zxid overflow) and client B (leader
> before the zxid overflow) enter the suspended state. Client B sets its
> leader status to false.
> 3. The zk cluster completes the leader node election and the cluster is
> back to normal.
> 4. Client A enters the reconnected state and calls the reset function,
> setting its leader status to false.
> 5. Client B enters the reconnected state and calls the reset function,
> setting its leader status to false and deleting its old path.
> 6. Client A receives the preNodeDeleteEvent and then calls getChildren on
> the zkServer. It finds that it has the smallest sequence number and sets
> itself as leader.
> 7. Client B creates a new temporary node and then calls getChildren on the
> zkServer. It finds that it is not the node with the smallest sequence
> number and listens for the delete event of the previous node.
> 8. Client A deletes its old path.
> 9. Client B receives the preNodeDeleteEvent and then calls getChildren on
> the zkServer. It finds that it has the smallest sequence number and sets
> itself as leader.
> 10. Client A creates a new temporary node and then calls getChildren on
> the zkServer. It finds that it is not the node with the smallest sequence
> number and listens for the delete event of the previous node, but it does
> not set itself to the non-leader state. Because of step 6, A is still in
> the leader state.
> 11. Now client A and client B are both leader at the same time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
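
For readers who want to step through the timeline quoted in the issue description, here is a minimal, single-threaded sketch that replays steps 2 to 11 against a toy model. It is not Curator's implementation: `DoubleLeaderTimeline`, `Client`, `create`, `deleteNode` and `replay` are hypothetical names invented for illustration, and the toy `checkLeadership` only assumes the behaviour the report describes (the leader flag is set to true when the client owns the smallest node and is left untouched otherwise).

```java
import java.util.TreeMap;

/** Toy replay of the reported timeline; NOT Curator's actual LeaderLatch code. */
final class DoubleLeaderTimeline {

    /** Hypothetical stand-in for one LeaderLatch participant. */
    static final class Client {
        final String name;
        Integer node;      // sequence number of the client's current ephemeral node, or null
        boolean leader;    // what hasLeadership() would report in this toy model

        Client(String name) {
            this.name = name;
        }
    }

    // Stand-in for the children of the latch path: sequence number -> owner name.
    private final TreeMap<Integer, String> children = new TreeMap<>();
    private int nextSequence = 0;

    /** Creates a new sequential "ephemeral" node for the client. */
    void create(Client c) {
        c.node = nextSequence++;
        children.put(c.node, c.name);
    }

    /** Deletes the client's current node. */
    void deleteNode(Client c) {
        children.remove(c.node);
        c.node = null;
    }

    /**
     * Assumed behaviour, taken from the report: becoming the smallest node sets
     * the flag to true; otherwise the client only watches its predecessor and
     * the flag is left as it was.
     */
    void checkLeadership(Client c) {
        if (c.node != null && !children.isEmpty() && children.firstKey().equals(c.node)) {
            c.leader = true;
        }
    }

    /** Replays the steps of the report and returns {A.leader, B.leader}. */
    static boolean[] replay() {
        DoubleLeaderTimeline zk = new DoubleLeaderTimeline();
        Client b = new Client("B");
        Client a = new Client("A");
        zk.create(b);               // B owns node 0 and is the leader before the outage
        b.leader = true;
        zk.create(a);               // A owns node 1 and watches node 0

        b.leader = false;           // step 2: suspended, B gives up leadership
        a.leader = false;           // step 4: A starts reset, old node not yet deleted
        b.leader = false;           // step 5: B resets ...
        zk.deleteNode(b);           //         ... and deletes its old node
        zk.checkLeadership(a);      // step 6: A's OLD node is now the smallest
        zk.create(b);               // step 7: B creates a new node, is not smallest
        zk.checkLeadership(b);
        zk.deleteNode(a);           // step 8: A finally deletes its old node
        zk.checkLeadership(b);      // step 9: B's node is now the smallest
        zk.create(a);               // step 10: A creates a new node, is not smallest,
        zk.checkLeadership(a);      //          but its flag from step 6 is never cleared
        return new boolean[] { a.leader, b.leader };
    }

    public static void main(String[] args) {
        boolean[] result = replay();
        // prints "A leader: true, B leader: true" (step 11)
        System.out.println("A leader: " + result[0] + ", B leader: " + result[1]);
    }
}
```

The replay is deterministic on purpose: in the real recipe the interleaving depends on when the watch event and the delete of the old node are processed, which is what the debug latches discussed in the review comments are meant to control in the test.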