This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new c6890faa1 Metaclient - Leader election - leader change event
notification (#2560)
c6890faa1 is described below
commit c6890faa13acee0ce9ec51a1fde70eebb3ee48e5
Author: xyuanlu <[email protected]>
AuthorDate: Sat Jul 22 22:39:37 2023 -0700
Metaclient - Leader election - leader change event notification (#2560)
Co-authored-by: Xiaoyuan Lu <[email protected]>
---
.../impl/zk/adapter/DataListenerAdapter.java | 1 +
.../leaderelection/LeaderElectionClient.java | 4 +-
.../LeaderElectionListenerInterface.java | 15 ++-
.../LeaderElectionListenerInterfaceAdapter.java | 43 +++++++
.../zk/TestConnectStateChangeListenerAndRetry.java | 3 +-
.../recipes/leaderelection/TestLeaderElection.java | 135 ++++++++++++++++++++-
.../helix/zookeeper/zkclient/ZkConnection.java | 1 -
7 files changed, 193 insertions(+), 9 deletions(-)
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java
index 94ae198ce..748b6ed3f 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/DataListenerAdapter.java
@@ -21,6 +21,7 @@ package org.apache.helix.metaclient.impl.zk.adapter;
import org.apache.helix.metaclient.api.DataChangeListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.annotation.PreFetchChangedData;
import org.apache.zookeeper.Watcher;
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
index 39233e979..7b13778c0 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
@@ -334,7 +334,8 @@ public class LeaderElectionClient implements AutoCloseable {
* @return A boolean value indicating if registration is success.
*/
public boolean subscribeLeadershipChanges(String leaderPath,
LeaderElectionListenerInterface listener) {
- //TODO: add converter class for LeaderElectionListenerInterface
+ _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, new
LeaderElectionListenerInterfaceAdapter(listener),
+ false);
return false;
}
@@ -343,6 +344,7 @@ public class LeaderElectionClient implements AutoCloseable {
* @param listener An implementation of LeaderElectionListenerInterface
*/
public void unsubscribeLeadershipChanges(String leaderPath,
LeaderElectionListenerInterface listener) {
+ _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, new
LeaderElectionListenerInterfaceAdapter(listener));
}
@Override
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterface.java
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterface.java
index 0436e1eb0..230fc2af1 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterface.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterface.java
@@ -24,9 +24,16 @@ package org.apache.helix.metaclient.recipes.leaderelection;
* leader node is deleted.
*/
public interface LeaderElectionListenerInterface {
+ enum ChangeType {
+ LEADER_ACQUIRED,
+ LEADER_LOST
+ }
+
// When new leader is elected:
- // noLeader (null) -> has
leader (new leader name)
- // When existing leader not leader anymore:
- // has Leader (prevleader name) -> no
leader (null)
- public void onLeadershipChange(String leaderPath, String prevLeader, String
curLeader);
+ // ChangeType == LEADER_ACQUIRED, curLeader is
the new leader name
+ // When no leader anymore:
+ // ChangeType == LEADER_LOST, curLeader is an empty
string
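+ // In the ZK implementation, the notification does not include the changed data
and metaclient fetches
+ // the entry when the event arrives, so it is possible that curLeader reflects a later state than the change that triggered the event (TODO confirm).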
+ public void onLeadershipChange(String leaderPath, ChangeType type, String
curLeader);
}
\ No newline at end of file
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java
new file mode 100644
index 000000000..5c64d6790
--- /dev/null
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionListenerInterfaceAdapter.java
@@ -0,0 +1,43 @@
+package org.apache.helix.metaclient.recipes.leaderelection;
+
+import org.apache.helix.metaclient.api.DataChangeListener;
+
+import static
org.apache.helix.metaclient.recipes.leaderelection.LeaderElectionListenerInterface.ChangeType.*;
+
+
+public class LeaderElectionListenerInterfaceAdapter implements
DataChangeListener {
+ private final LeaderElectionListenerInterface _leaderElectionListener;
+
+ public
LeaderElectionListenerInterfaceAdapter(LeaderElectionListenerInterface
leaderElectionListener) {
+ _leaderElectionListener = leaderElectionListener;
+ }
+
+ @Override
+ public void handleDataChange(String key, Object data, ChangeType changeType)
throws Exception {
+ switch (changeType) {
+ case ENTRY_CREATED:
+ String newLeader = ((LeaderInfo) data).getLeaderName();
+ _leaderElectionListener.onLeadershipChange(key, LEADER_ACQUIRED,
newLeader);
+ break;
+ case ENTRY_DELETED:
+ _leaderElectionListener.onLeadershipChange(key, LEADER_LOST, "");
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LeaderElectionListenerInterfaceAdapter that =
(LeaderElectionListenerInterfaceAdapter) o;
+ return _leaderElectionListener.equals(that._leaderElectionListener);
+ }
+
+ @Override
+ public int hashCode() {
+ return _leaderElectionListener.hashCode();
+ }
+}
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
index 36b9b2131..c74b7d7ef 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
@@ -162,7 +162,8 @@ public class TestConnectStateChangeListenerAndRetry {
zkMetaClient.create("/key", "value");
Assert.fail("Create call after close should throw
IllegalStateException");
} catch (Exception ex) {
- Assert.assertTrue(ex.getCause() instanceof IllegalStateException);
+ System.out.println("ex " + ex);
+ Assert.assertTrue(ex instanceof IllegalStateException);
}
}
System.out.println("END
TestConnectStateChangeListenerAndRetry.testConnectStateChangeListener at " +
new Date(System.currentTimeMillis()));
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
index 412ecaf8b..b0b396c1a 100644
---
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
@@ -2,6 +2,8 @@ package org.apache.helix.metaclient.recipes.leaderelection;
import java.util.ConcurrentModificationException;
import org.apache.helix.metaclient.MetaClientTestUtil;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.helix.metaclient.factories.MetaClientConfig;
import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
@@ -26,12 +28,14 @@ public class TestLeaderElection extends
ZkMetaClientTestBase {
@Test
public void testAcquireLeadership() throws Exception {
+ String leaderPath = LEADER_PATH + "testAcquireLeadership";
+
// create 2 clients representing 2 participants
LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
- clt1.joinLeaderElectionParticipantPool(LEADER_PATH);
- clt2.joinLeaderElectionParticipantPool(LEADER_PATH);
+ clt1.joinLeaderElectionParticipantPool(leaderPath);
+ clt2.joinLeaderElectionParticipantPool(leaderPath);
// First client joining the leader election group should be current leader
Assert.assertTrue(MetaClientTestUtil.verify(() -> {
return (clt1.getLeader(LEADER_PATH) != null);
@@ -139,4 +143,131 @@ public class TestLeaderElection extends
ZkMetaClientTestBase {
Assert.assertEquals(clt1.getParticipantInfo(leaderPath,
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
Assert.assertEquals(clt2.getParticipantInfo(leaderPath,
PARTICIPANT_NAME2).getSimpleField("Key2"), "value2");
}
+ @Test (dependsOnMethods = "testAcquireLeadership")
+ public void testLeadershipListener() throws Exception {
+ String leaderPath = LEADER_PATH + "testLeadershipListener";
+ // create 2 clients representing 2 participants
+ LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+ LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+ LeaderElectionClient clt3 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+ final int count = 10;
+ final int[] numNewLeaderEvent = {0};
+ final int[] numLeaderGoneEvent = {0};
+ CountDownLatch countDownLatchNewLeader = new CountDownLatch(count*2);
+ CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count*2);
+
+ LeaderElectionListenerInterface listener = new
LeaderElectionListenerInterface() {
+
+ @Override
+ public void onLeadershipChange(String leaderPath, ChangeType type,
String curLeader) {
+ if (type == ChangeType.LEADER_LOST) {
+ countDownLatchLeaderGone.countDown();
+ Assert.assertEquals(curLeader.length(), 0);
+ numLeaderGoneEvent[0]++;
+ } else if (type == ChangeType.LEADER_ACQUIRED) {
+ countDownLatchNewLeader.countDown();
+ numNewLeaderEvent[0]++;
+ Assert.assertTrue(curLeader.length()!=0);
+ } else {
+ Assert.fail();
+ }
+ }
+ };
+
+ clt3.subscribeLeadershipChanges(leaderPath, listener);
+
+ // each iteration produces four events: participant_1 becomes leader, leader gone,
participant_2 becomes leader, leader gone
+ for (int i=0; i<count; ++i) {
+ joinPoolTestHelper(leaderPath, clt1, clt2);
+ Thread.sleep(1000);
+ }
+
+
Assert.assertTrue(countDownLatchNewLeader.await(MetaClientTestUtil.WAIT_DURATION,
TimeUnit.MILLISECONDS));
+
Assert.assertTrue(countDownLatchLeaderGone.await(MetaClientTestUtil.WAIT_DURATION,
TimeUnit.MILLISECONDS));
+ Assert.assertEquals(numLeaderGoneEvent[0], count*2);
+ Assert.assertEquals(numNewLeaderEvent[0], count*2);
+
+ clt3.unsubscribeLeadershipChanges(leaderPath, listener);
+
+ // listener shouldn't receive any event after unsubscribe
+ joinPoolTestHelper(leaderPath, clt1, clt2);
+ Assert.assertEquals(numLeaderGoneEvent[0], count*2);
+ Assert.assertEquals(numNewLeaderEvent[0], count*2);
+
+ clt1.close();
+ clt2.close();
+ clt3.close();
+ }
+
+ private void joinPoolTestHelper(String leaderPath, LeaderElectionClient
clt1, LeaderElectionClient clt2) throws Exception {
+ clt1.joinLeaderElectionParticipantPool(leaderPath);
+ clt2.joinLeaderElectionParticipantPool(leaderPath);
+
+ Thread.sleep(2000);
+
+ // clt1 gone
+ clt1.exitLeaderElectionParticipantPool(leaderPath);
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt1.getLeader(leaderPath) != null);
+ }, MetaClientTestUtil.WAIT_DURATION));
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt1.getLeader(leaderPath).equals(PARTICIPANT_NAME2));
+ }, MetaClientTestUtil.WAIT_DURATION));
+
+ clt2.exitLeaderElectionParticipantPool(leaderPath);
+
+ }
+
+ @Test (dependsOnMethods = "testLeadershipListener")
+ public void testRelinquishLeadership() throws Exception {
+ String leaderPath = LEADER_PATH + "testRelinquishLeadership";
+ LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+ LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+ LeaderElectionClient clt3 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+
+ final int count = 1;
+ CountDownLatch countDownLatchNewLeader = new CountDownLatch(count);
+ CountDownLatch countDownLatchLeaderGone = new CountDownLatch(count);
+
+ LeaderElectionListenerInterface listener = new
LeaderElectionListenerInterface() {
+
+ @Override
+ public void onLeadershipChange(String leaderPath, ChangeType type,
String curLeader) {
+ if (type == ChangeType.LEADER_LOST) {
+ countDownLatchLeaderGone.countDown();
+ Assert.assertEquals(curLeader.length(), 0);
+ } else if (type == ChangeType.LEADER_ACQUIRED) {
+ countDownLatchNewLeader.countDown();
+ Assert.assertTrue(curLeader.length()!=0);
+ } else {
+ Assert.fail();
+ }
+ }
+ };
+
+ clt1.joinLeaderElectionParticipantPool(leaderPath);
+ clt2.joinLeaderElectionParticipantPool(leaderPath);
+ clt3.subscribeLeadershipChanges(leaderPath, listener);
+ // clt1 relinquishes leadership
+ clt1.relinquishLeader(leaderPath);
+
+ // participant 1 should no longer be leader, and a leader-gone event should be sent
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt1.getLeader(leaderPath) != null);
+ }, MetaClientTestUtil.WAIT_DURATION));
+
Assert.assertTrue(countDownLatchLeaderGone.await(MetaClientTestUtil.WAIT_DURATION,
TimeUnit.MILLISECONDS));
+
+ clt1.exitLeaderElectionParticipantPool(leaderPath);
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt1.getLeader(leaderPath) != null);
+ }, MetaClientTestUtil.WAIT_DURATION));
+ Assert.assertTrue(MetaClientTestUtil.verify(() -> {
+ return (clt1.getLeader(leaderPath).equals(PARTICIPANT_NAME2));
+ }, MetaClientTestUtil.WAIT_DURATION));
+
+ clt2.exitLeaderElectionParticipantPool(leaderPath);
+ }
+
}
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
index 589425462..376409231 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
@@ -260,7 +260,6 @@ public class ZkConnection implements IZkConnection {
private void lookupGetChildrenMethod() {
_getChildrenMethod = doLookUpGetChildrenMethod();
- System.out.println("
ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED " +
GETCHILDREN_PAGINATION_DISABLED);
LOG.info("Pagination config {}={}, method to be invoked: {}",
ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED,
GETCHILDREN_PAGINATION_DISABLED,
_getChildrenMethod.getName());