This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new 8358076ed Metaclient - Leader election - Track participants and add
participant info (#2562)
8358076ed is described below
commit 8358076ede9402ec93f1cdd37578f9e331e8bdfb
Author: xyuanlu <[email protected]>
AuthorDate: Sat Jul 22 21:45:29 2023 -0700
Metaclient - Leader election - Track participants and add participant info
(#2562)
Co-authored-by: Xiaoyuan Lu <[email protected]>
---
.../helix/metaclient/impl/zk/ZkMetaClient.java | 2 -
.../leaderelection/LeaderElectionClient.java | 132 ++++++++++++++++-----
.../{TestUtil.java => MetaClientTestUtil.java} | 7 +-
.../apache/helix/metaclient/impl/zk/TestUtil.java | 68 +++++++++++
.../recipes/leaderelection/TestLeaderElection.java | 104 ++++++++++++++--
5 files changed, 266 insertions(+), 47 deletions(-)
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 691f31cde..8753747f3 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -57,14 +57,12 @@ import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
-import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode;
import static
org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
index d5bdb735a..39233e979 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
@@ -21,11 +21,14 @@ package org.apache.helix.metaclient.recipes.leaderelection;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.helix.metaclient.api.ConnectStateChangeListener;
import org.apache.helix.metaclient.api.DataChangeListener;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.api.Op;
@@ -34,6 +37,7 @@ import
org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
import org.slf4j.Logger;
@@ -68,8 +72,13 @@ public class LeaderElectionClient implements AutoCloseable {
// A list of leader election group that this client joins.
private Set<String> _leaderGroups = new HashSet<>();
+ private Map<String, LeaderInfo> _participantInfos = new HashMap<>();
+
private final static String LEADER_ENTRY_KEY = "/LEADER";
+ private final static String PARTICIPANTS_ENTRY_KEY = "/PARTICIPANTS";
+ private final static String PARTICIPANTS_ENTRY_PARENT = "/PARTICIPANTS/";
ReElectListener _reElectListener = new ReElectListener();
+ ConnectStateListener _connectStateListener = new ConnectStateListener();
/**
* Construct a LeaderElectionClient using a user passed in
leaderElectionConfig. It creates a MetaClient
@@ -90,6 +99,7 @@ public class LeaderElectionClient implements AutoCloseable {
metaClientConfig.getConnectionAddress()).setZkSerializer((new
LeaderInfoSerializer())).build();
_metaClient = new
ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
_metaClient.connect();
+ _metaClient.subscribeStateChanges(_connectStateListener);
} else {
throw new MetaClientException("Unsupported store type: " +
metaClientConfig.getStoreType());
}
@@ -121,8 +131,8 @@ public class LeaderElectionClient implements AutoCloseable {
* @throws RuntimeException if the operation is not succeeded.
*/
public void joinLeaderElectionParticipantPool(String leaderPath) {
- // TODO: create participant entry
subscribeAndTryCreateLeaderEntry(leaderPath);
+ createParticipantInfo(leaderPath, new LeaderInfo(_participant));
}
/**
@@ -134,8 +144,40 @@ public class LeaderElectionClient implements AutoCloseable
{
* @throws RuntimeException if the operation is not succeeded.
*/
public void joinLeaderElectionParticipantPool(String leaderPath, LeaderInfo
userInfo) {
- // TODO: create participant entry with info
subscribeAndTryCreateLeaderEntry(leaderPath);
+
+ LeaderInfo participantInfo = new LeaderInfo(userInfo);
+ createParticipantInfo(leaderPath, participantInfo);
+ }
+
+ private void createParticipantInfo(String leaderPath, LeaderInfo
participantInfo) {
+ _participantInfos.put(leaderPath, participantInfo);
+
+ createPathIfNotExists(leaderPath + PARTICIPANTS_ENTRY_KEY);
+
+ try {
+ // try to create participant info entry, assuming leader election group
node is already there
+ _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT +
_participant, participantInfo,
+ MetaClientInterface.EntryMode.EPHEMERAL);
+ } catch (MetaClientNodeExistsException ex) {
+ throw new ConcurrentModificationException("Already joined leader
election group. ", ex);
+ } catch (MetaClientNoNodeException ex) {
+ // The leader group root entry or the participant parent entry was removed after we
checked or created it,
+ // meaning another client removed the group. Throw
ConcurrentModificationException.
+ throw new ConcurrentModificationException(
+ "Other client trying to modify the leader election group at the same
time, please retry.", ex);
+ }
+ }
+
+ private void createPathIfNotExists(String path) {
+ if (_metaClient.exists(path) == null) {
+ LOG.info("{} Creating leader group directory {}.", _participant, path);
+ try {
+ _metaClient.create(path, null);
+ } catch (MetaClientNodeExistsException ignore) {
+ LOG.info("Leader election group root path already created: path {}.",
path);
+ }
+ }
}
private void subscribeAndTryCreateLeaderEntry(String leaderPath) {
@@ -144,30 +186,19 @@ public class LeaderElectionClient implements
AutoCloseable {
leaderInfo.setLeaderName(_participant);
try {
+ createPathIfNotExists(leaderPath);
+ } catch (MetaClientNoNodeException e) {
+ // A parent entry of the leader group root path is missing.
+ throw new MetaClientException("Parent entry in leaderGroup path" +
leaderPath + " does not exist.");
+ }
+
+ // create actual leader node
+ try {
+ LOG.info("{} joining leader group {}.", _participant, leaderPath);
// try to create leader entry, assuming leader election group node is
already there
_metaClient.create(leaderPath + LEADER_ENTRY_KEY, leaderInfo,
MetaClientInterface.EntryMode.EPHEMERAL);
} catch (MetaClientNodeExistsException ex) {
- LOG.info("Already a leader for group {}", leaderPath);
- } catch (MetaClientNoNodeException ex) {
- try {
- // try to create leader path root entry
- _metaClient.create(leaderPath, null);
- } catch (MetaClientNodeExistsException ignored) {
- // root entry created by other client, ignore
- } catch (MetaClientNoNodeException e) {
- // Parent entry of user provided leader election group path missing.
- // (e.g. `/a/b` not created in user specified leader election group
path /a/b/c/LeaderGroup)
- throw new MetaClientException("Parent entry in leaderGroup path" +
leaderPath + " does not exist.");
- }
- try {
- // try to create leader node again.
- _metaClient.create(leaderPath + LEADER_ENTRY_KEY, leaderInfo,
MetaClientInterface.EntryMode.EPHEMERAL);
- } catch (MetaClientNoNodeException e) {
- // Leader group root entry is gone after we checked at outer catch
block.
- // Meaning other client removed the group. Throw
ConcurrentModificationException.
- throw new ConcurrentModificationException(
- "Other client trying to modify the leader election group at the
same time, please retry.", ex);
- }
+ LOG.info("Already a leader in leader group {}.", leaderPath);
}
_leaderGroups.add(leaderPath + LEADER_ENTRY_KEY);
@@ -222,6 +253,7 @@ public class LeaderElectionClient implements AutoCloseable {
// deleting ZNode. So that handler in ReElectListener won't recreate the
leader node.
if (exitLeaderElectionParticipantPool) {
_leaderGroups.remove(leaderPath + LEADER_ENTRY_KEY);
+ _metaClient.delete(leaderPath + PARTICIPANTS_ENTRY_PARENT +
_participant);
}
// check if current participant is the leader
// read data and stats, check, and multi check + delete
@@ -250,13 +282,23 @@ public class LeaderElectionClient implements
AutoCloseable {
* @throws RuntimeException when leader path does not exist. // TODO: define
exp type
*/
public String getLeader(String leaderPath) {
+
LeaderInfo leaderInfo = _metaClient.get(leaderPath + LEADER_ENTRY_KEY);
return leaderInfo == null ? null : leaderInfo.getLeaderName();
}
- public LeaderInfo getParticipantInfo(String leaderPath) {
- // TODO: add getParticipantInfo impl
- return null;
+ /**
+ * Get the info entry of the given participant in the leader election pool.
+ *
+ * @param leaderPath The path for leader election.
+ * @return The LeaderInfo entry of the participant. Returns null if the participant is not in
the pool.
+ * */
+ public LeaderInfo getParticipantInfo(String leaderPath, String participant) {
+ return _metaClient.get(leaderPath + PARTICIPANTS_ENTRY_PARENT +
participant);
+ }
+
+ public MetaClientInterface.Stat getLeaderEntryStat(String leaderPath) {
+ return _metaClient.exists(leaderPath + LEADER_ENTRY_KEY);
}
/**
@@ -271,7 +313,11 @@ public class LeaderElectionClient implements AutoCloseable
{
* @throws RuntimeException when leader path does not exist. // TODO: define
exp type
*/
public List<String> getParticipants(String leaderPath) {
- return null;
+ try {
+ return _metaClient.getDirectChildrenKeys(leaderPath +
PARTICIPANTS_ENTRY_KEY);
+ } catch (MetaClientNoNodeException ex) {
+ throw new MetaClientException("No leader election group create for path
" + leaderPath, ex);
+ }
}
/**
@@ -302,6 +348,8 @@ public class LeaderElectionClient implements AutoCloseable {
@Override
public void close() throws Exception {
+ _metaClient.unsubscribeConnectStateChanges(_connectStateListener);
+
// exit all previous joined leader election groups
for (String leaderGroup : _leaderGroups) {
String leaderGroupPathName =
@@ -318,19 +366,43 @@ public class LeaderElectionClient implements
AutoCloseable {
@Override
public void handleDataChange(String key, Object data, ChangeType
changeType) throws Exception {
if (changeType == ChangeType.ENTRY_CREATED) {
- LOG.info("new leader {} for leader election group {}.", ((LeaderInfo)
data).getLeaderName(), key);
+ LOG.info("new leader for leader election group {}.", key);
} else if (changeType == ChangeType.ENTRY_DELETED) {
if (_leaderGroups.contains(key)) {
LeaderInfo lf = new LeaderInfo("LEADER");
lf.setLeaderName(_participant);
try {
+ LOG.info("Leader gone for group {}, {} try to reelect.", key,
_participant);
_metaClient.create(key, lf,
MetaClientInterface.EntryMode.EPHEMERAL);
} catch (MetaClientNodeExistsException ex) {
- LOG.info("Already a leader {} for leader election group {}.",
((LeaderInfo) data).getLeaderName(), key);
+ LOG.info("Already a leader for leader election group {}.", key);
}
}
}
}
}
-}
+ class ConnectStateListener implements ConnectStateChangeListener {
+
+ @Override
+ public void handleConnectStateChanged(MetaClientInterface.ConnectState
prevState,
+ MetaClientInterface.ConnectState currentState) throws Exception {
+ if (prevState == MetaClientInterface.ConnectState.EXPIRED
+ && currentState == MetaClientInterface.ConnectState.CONNECTED) {
+ for (String leaderPath : _participantInfos.keySet()) {
+ _metaClient.create(leaderPath + PARTICIPANTS_ENTRY_PARENT +
_participant, _participantInfos.get(leaderPath),
+ MetaClientInterface.EntryMode.EPHEMERAL);
+ }
+ }
+ }
+
+ @Override
+ public void handleConnectionEstablishmentError(Throwable error) throws
Exception {
+
+ }
+ }
+
+ public MetaClientInterface getMetaClient() {
+ return _metaClient;
+ }
+}
\ No newline at end of file
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java
b/meta-client/src/test/java/org/apache/helix/metaclient/MetaClientTestUtil.java
similarity index 75%
rename from meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java
rename to
meta-client/src/test/java/org/apache/helix/metaclient/MetaClientTestUtil.java
index 9b20c8e53..2c7543e31 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/MetaClientTestUtil.java
@@ -1,7 +1,10 @@
package org.apache.helix.metaclient;
-public class TestUtil {
- public static final long WAIT_DURATION = 6 * 1000L;
+import java.util.concurrent.TimeUnit;
+
+
+public class MetaClientTestUtil {
+ public static final long WAIT_DURATION = TimeUnit.MINUTES.toMicros(1);
public interface Verifier {
boolean verify()
throws Exception;
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
index 1296a72f3..fbf1ab1f3 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestUtil.java
@@ -25,11 +25,17 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
+import org.testng.Assert;
public class TestUtil {
@@ -93,4 +99,66 @@ public class TestUtil {
return lists;
}
+
+ public static void expireSession(ZkMetaClient client)
+ throws Exception {
+ final CountDownLatch waitNewSession = new CountDownLatch(1);
+ final ZkClient zkClient = client.getZkClient();
+
+ IZkStateListener listener = new IZkStateListener() {
+ @Override
+ public void handleStateChanged(Watcher.Event.KeeperState state)
+ throws Exception {
+ }
+
+ @Override
+ public void handleNewSession(final String sessionId)
+ throws Exception {
+ // make sure zkclient is connected again
+ zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT,
TimeUnit.SECONDS);
+
+ waitNewSession.countDown();
+ }
+
+ @Override
+ public void handleSessionEstablishmentError(Throwable var1)
+ throws Exception {
+ }
+ };
+
+ zkClient.subscribeStateChanges(listener);
+
+ ZkConnection connection = ((ZkConnection) zkClient.getConnection());
+ ZooKeeper curZookeeper = connection.getZookeeper();
+ String oldSessionId = Long.toHexString(curZookeeper.getSessionId());
+
+ Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ }
+ };
+
+ final ZooKeeper dupZookeeper =
+ new ZooKeeper(connection.getServers(),
curZookeeper.getSessionTimeout(), watcher,
+ curZookeeper.getSessionId(), curZookeeper.getSessionPasswd());
+ // wait until connected, then close
+ while (dupZookeeper.getState() != ZooKeeper.States.CONNECTED) {
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(dupZookeeper.getState(), ZooKeeper.States.CONNECTED,
+ "Fail to connect to zk using current session info");
+ dupZookeeper.close();
+
+ // make sure session expiry really happens
+ waitNewSession.await();
+ zkClient.unsubscribeStateChanges(listener);
+
+ connection = (ZkConnection) zkClient.getConnection();
+ curZookeeper = connection.getZookeeper();
+
+ String newSessionId = Long.toHexString(curZookeeper.getSessionId());
+ Assert.assertFalse(newSessionId.equals(oldSessionId),
+ "Fail to expire current session, zk: " + curZookeeper);
+ }
+
}
\ No newline at end of file
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
index 3a45d97ce..412ecaf8b 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
@@ -1,11 +1,15 @@
package org.apache.helix.metaclient.recipes.leaderelection;
-import org.apache.helix.metaclient.TestUtil;
+import java.util.ConcurrentModificationException;
+import org.apache.helix.metaclient.MetaClientTestUtil;
import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.apache.helix.metaclient.impl.zk.TestUtil.*;
+
public class TestLeaderElection extends ZkMetaClientTestBase {
@@ -15,8 +19,8 @@ public class TestLeaderElection extends ZkMetaClientTestBase {
public LeaderElectionClient createLeaderElectionClient(String
participantName) {
MetaClientConfig.StoreType storeType =
MetaClientConfig.StoreType.ZOOKEEPER;
- MetaClientConfig config = new
MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR)
- .setStoreType(storeType).build();
+ MetaClientConfig config =
+ new
MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR).setStoreType(storeType).build();
return new LeaderElectionClient(config, participantName);
}
@@ -29,31 +33,31 @@ public class TestLeaderElection extends
ZkMetaClientTestBase {
clt1.joinLeaderElectionParticipantPool(LEADER_PATH);
clt2.joinLeaderElectionParticipantPool(LEADER_PATH);
// First client joining the leader election group should be current leader
- Assert.assertTrue(TestUtil.verify(() -> {
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(LEADER_PATH) != null);
- }, TestUtil.WAIT_DURATION));
+ }, MetaClientTestUtil.WAIT_DURATION));
Assert.assertNotNull(clt1.getLeader(LEADER_PATH));
Assert.assertEquals(clt1.getLeader(LEADER_PATH),
clt2.getLeader(LEADER_PATH));
Assert.assertEquals(clt1.getLeader(LEADER_PATH), PARTICIPANT_NAME1);
// client 1 exit leader election group, and client 2 should be current
leader.
clt1.exitLeaderElectionParticipantPool(LEADER_PATH);
- Assert.assertTrue(TestUtil.verify(() -> {
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(LEADER_PATH) != null);
- }, TestUtil.WAIT_DURATION));
- Assert.assertTrue(TestUtil.verify(() -> {
+ }, MetaClientTestUtil.WAIT_DURATION));
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(LEADER_PATH).equals(PARTICIPANT_NAME2));
- }, TestUtil.WAIT_DURATION));
+ }, MetaClientTestUtil.WAIT_DURATION));
// client1 join and client2 leave. client 1 should be leader.
clt1.joinLeaderElectionParticipantPool(LEADER_PATH);
clt2.exitLeaderElectionParticipantPool(LEADER_PATH);
- Assert.assertTrue(TestUtil.verify(() -> {
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(LEADER_PATH) != null);
- }, TestUtil.WAIT_DURATION));
- Assert.assertTrue(TestUtil.verify(() -> {
+ }, MetaClientTestUtil.WAIT_DURATION));
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(LEADER_PATH).equals(PARTICIPANT_NAME1));
- }, TestUtil.WAIT_DURATION));
+ }, MetaClientTestUtil.WAIT_DURATION));
Assert.assertTrue(clt1.isLeader(LEADER_PATH));
Assert.assertFalse(clt2.isLeader(LEADER_PATH));
@@ -61,4 +65,78 @@ public class TestLeaderElection extends ZkMetaClientTestBase
{
clt2.close();
}
+ @Test
+ public void testElectionPoolMembership() throws Exception {
+ String leaderPath = LEADER_PATH + "/testElectionPoolMembership";
+ LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
+ participantInfo.setSimpleField("Key1", "value1");
+ LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
+ participantInfo2.setSimpleField("Key2", "value2");
+ LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+ LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+ clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
+ try {
+ clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); //
no op
+ } catch (ConcurrentModificationException ex) {
+ // expected
+ Assert.assertEquals(ex.getClass().getName(),
"java.util.ConcurrentModificationException");
+ }
+ clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
+
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt1.getLeader(leaderPath) != null);
+ }, MetaClientTestUtil.WAIT_DURATION));
+ Assert.assertNotNull(clt1.getLeaderEntryStat(leaderPath));
+ Assert.assertNotNull(clt1.getLeader(leaderPath));
+ Assert.assertEquals(clt1.getParticipantInfo(leaderPath,
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+ Assert.assertEquals(clt2.getParticipantInfo(leaderPath,
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+ Assert.assertEquals(clt1.getParticipantInfo(leaderPath,
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+ Assert.assertEquals(clt2.getParticipantInfo(leaderPath,
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+
+ // clt1 gone
+ clt1.relinquishLeader(leaderPath);
+ clt1.exitLeaderElectionParticipantPool(leaderPath);
+ clt2.exitLeaderElectionParticipantPool(leaderPath);
+
+ Assert.assertNull(clt2.getParticipantInfo(LEADER_PATH, PARTICIPANT_NAME2));
+ }
+
+ @Test
+ public void testSessionExpire() throws Exception {
+ String leaderPath = LEADER_PATH + "/testSessionExpire";
+ LeaderInfo participantInfo = new LeaderInfo(PARTICIPANT_NAME1);
+ participantInfo.setSimpleField("Key1", "value1");
+ LeaderInfo participantInfo2 = new LeaderInfo(PARTICIPANT_NAME2);
+ participantInfo2.setSimpleField("Key2", "value2");
+ LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+ LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+ clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo);
+ try {
+ clt1.joinLeaderElectionParticipantPool(leaderPath, participantInfo); //
no op
+ } catch (ConcurrentModificationException ex) {
+ // expected
+ Assert.assertEquals(ex.getClass().getName(),
"java.util.ConcurrentModificationException");
+ }
+ clt2.joinLeaderElectionParticipantPool(leaderPath, participantInfo2);
+ // a leader should be up
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt1.getLeader(leaderPath) != null);
+ }, MetaClientTestUtil.WAIT_DURATION));
+
+ // session expire and reconnect
+ expireSession((ZkMetaClient) clt1.getMetaClient());
+
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt1.getLeader(leaderPath) != null);
+ }, MetaClientTestUtil.WAIT_DURATION));
+ Assert.assertNotNull(clt1.getLeaderEntryStat(leaderPath));
+ Assert.assertNotNull(clt1.getLeader(leaderPath));
+ // after the session is recreated, the participant info nodes should still be present
+ Assert.assertEquals(clt1.getParticipantInfo(leaderPath,
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+ Assert.assertEquals(clt2.getParticipantInfo(leaderPath,
PARTICIPANT_NAME1).getSimpleField("Key1"), "value1");
+ Assert.assertEquals(clt1.getParticipantInfo(leaderPath,
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+ Assert.assertEquals(clt2.getParticipantInfo(leaderPath,
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
+ }
}