Repository: nifi
Updated Branches:
  refs/heads/master b9ef0fb84 -> b73ba7f8d


NIFI-2999: When Cluster Coordinator changes, purge any old heartbeats so that 
we don't disconnect a node due to very old heartbeats

This closes #1210


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b73ba7f8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b73ba7f8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b73ba7f8

Branch: refs/heads/master
Commit: b73ba7f8d4f6319881c26b8faad121ceb12041ab
Parents: b9ef0fb
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Nov 11 11:11:36 2016 -0500
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Fri Nov 11 15:13:49 2016 -0500

----------------------------------------------------------------------
 .../nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java | 5 +++++
 .../heartbeat/ClusterProtocolHeartbeatMonitor.java            | 6 ++++++
 .../coordination/heartbeat/TestAbstractHeartbeatMonitor.java  | 5 +++++
 .../main/java/org/apache/nifi/controller/FlowController.java  | 7 +++++++
 4 files changed, 23 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b73ba7f8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
index 988ba75..6a0937d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
@@ -54,6 +54,11 @@ public interface HeartbeatMonitor {
     void removeHeartbeat(NodeIdentifier nodeId);
 
     /**
+     * Clears all heartbeats that have been received
+     */
+    void purgeHeartbeats();
+
+    /**
      * @return the address that heartbeats should be sent to when this node is elected coordinator.
      */
     String getHeartbeatAddress();

http://git-wip-us.apache.org/repos/asf/nifi/blob/b73ba7f8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index 6a8e575..3e98368 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -133,6 +133,12 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im
     }
 
     @Override
+    public synchronized void purgeHeartbeats() {
+        logger.debug("Purging old heartbeats");
+        heartbeatMessages.clear();
+    }
+
+    @Override
     public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
         switch (msg.getType()) {
             case HEARTBEAT:

http://git-wip-us.apache.org/repos/asf/nifi/blob/b73ba7f8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 690cda8..6610231 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -369,6 +369,11 @@ public class TestAbstractHeartbeatMonitor {
             heartbeats.remove(nodeId);
         }
 
+        @Override
+        public synchronized void purgeHeartbeats() {
+            heartbeats.clear();
+        }
+
         void waitForProcessed() throws InterruptedException {
             synchronized (mutex) {
                 mutex.wait();

http://git-wip-us.apache.org/repos/asf/nifi/blob/b73ba7f8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index ec8b288..b790526 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -3350,6 +3350,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             @Override
             public synchronized void onLeaderElection() {
                 LOG.info("This node elected Active Cluster Coordinator");
+
+                // Purge any heartbeats that we already have. If we don't do this, we can have a scenario where we receive heartbeats
+                // from a node, and then another node becomes Cluster Coordinator. As a result, we stop receiving heartbeats. Now that
+                // we are again the Cluster Coordinator, we will detect that there are old heartbeat messages and start disconnecting
+                // nodes due to a lack of heartbeat. By purging the heartbeats here, we remove any old heartbeat messages so that this
+                // does not happen.
+                FlowController.this.heartbeatMonitor.purgeHeartbeats();
             }
         }, participantId);
     }

Reply via email to