Repository: nifi Updated Branches: refs/heads/master b9ef0fb84 -> b73ba7f8d
NIFI-2999: When Cluster Coordinator changes, purge any old heartbeats so that we don't disconnect a node due to very old heartbeats This closes #1210 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b73ba7f8 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b73ba7f8 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b73ba7f8 Branch: refs/heads/master Commit: b73ba7f8d4f6319881c26b8faad121ceb12041ab Parents: b9ef0fb Author: Mark Payne <marka...@hotmail.com> Authored: Fri Nov 11 11:11:36 2016 -0500 Committer: Oleg Zhurakousky <o...@suitcase.io> Committed: Fri Nov 11 15:13:49 2016 -0500 ---------------------------------------------------------------------- .../nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java | 5 +++++ .../heartbeat/ClusterProtocolHeartbeatMonitor.java | 6 ++++++ .../coordination/heartbeat/TestAbstractHeartbeatMonitor.java | 5 +++++ .../main/java/org/apache/nifi/controller/FlowController.java | 7 +++++++ 4 files changed, 23 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b73ba7f8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java index 988ba75..6a0937d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java @@ -54,6 +54,11 @@ public interface HeartbeatMonitor { void removeHeartbeat(NodeIdentifier nodeId); /** + * Clears all heartbeats that have been received + */ + void purgeHeartbeats(); + + /** * @return the address that heartbeats should be sent to when this node is elected coordinator. */ String getHeartbeatAddress(); http://git-wip-us.apache.org/repos/asf/nifi/blob/b73ba7f8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java index 6a8e575..3e98368 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java @@ -133,6 +133,12 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im } @Override + public synchronized void purgeHeartbeats() { + logger.debug("Purging old heartbeats"); + heartbeatMessages.clear(); + } + + @Override public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException { switch (msg.getType()) { case HEARTBEAT: http://git-wip-us.apache.org/repos/asf/nifi/blob/b73ba7f8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index 690cda8..6610231 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -369,6 +369,11 @@ public class TestAbstractHeartbeatMonitor { heartbeats.remove(nodeId); } + @Override + public synchronized void purgeHeartbeats() { + heartbeats.clear(); + } + void waitForProcessed() throws InterruptedException { synchronized (mutex) { mutex.wait(); http://git-wip-us.apache.org/repos/asf/nifi/blob/b73ba7f8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index ec8b288..b790526 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3350,6 +3350,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public synchronized void onLeaderElection() { LOG.info("This node elected Active Cluster Coordinator"); + + // Purge any heartbeats that we already have. If we don't do this, we can have a scenario where we receive heartbeats + // from a node, and then another node becomes Cluster Coordinator. As a result, we stop receiving heartbeats. Now that + // we are again the Cluster Coordinator, we will detect that there are old heartbeat messages and start disconnecting + // nodes due to a lack of heartbeat. By purging the heartbeats here, we remove any old heartbeat messages so that this + // does not happen. + FlowController.this.heartbeatMonitor.purgeHeartbeats(); } }, participantId); }