Repository: nifi Updated Branches: refs/heads/master 262bf011e -> 54eb6bc23
NIFI-5096: Periodically poll ZooKeeper to determine the leader for each registered role in Leader Election. This avoids a condition whereby a node may occasionally fail to receive notification that it is no longer the elected leader. NIFI-5096: More proactively setting leadership to false when ZooKeeper/Curator ConnectionState changes This closes #2646 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/54eb6bc2 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/54eb6bc2 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/54eb6bc2 Branch: refs/heads/master Commit: 54eb6bc23211ad2b499f42e14759f3646f806d2f Parents: 262bf01 Author: Mark Payne <marka...@hotmail.com> Authored: Thu Apr 19 09:05:32 2018 -0400 Committer: Matt Gilman <matt.c.gil...@gmail.com> Committed: Fri Apr 20 16:51:02 2018 -0400 ---------------------------------------------------------------------- .../election/CuratorLeaderElectionManager.java | 67 ++++++++++++++++++-- 1 file changed, 62 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/54eb6bc2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java index 54ca257..d51c79f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.controller.leader.election; -import java.util.HashMap; -import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -36,6 +34,9 @@ import org.apache.zookeeper.common.PathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; + public class CuratorLeaderElectionManager implements LeaderElectionManager { private static final Logger logger = LoggerFactory.getLogger(CuratorLeaderElectionManager.class); @@ -112,7 +113,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { final boolean isParticipant = participantId != null && !participantId.trim().isEmpty(); if (!isStopped()) { - final ElectionListener electionListener = new ElectionListener(roleName, listener); + final ElectionListener electionListener = new ElectionListener(roleName, listener, participantId); final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener); if (isParticipant) { leaderSelector.autoRequeue(); @@ -358,12 +359,14 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { private final String roleName; private final LeaderElectionStateChangeListener listener; + private final String participantId; private volatile boolean leader; - public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener) { + public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) { this.roleName = roleName; this.listener = listener; + this.participantId = participantId; } public boolean isLeader() { @@ -373,9 +376,37 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { @Override public void stateChanged(final CuratorFramework client, final ConnectionState newState) { logger.info("{} Connection State changed to {}", this, newState.name()); + + if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) { + if (leader == true) { + logger.info("Because Connection State was changed to {}, will relinquish leadership for role '{}'", newState, roleName); + } + + leader = false; + } + super.stateChanged(client, newState); } + /** + * Reach out to ZooKeeper to verify that this node still is the leader. We do this because at times, a node will lose + * its position as leader but the Curator client will fail to notify us, perhaps due to network failure, etc. + * + * @return <code>true</code> if this node is still the elected leader according to ZooKeeper, false otherwise + */ + private boolean verifyLeader() { + final String leader = getLeader(roleName); + if (leader == null) { + logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}' but found that there is no leader.", roleName); + return false; + } + + final boolean match = leader.equals(participantId); + logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}'. Elected Leader = '{}', Participant ID = '{}', This Node Elected = {}", + roleName, leader, participantId, match); + return match; + } + @Override public void takeLeadership(final CuratorFramework client) throws Exception { leader = true; @@ -396,7 +427,9 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { // Curator API states that we lose the leadership election when we return from this method, // so we will block as long as we are not interrupted or closed. Then, we will set leader to false. try { - while (!isStopped()) { + int failureCount = 0; + int loopCount = 0; + while (!isStopped() && leader) { try { Thread.sleep(100L); } catch (final InterruptedException ie) { @@ -404,6 +437,30 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { Thread.currentThread().interrupt(); return; } + + if (leader && ++loopCount % 50 == 0) { + // While Curator is supposed to interrupt this thread when we are no longer the leader, we have occasionally + // seen occurrences where the thread does not get interrupted. As a result, we will reach out to ZooKeeper + // periodically to determine whether or not this node is still the elected leader. + try { + final boolean stillLeader = verifyLeader(); + failureCount = 0; // we got a response, so we were successful in communicating with zookeeper. Set failureCount back to 0. + + if (!stillLeader) { + logger.info("According to ZooKeeper, this node is no longer the leader for Role '{}'. Will relinquish leadership.", roleName); + break; + } + } catch (final Exception e) { + failureCount++; + if (failureCount > 1) { + logger.warn("Attempted to reach out to ZooKeeper to verify that this node still is the elected leader for Role '{}' " + + "but failed to communicate with ZooKeeper. This is the second failed attempt, so will relinquish leadership of this role.", roleName, e); + } else { + logger.warn("Attempted to reach out to ZooKeeper to verify that this node still is the elected leader for Role '{}' " + + "but failed to communicate with ZooKeeper. Will wait a bit and attempt to communicate with ZooKeeper again before relinquishing this role.", roleName, e); + } + } + } } } finally { leader = false;