[ 
https://issues.apache.org/jira/browse/CURATOR-653?focusedWorklogId=814686&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814686
 ]

ASF GitHub Bot logged work on CURATOR-653:
------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Oct/22 13:50
            Start Date: 07/Oct/22 13:50
    Worklog Time Spent: 10m 
      Work Description: XComp commented on code in PR #398:
URL: https://github.com/apache/curator/pull/398#discussion_r990006956


##########
curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java:
##########
@@ -540,10 +540,17 @@ public String getLastPathIsLeader()
     @VisibleForTesting
     volatile CountDownLatch debugResetWaitLatch = null;
 
+    @VisibleForTesting
+    volatile CountDownLatch debugRestWaitBeforeNodeDelete = null;

Review Comment:
   ```suggestion
       volatile CountDownLatch debugResetWaitBeforeNodeDeleteLatch = null;
   ```
   There's a typo in the name. Additionally, we might want to add `Latch` at 
the end to reflect the purpose of this member analogously to the other latches.



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws 
Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new 
StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new 
ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());
+                        }
+                    }
+                };
+                
client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+                clients.add(client);
+                LeaderLatch latch = new LeaderLatch(client, latchPath, 
String.valueOf(i));
+                LeaderLatchListener listener = new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        states.add("false");
+                    }
+                };
+                latch.addListener(listener);
+                latch.start();
+                latches.add(latch);
+                assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+                if (i == 0) {
+                    
assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "true");

Review Comment:
   ```suggestion
                       assertEquals("The first LeaderLatch instance should acquire leadership.", states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
   ```
   nit: maybe add a message to this assertion to give a bit more context on 
what the test case expects at this point.



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws 
Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new 
StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new 
ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());
+                        }
+                    }
+                };
+                
client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+                clients.add(client);
+                LeaderLatch latch = new LeaderLatch(client, latchPath, 
String.valueOf(i));
+                LeaderLatchListener listener = new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        states.add("false");
+                    }
+                };

Review Comment:
   Using the `LeaderLatch` ID in the event labels (as mentioned above) might 
help when evaluating the queue later on in the test. But to be fair: doing the 
asserts on `hasLeadership` like it's already done below (lines 303-304) serves 
the same purpose. I just mention it as another idea here. :shrug: 



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws 
Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new 
StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new 
ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());
+                        }
+                    }
+                };
+                
client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+                clients.add(client);
+                LeaderLatch latch = new LeaderLatch(client, latchPath, 
String.valueOf(i));
+                LeaderLatchListener listener = new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        states.add("false");
+                    }
+                };
+                latch.addListener(listener);
+                latch.start();
+                latches.add(latch);
+                assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+                if (i == 0) {
+                    
assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "true");
+                }
+            }
+            catch (Exception e){
+                return;
+            }
+        }
+        timing.forWaiting().sleepABit();
+        // now latch1 is leader, latch2 is not leader. latch2 listens to the 
ephemeral node created by latch1
+        LeaderLatch latch1 = latches.get(0);
+        LeaderLatch latch2 = latches.get(1);
+        assertTrue(latch1.hasLeadership());
+        assertFalse(latch2.hasLeadership());
+        try {
+            latch2.debugRestWaitBeforeNodeDelete = new CountDownLatch(1);
+            latch2.debugResetWaitLatch = new CountDownLatch(1);
+            latch1.debugResetWaitLatch = new CountDownLatch(1);
+
+            // force latch1 and latch2 reset
+            latch1.reset();
+            ForkJoinPool.commonPool().submit(() -> {

Review Comment:
   Should we add a comment here on why we're calling `latch2.reset()` in a 
separate thread? AFAIU, it's done to not make the test's thread block due to 
`latch2.debugRestWaitBeforeNodeDelete`. It might help readers if this is 
reflected in a comment here. WDYT?
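
   A minimal sketch of the pattern as I understand it, using only 
   `java.util.concurrent` and no Curator classes. `blockingReset` is a 
   hypothetical stand-in for a `reset()` call that waits on a debug latch; it is 
   not the actual `LeaderLatch` code. Submitting it to another thread leaves the 
   calling thread free to release the latch later.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;

   public class BlockingResetSketch {
       // Hypothetical stand-in for a reset() that waits on a debug latch.
       static String blockingReset(CountDownLatch debugLatch) throws InterruptedException {
           debugLatch.await();
           return "reset-done";
       }

       static String run() throws Exception {
           CountDownLatch debugLatch = new CountDownLatch(1);
           ExecutorService executor = Executors.newSingleThreadExecutor();
           try {
               // Calling blockingReset(debugLatch) directly here would block this thread
               // forever, because nobody else could count the latch down.
               Future<String> pending = executor.submit(() -> blockingReset(debugLatch));

               // The calling thread stays free to do the work the other thread waits for.
               debugLatch.countDown();

               // Bounded wait so a mistake shows up as a timeout instead of a hang.
               return pending.get(5, TimeUnit.SECONDS);
           } finally {
               executor.shutdownNow();
           }
       }

       public static void main(String[] args) throws Exception {
           System.out.println(run());
       }
   }
   ```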



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws 
Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new 
StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new 
ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());
+                        }
+                    }
+                };
+                
client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+                clients.add(client);
+                LeaderLatch latch = new LeaderLatch(client, latchPath, 
String.valueOf(i));
+                LeaderLatchListener listener = new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        states.add("false");
+                    }
+                };
+                latch.addListener(listener);
+                latch.start();
+                latches.add(latch);
+                assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+                if (i == 0) {
+                    
assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "true");
+                }
+            }
+            catch (Exception e){
+                return;
+            }
+        }
+        timing.forWaiting().sleepABit();
+        // now latch1 is leader, latch2 is not leader. latch2 listens to the 
ephemeral node created by latch1
+        LeaderLatch latch1 = latches.get(0);
+        LeaderLatch latch2 = latches.get(1);

Review Comment:
   ```suggestion
           LeaderLatch initialLeaderLatch = latches.get(0);
           LeaderLatch initialNonLeaderLatch = latches.get(1);
   ```
   nit: maybe make the variable names more descriptive to avoid confusion, 
especially because we're switching the order here compared to what is 
described in the PR description and the corresponding ticket.



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws 
Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new 
StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new 
ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());
+                        }
+                    }
+                };
+                
client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+                clients.add(client);
+                LeaderLatch latch = new LeaderLatch(client, latchPath, 
String.valueOf(i));
+                LeaderLatchListener listener = new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        states.add("false");
+                    }
+                };
+                latch.addListener(listener);
+                latch.start();
+                latches.add(latch);
+                assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+                if (i == 0) {
+                    
assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "true");
+                }
+            }
+            catch (Exception e){
+                return;
+            }
+        }
+        timing.forWaiting().sleepABit();

Review Comment:
   What's the purpose of waiting here? :thinking: 



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws 
Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new 
StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new 
ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());
+                        }
+                    }
+                };
+                
client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+                clients.add(client);
+                LeaderLatch latch = new LeaderLatch(client, latchPath, 
String.valueOf(i));
+                LeaderLatchListener listener = new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        states.add("false");
+                    }
+                };
+                latch.addListener(listener);
+                latch.start();
+                latches.add(latch);
+                assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+                if (i == 0) {
+                    
assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "true");
+                }
+            }
+            catch (Exception e){
+                return;
+            }
+        }
+        timing.forWaiting().sleepABit();
+        // now latch1 is leader, latch2 is not leader. latch2 listens to the 
ephemeral node created by latch1

Review Comment:
   nit: Moving comments into assert messages improves the test output, and the 
message still documents the intent the way a comment would. This comment could 
be added as the message of the `assertTrue` and `assertFalse` in lines 284-285 
below, describing the currently expected state.



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws 
Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new 
StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new 
ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());
+                        }
+                    }
+                };
+                
client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+                clients.add(client);
+                LeaderLatch latch = new LeaderLatch(client, latchPath, 
String.valueOf(i));
+                LeaderLatchListener listener = new LeaderLatchListener() {
+                    @Override
+                    public void isLeader() {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader() {
+                        states.add("false");
+                    }
+                };
+                latch.addListener(listener);
+                latch.start();
+                latches.add(latch);
+                assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+                if (i == 0) {
+                    
assertEquals(states.poll(timing.forWaiting().milliseconds(), 
TimeUnit.MILLISECONDS), "true");
+                }
+            }
+            catch (Exception e){

Review Comment:
   Why are we hiding thrown exceptions here? Shouldn't we expose them as part 
of the test run if something went wrong? :thinking: The test would succeed if 
an exception is thrown in this block and caught here.
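
   To illustrate the concern, here is a small self-contained sketch without any 
   test framework. `failingSetup`, `swallowing`, `propagating` and `passes` are 
   hypothetical names; `passes` only mimics a test runner by treating a normal 
   return as success.

   ```java
   public class SwallowedExceptionSketch {
       interface TestBody {
           void run() throws Exception;
       }

       // Hypothetical setup step that always fails.
       static void failingSetup() throws Exception {
           throw new IllegalStateException("setup failed");
       }

       // Same shape as the catch block under review: the failure is caught
       // and the method returns normally.
       static void swallowing() {
           try {
               failingSetup();
           } catch (Exception e) {
               return;
           }
       }

       // Alternative: let the exception reach the caller.
       static void propagating() throws Exception {
           failingSetup();
       }

       // Stand-in for a test runner: a body "passes" if it returns normally.
       static boolean passes(TestBody body) {
           try {
               body.run();
               return true;
           } catch (Exception e) {
               return false;
           }
       }

       public static void main(String[] args) {
           System.out.println("swallowing passes:  " + passes(SwallowedExceptionSketch::swallowing));
           System.out.println("propagating passes: " + passes(SwallowedExceptionSketch::propagating));
       }
   }
   ```

   Since the test method already declares `throws Exception`, dropping the 
   `try`/`catch` would be enough to surface such failures.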



##########
curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java:
##########
@@ -220,6 +223,108 @@ public void testWatchedNodeDeletedOnReconnect() throws 
Exception
         }
     }
 
+    @Test
+    public void testCheckLeaderShipTiming() throws Exception
+    {
+        final String latchPath = "/test";
+        Timing timing = new Timing();
+        List<LeaderLatch> latches = Lists.newArrayList();
+        List<CuratorFramework> clients = Lists.newArrayList();
+        final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        for ( int i = 0; i < 2; ++i ) {
+            try {
+                CuratorFramework client = CuratorFrameworkFactory.builder()
+                        .connectString(server.getConnectString())
+                        .connectionTimeoutMs(10000)
+                        .sessionTimeoutMs(60000)
+                        .retryPolicy(new RetryOneTime(1))
+                        .connectionStateErrorPolicy(new 
StandardConnectionStateErrorPolicy())
+                        .build();
+                ConnectionStateListener stateListener = new 
ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, 
ConnectionState newState)
+                    {
+                        if (newState == ConnectionState.CONNECTED) {
+                            states.add(newState.name());

Review Comment:
   nit: Adding the `LeaderLatch` ID (we could use the for-loop index `i`) here 
might make the state of the queue easier to understand further down in the test.
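
   A rough sketch of what labelled events could look like. The `id:event` 
   format and the `label` helper are only assumptions for illustration, and the 
   loop fakes the listener callbacks instead of using real Curator clients.

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   public class LabeledEventsSketch {
       // Hypothetical label format: "<latch id>:<event>".
       static String label(int latchId, String event) {
           return latchId + ":" + event;
       }

       // Simulates what the listeners would enqueue for two latches.
       static BlockingQueue<String> record() {
           BlockingQueue<String> states = new LinkedBlockingQueue<>();
           for (int i = 0; i < 2; ++i) {
               states.add(label(i, "CONNECTED"));
               if (i == 0) {
                   // Only the first latch is expected to report leadership.
                   states.add(label(i, "true"));
               }
           }
           return states;
       }

       public static void main(String[] args) {
           // Each entry now says which latch produced it.
           record().forEach(System.out::println);
       }
   }
   ```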





Issue Time Tracking
-------------------

    Worklog Id:     (was: 814686)
    Time Spent: 40m  (was: 0.5h)

> Double leader for LeaderLatch
> -----------------------------
>
>                 Key: CURATOR-653
>                 URL: https://issues.apache.org/jira/browse/CURATOR-653
>             Project: Apache Curator
>          Issue Type: Task
>          Components: Recipes
>            Reporter: Zili Chen
>            Assignee: Zili Chen
>            Priority: Major
>             Fix For: 5.4.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Reported by @woaishixiaoxiao:
> When I use LeaderLatch to elect a leader, a double-leader situation can 
> occur.
> The timeline is as follows:
> 1. The ZK cluster switches its leader node because of a zxid overflow. The 
> cluster is unavailable to the outside world.
> 2. Client A (not the leader before the zxid overflow) and client B (the 
> leader before the zxid overflow) enter the suspended state. Client B sets 
> its leader status to false.
> 3. The ZK cluster completes its leader election and returns to normal.
> 4. Client A enters the reconnected state and calls the reset function, which 
> sets its leader status to false.
> 5. Client B enters the reconnected state and calls the reset function, which 
> sets its leader status to false and deletes its old path.
> 6. Client A receives the preNodeDeleteEvent and then calls getChildren on 
> the ZK server. It finds that it has the smallest sequence number and sets 
> itself as leader.
> 7. Client B creates a new ephemeral node and then calls getChildren on the 
> ZK server. It finds that it is not the node with the smallest sequence 
> number and watches for the deletion of the previous node.
> 8. Client A deletes its old path.
> 9. Client B receives the preNodeDeleteEvent and then calls getChildren on 
> the ZK server. It finds that it has the smallest sequence number and sets 
> itself as leader.
> 10. Client A creates a new ephemeral node and then calls getChildren on the 
> ZK server. It finds that it is not the node with the smallest sequence 
> number and watches for the deletion of the previous node. However, it does 
> not set itself to the non-leader state, so because of step 6, A is still in 
> the leader state.
> 11. Now client A and client B are both leader at the same time.
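
The timeline above can be replayed with a simplified, single-threaded model. 
This is only a sketch of the reported sequence, not Curator's implementation: 
`Znodes`, `Client` and `checkLeadership` are hypothetical names, sequence 
numbers are plain integers, and watches are replaced by explicit calls in the 
order given in the report.

```java
import java.util.TreeSet;

public class DoubleLeaderTimelineSketch {
    /** Hypothetical stand-in for the latch path: a sorted set of sequence numbers. */
    static final class Znodes {
        final TreeSet<Integer> children = new TreeSet<>();
        int nextSeq = 0;
    }

    static final class Client {
        final Znodes zk;
        Integer node;
        boolean leader;

        Client(Znodes zk) {
            this.zk = zk;
        }

        void createNode() {
            node = zk.nextSeq++;
            zk.children.add(node);
        }

        void deleteNode() {
            zk.children.remove(node);
        }

        // Behaviour as described in the report: the flag is set to true when the
        // client's node is the smallest, and left untouched otherwise.
        void checkLeadership() {
            if (node.equals(zk.children.first())) {
                leader = true;
            }
        }
    }

    /** Replays the reported steps; returns {A is leader, B is leader}. */
    static boolean[] simulate() {
        Znodes zk = new Znodes();
        Client a = new Client(zk);
        Client b = new Client(zk);

        b.createNode(); b.checkLeadership();  // initial state: B holds node 0 and leads
        a.createNode(); a.checkLeadership();  // A holds node 1 and does not lead

        b.leader = false;                     // step 2: suspended, B gives up leadership
        a.leader = false;                     // step 4: A starts its reset
        b.leader = false; b.deleteNode();     // step 5: B resets and deletes node 0
        a.checkLeadership();                  // step 6: A's old node 1 is now the smallest
        b.createNode(); b.checkLeadership();  // step 7: B holds node 2, not the smallest
        a.deleteNode();                       // step 8: A deletes node 1
        b.checkLeadership();                  // step 9: B's node 2 is now the smallest
        a.createNode(); a.checkLeadership();  // step 10: A holds node 3, flag stays true

        return new boolean[] { a.leader, b.leader };
    }

    public static void main(String[] args) {
        boolean[] result = simulate();
        System.out.println("A leader: " + result[0] + ", B leader: " + result[1]);
    }
}
```

In this model both flags end up true, which matches step 11 of the report.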



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
