This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new 8358076ed Metaclient - Leader election - Track participants and add 
participant info (#2562)
8358076ed is described below

commit 8358076ede9402ec93f1cdd37578f9e331e8bdfb
Author: xyuanlu <[email protected]>
AuthorDate: Sat Jul 22 21:45:29 2023 -0700

    Metaclient - Leader election - Track participants and add participant info 
(#2562)
    
    
    Co-authored-by: Xiaoyuan Lu <[email protected]>
---
 .../helix/metaclient/impl/zk/ZkMetaClient.java     |   2 -
 .../leaderelection/LeaderElectionClient.java       | 132 ++++++++++++++++-----
 .../{TestUtil.java => MetaClientTestUtil.java}     |   7 +-
 .../apache/helix/metaclient/impl/zk/TestUtil.java  |  68 +++++++++++
 .../recipes/leaderelection/TestLeaderElection.java | 104 ++++++++++++++--
 5 files changed, 266 insertions(+), 47 deletions(-)

diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 691f31cde..8753747f3 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -57,14 +57,12 @@ import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
-import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode;
 import static 
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
 
 
diff --git 
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
 
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
index d5bdb735a..39233e979 100644
--- 
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
+++ 
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
@@ -21,11 +21,14 @@ package org.apache.helix.metaclient.recipes.leaderelection;
 
 import java.util.Arrays;
 import java.util.ConcurrentModificationException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 
+import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.helix.metaclient.api.ConnectStateChangeListener;
 import org.apache.helix.metaclient.api.DataChangeListener;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.api.Op;
@@ -34,6 +37,7 @@ import 
org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
 import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
 import org.slf4j.Logger;
@@ -68,8 +72,13 @@ public class LeaderElectionClient implements AutoCloseable {
   // A list of leader election group that this client joins.
   private Set<String> _leaderGroups = new HashSet<>();
 
+  private Map<String, LeaderInfo> _participantInfos = new HashMap<>();
+
   private final static String LEADER_ENTRY_KEY = "/LEADER";
+  private final static String PARTICIPANTS_ENTRY_KEY = "/PARTICIPANTS";
+  private final static String PARTICIPANTS_ENTRY_PARENT = "/PARTICIPANTS/";
   ReElectListener _reElectListener = new ReElectListener();
+  ConnectStateListener _connectStateListener = new ConnectStateListener();
 
   /**
    * Construct a LeaderElectionClient using a user passed in 
leaderElectionConfig. It creates a MetaClient
@@ -90,6 +99,7 @@ public class LeaderElectionClient implements AutoCloseable {
           metaClientConfig.getConnectionAddress()).setZkSerializer((new 
LeaderInfoSerializer())).build();
       _metaClient = new 
ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
       _metaClient.connect();
+      _metaClient.subscribeStateChanges(_connectStateListener);
     } else {
       throw new MetaClientException("Unsupported store type: " + 
metaClientConfig.getStoreType());
     }
@@ -121,8 +131,8 @@ public class LeaderElectionClient implements AutoCloseable {
    * @throws RuntimeException if the operation is not succeeded.
    */
   public void joinLeaderElectionParticipantPool(String leaderPath) {
-    // TODO: create participant entry
     subscribeAndTryCreateLeaderEntry(leaderPath);
+    createParticipantInfo(leaderPath, new LeaderInfo(_participant));
   }
 
   /**
@@ -134,8 +144,40 @@ public class LeaderElectionClient implements AutoCloseable 
{
    * @throws RuntimeException if the operation is not succeeded.
    */
   public void joinLeaderElectionParticipantPool(String leaderPath, LeaderInfo 
userInfo) {
-    // TODO: create participant entry with info
     subscribeAndTryCreateLeaderEntry(leaderPath);
+
+    LeaderInfo participantInfo = new LeaderInfo(userInfo);
+    createParticipantInfo(leaderPath, participantInfo);
+  }
+
+  private void createParticipantInfo(String leaderPath, LeaderInfo 
participantInfo) {
+    _participantInfos.put(leaderPath, participantInfo);
+
+    createPathIfNotExists(leaderPath + PARTICIPANTS_ENTRY_KEY);
+
+    try {
+      // try to create participant info entry, assuming leader election group 
node is already there
+      _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + 
_participant, participantInfo,
+          MetaClientInterface.EntryMode.EPHEMERAL);
+    } catch (MetaClientNodeExistsException ex) {
+      throw new ConcurrentModificationException("Already joined leader 
election group. ", ex);
+    } catch (MetaClientNoNodeException ex) {
+      // Leader group root entry or participant parent entry is gone after we 
checked or created.
+      // Meaning other client removed the group. Throw 
ConcurrentModificationException.
+      throw new ConcurrentModificationException(
+          "Other client trying to modify the leader election group at the same 
time, please retry.", ex);
+    }
+  }
+
+  private void createPathIfNotExists(String path) {
+    if (_metaClient.exists(path) == null) {
+      LOG.info("{} Creating leader group directory {}.", _participant, path);
+      try {
+        _metaClient.create(path, null);
+      } catch (MetaClientNodeExistsException ignore) {
+        LOG.info("Leader election group root path already created: path {}.", 
path);
+      }
+    }
   }
 
   private void subscribeAndTryCreateLeaderEntry(String leaderPath) {
@@ -144,30 +186,19 @@ public class LeaderElectionClient implements 
AutoCloseable {
     leaderInfo.setLeaderName(_participant);
 
     try {
+      createPathIfNotExists(leaderPath);
+    } catch (MetaClientNoNodeException e) {
+      // A parent entry of the leader group root path is missing.
+      throw new MetaClientException("Parent entry in leaderGroup path" + 
leaderPath + " does not exist.");
+    }
+
+    // create actual leader node
+    try {
+      LOG.info("{} joining leader group {}.", _participant, leaderPath);
       // try to create leader entry, assuming leader election group node is 
already there
       _metaClient.create(leaderPath + LEADER_ENTRY_KEY, leaderInfo, 
MetaClientInterface.EntryMode.EPHEMERAL);
     } catch (MetaClientNodeExistsException ex) {
-      LOG.info("Already a leader for group {}", leaderPath);
-    } catch (MetaClientNoNodeException ex) {
-      try {
-        // try to create leader path root entry
-        _metaClient.create(leaderPath, null);
-      } catch (MetaClientNodeExistsException ignored) {
-        // root entry created by other client, ignore
-      } catch (MetaClientNoNodeException e) {
-        // Parent entry of user provided leader election group path missing.
-        // (e.g. `/a/b` not created in user specified leader election group 
path /a/b/c/LeaderGroup)
-        throw new MetaClientException("Parent entry in leaderGroup path" + 
leaderPath + " does not exist.");
-      }
-      try {
-        // try to create leader node again.
-        _metaClient.create(leaderPath + LEADER_ENTRY_KEY, leaderInfo, 
MetaClientInterface.EntryMode.EPHEMERAL);
-      } catch (MetaClientNoNodeException e) {
-        // Leader group root entry is gone after we checked at outer catch 
block.
-        // Meaning other client removed the group. Throw 
ConcurrentModificationException.
-        throw new ConcurrentModificationException(
-            "Other client trying to modify the leader election group at the 
same time, please retry.", ex);
-      }
+      LOG.info("Already a leader in leader group {}.", leaderPath);
     }
 
     _leaderGroups.add(leaderPath + LEADER_ENTRY_KEY);
@@ -222,6 +253,7 @@ public class LeaderElectionClient implements AutoCloseable {
     // deleting ZNode. So that handler in ReElectListener won't recreate the 
leader node.
     if (exitLeaderElectionParticipantPool) {
       _leaderGroups.remove(leaderPath + LEADER_ENTRY_KEY);
+      _metaClient.delete(leaderPath + PARTICIPANTS_ENTRY_PARENT + 
_participant);
     }
     // check if current participant is the leader
     // read data and stats, check, and multi check + delete
@@ -250,13 +282,23 @@ public class LeaderElectionClient implements 
AutoCloseable {
    * @throws RuntimeException when leader path does not exist. // TODO: define 
exp type
    */
   public String getLeader(String leaderPath) {
+
     LeaderInfo leaderInfo = _metaClient.get(leaderPath + LEADER_ENTRY_KEY);
     return leaderInfo == null ? null : leaderInfo.getLeaderName();
   }
 
-  public LeaderInfo getParticipantInfo(String leaderPath) {
-    // TODO: add getParticipantInfo impl
-    return null;
+  /**
+   * Get the info entry of a participant in a leader election group.
+   *
+   * @param leaderPath The path for leader election.
+   * @return Returns a LeaderInfo entry. Return null if participant is not in 
the pool.
+   * */
+  public LeaderInfo getParticipantInfo(String leaderPath, String participant) {
+    return _metaClient.get(leaderPath + PARTICIPANTS_ENTRY_PARENT + 
participant);
+  }
+
+  public MetaClientInterface.Stat getLeaderEntryStat(String leaderPath) {
+    return _metaClient.exists(leaderPath + LEADER_ENTRY_KEY);
   }
 
   /**
@@ -271,7 +313,11 @@ public class LeaderElectionClient implements AutoCloseable 
{
    * @throws RuntimeException when leader path does not exist. // TODO: define 
exp type
    */
   public List<String> getParticipants(String leaderPath) {
-    return null;
+    try {
+      return _metaClient.getDirectChildrenKeys(leaderPath + 
PARTICIPANTS_ENTRY_KEY);
+    } catch (MetaClientNoNodeException ex) {
+      throw new MetaClientException("No leader election group create for path 
" + leaderPath, ex);
+    }
   }
 
   /**
@@ -302,6 +348,8 @@ public class LeaderElectionClient implements AutoCloseable {
   @Override
   public void close() throws Exception {
 
+    _metaClient.unsubscribeConnectStateChanges(_connectStateListener);
+
     // exit all previous joined leader election groups
     for (String leaderGroup : _leaderGroups) {
       String leaderGroupPathName =
@@ -318,19 +366,43 @@ public class LeaderElectionClient implements 
AutoCloseable {
     @Override
     public void handleDataChange(String key, Object data, ChangeType 
changeType) throws Exception {
       if (changeType == ChangeType.ENTRY_CREATED) {
-        LOG.info("new leader {} for leader election group {}.", ((LeaderInfo) 
data).getLeaderName(), key);
+        LOG.info("new leader for leader election group {}.", key);
       } else if (changeType == ChangeType.ENTRY_DELETED) {
         if (_leaderGroups.contains(key)) {
           LeaderInfo lf = new LeaderInfo("LEADER");
           lf.setLeaderName(_participant);
           try {
+            LOG.info("Leader gone for group {}, {} try to reelect.", key, 
_participant);
             _metaClient.create(key, lf, 
MetaClientInterface.EntryMode.EPHEMERAL);
           } catch (MetaClientNodeExistsException ex) {
-            LOG.info("Already a leader {} for leader election group {}.", 
((LeaderInfo) data).getLeaderName(), key);
+            LOG.info("Already a leader for leader election group {}.", key);
           }
         }
       }
     }
   }
-}
 
+  class ConnectStateListener implements ConnectStateChangeListener {
+
+    @Override
+    public void handleConnectStateChanged(MetaClientInterface.ConnectState 
prevState,
+        MetaClientInterface.ConnectState currentState) throws Exception {
+      if (prevState == MetaClientInterface.ConnectState.EXPIRED
+          && currentState == MetaClientInterface.ConnectState.CONNECTED) {
+        for (String leaderPath : _participantInfos.keySet()) {
+          _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT + 
_participant, _participantInfos.get(leaderPath),
+              MetaClientInterface.EntryMode.EPHEMERAL);
+        }
+      }
+    }
+
+    @Override
+    public void handleConnectionEstablishmentError(Throwable error) throws 
Exception {
+
+    }
+  }
+
+  public MetaClientInterface getMetaClient() {
+    return _metaClient;
+  }
+}
\ No newline at end of file
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java 
b/meta-client/src/test/java/org/apache/helix/metaclient/MetaClientTestUtil.java
similarity index 75%
rename from meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java
rename to 
meta-client/src/test/java/org/apache/helix/metaclient/MetaClientTestUtil.java
index 9b20c8e53..2c7543e31 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/MetaClientTestUtil.java
@@ -1,7 +1,10 @@
 package org.apache.helix.metaclient;
 
-public class TestUtil {
-  public static final long WAIT_DURATION = 6 * 1000L;
+import java.util.concurrent.TimeUnit;
+
+
+public class MetaClientTestUtil {
+  public static final long WAIT_DURATION =  TimeUnit.MINUTES.toMicros(1);
   public interface Verifier {
     boolean verify()
         throws Exception;
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
index 1296a72f3..fbf1ab1f3 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
@@ -25,11 +25,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.ZkClient;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.testng.Assert;
 
 
 public class TestUtil {
@@ -93,4 +99,66 @@ public class TestUtil {
 
     return lists;
   }
+
+  public static void expireSession(ZkMetaClient client)
+      throws Exception {
+    final CountDownLatch waitNewSession = new CountDownLatch(1);
+    final ZkClient zkClient = client.getZkClient();
+
+    IZkStateListener listener = new IZkStateListener() {
+      @Override
+      public void handleStateChanged(Watcher.Event.KeeperState state)
+          throws Exception {
+      }
+
+      @Override
+      public void handleNewSession(final String sessionId)
+          throws Exception {
+        // make sure zkclient is connected again
+        zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, 
TimeUnit.SECONDS);
+
+        waitNewSession.countDown();
+      }
+
+      @Override
+      public void handleSessionEstablishmentError(Throwable var1)
+          throws Exception {
+      }
+    };
+
+    zkClient.subscribeStateChanges(listener);
+
+    ZkConnection connection = ((ZkConnection) zkClient.getConnection());
+    ZooKeeper curZookeeper = connection.getZookeeper();
+    String oldSessionId = Long.toHexString(curZookeeper.getSessionId());
+
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+      }
+    };
+
+    final ZooKeeper dupZookeeper =
+        new ZooKeeper(connection.getServers(), 
curZookeeper.getSessionTimeout(), watcher,
+            curZookeeper.getSessionId(), curZookeeper.getSessionPasswd());
+    // wait until connected, then close
+    while (dupZookeeper.getState() != ZooKeeper.States.CONNECTED) {
+      Thread.sleep(10);
+    }
+    Assert.assertEquals(dupZookeeper.getState(), ZooKeeper.States.CONNECTED,
+        "Fail to connect to zk using current session info");
+    dupZookeeper.close();
+
+    // make sure session expiry really happens
+    waitNewSession.await();
+    zkClient.unsubscribeStateChanges(listener);
+
+    connection = (ZkConnection) zkClient.getConnection();
+    curZookeeper = connection.getZookeeper();
+
+    String newSessionId = Long.toHexString(curZookeeper.getSessionId());
+    Assert.assertFalse(newSessionId.equals(oldSessionId),
+        "Fail to expire current session, zk: " + curZookeeper);
+  }
+
 }
\ No newline at end of file
diff --git 
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
 
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
index 3a45d97ce..412ecaf8b 100644
--- 
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
+++ 
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
@@ -1,11 +1,15 @@
 package org.apache.helix.metaclient.recipes.leaderelection;
 
-import org.apache.helix.metaclient.TestUtil;
+import java.util.ConcurrentModificationException;
+import org.apache.helix.metaclient.MetaClientTestUtil;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
 import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.apache.helix.metaclient.impl.zk.TestUtil.*;
+
 
 public class TestLeaderElection extends ZkMetaClientTestBase {
 
@@ -15,8 +19,8 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
 
   public LeaderElectionClient createLeaderElectionClient(String 
participantName) {
     MetaClientConfig.StoreType storeType = 
MetaClientConfig.StoreType.ZOOKEEPER;
-    MetaClientConfig config = new 
MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR)
-        .setStoreType(storeType).build();
+    MetaClientConfig config =
+        new 
MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR).setStoreType(storeType).build();
     return new LeaderElectionClient(config, participantName);
   }
 
@@ -29,31 +33,31 @@ public class TestLeaderElection extends 
ZkMetaClientTestBase {
     clt1.joinLeaderElectionParticipantPool(LEADER_PATH);
     clt2.joinLeaderElectionParticipantPool(LEADER_PATH);
     // First client joining the leader election group should be current leader
-    Assert.assertTrue(TestUtil.verify(() -> {
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
       return (clt1.getLeader(LEADER_PATH) != null);
-    }, TestUtil.WAIT_DURATION));
+    }, MetaClientTestUtil.WAIT_DURATION));
     Assert.assertNotNull(clt1.getLeader(LEADER_PATH));
     Assert.assertEquals(clt1.getLeader(LEADER_PATH), 
clt2.getLeader(LEADER_PATH));
     Assert.assertEquals(clt1.getLeader(LEADER_PATH), PARTICIPANT_NAME1);
 
     // client 1 exit leader election group, and client 2 should be current 
leader.
     clt1.exitLeaderElectionParticipantPool(LEADER_PATH);
-    Assert.assertTrue(TestUtil.verify(() -> {
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
       return (clt1.getLeader(LEADER_PATH) != null);
-    }, TestUtil.WAIT_DURATION));
-    Assert.assertTrue(TestUtil.verify(() -> {
+    }, MetaClientTestUtil.WAIT_DURATION));
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
       return (clt1.getLeader(LEADER_PATH).equals(PARTICIPANT_NAME2));
-    }, TestUtil.WAIT_DURATION));
+    }, MetaClientTestUtil.WAIT_DURATION));
 
     // client1 join and client2 leave. client 1 should be leader.
     clt1.joinLeaderElectionParticipantPool(LEADER_PATH);
     clt2.exitLeaderElectionParticipantPool(LEADER_PATH);
-    Assert.assertTrue(TestUtil.verify(() -> {
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
       return (clt1.getLeader(LEADER_PATH) != null);
-    }, TestUtil.WAIT_DURATION));
-    Assert.assertTrue(TestUtil.verify(() -> {
+    }, MetaClientTestUtil.WAIT_DURATION));
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
       return (clt1.getLeader(LEADER_PATH).equals(PARTICIPANT_NAME1));
-    }, TestUtil.WAIT_DURATION));
+    }, MetaClientTestUtil.WAIT_DURATION));
     Assert.assertTrue(clt1.isLeader(LEADER_PATH));
     Assert.assertFalse(clt2.isLeader(LEADER_PATH));
 
@@ -61,4 +65,78 @@ public class TestLeaderElection extends ZkMetaClientTestBase 
{
     clt2.close();
   }
 
+  @Test
+  public void testElectionPoolMembership() throws Exception {
+    String leaderPath = LEADER_PATH + "/testElectionPoolMembership";
+    LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
+    participantInfo.setSimpleField("Key1", "value1");
+    LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
+    participantInfo2.setSimpleField("Key2", "value2");
+    LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+    LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+    clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
+    try {
+      clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); // 
no op
+    } catch (ConcurrentModificationException ex) {
+      // expected
+      Assert.assertEquals(ex.getClass().getName(), 
"java.util.ConcurrentModificationException");
+    }
+    clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
+
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt1.getLeader(leaderPath) != null);
+    }, MetaClientTestUtil.WAIT_DURATION));
+    Assert.assertNotNull(clt1.getLeaderEntryStat(leaderPath));
+    Assert.assertNotNull(clt1.getLeader(leaderPath));
+    Assert.assertEquals(clt1.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+    Assert.assertEquals(clt2.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+    Assert.assertEquals(clt1.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+    Assert.assertEquals(clt2.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+
+    // clt1 gone
+    clt1.relinquishLeader(leaderPath);
+    clt1.exitLeaderElectionParticipantPool(leaderPath);
+    clt2.exitLeaderElectionParticipantPool(leaderPath);
+
+    Assert.assertNull(clt2.getParticipantInfo(LEADER_PATH, PARTICIPANT_NAME2));
+  }
+
+  @Test
+  public void testSessionExpire() throws Exception {
+    String leaderPath = LEADER_PATH + "/testSessionExpire";
+    LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
+    participantInfo.setSimpleField("Key1", "value1");
+    LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
+    participantInfo2.setSimpleField("Key2", "value2");
+    LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+    LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+    clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
+    try {
+      clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); // 
no op
+    } catch (ConcurrentModificationException ex) {
+      // expected
+      Assert.assertEquals(ex.getClass().getName(), 
"java.util.ConcurrentModificationException");
+    }
+    clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
+    // a leader should be up
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt1.getLeader(leaderPath) != null);
+    }, MetaClientTestUtil.WAIT_DURATION));
+
+    // session expire and reconnect
+    expireSession((ZkMetaClient) clt1.getMetaClient());
+
+    Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+      return (clt1.getLeader(leaderPath) != null);
+    }, MetaClientTestUtil.WAIT_DURATION));
+    Assert.assertNotNull(clt1.getLeaderEntryStat(leaderPath));
+    Assert.assertNotNull(clt1.getLeader(leaderPath));
+    // after the session is recreated, the participant info nodes should still be present
+    Assert.assertEquals(clt1.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+    Assert.assertEquals(clt2.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+    Assert.assertEquals(clt1.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+    Assert.assertEquals(clt2.getParticipantInfo(leaderPath, 
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+  }
 }

Reply via email to