Repository: nifi
Updated Branches:
  refs/heads/master cfc8a9613 -> 70d70732b


NIFI-2431: Before registering for the Cluster Coordinator role, check if 
another node already has this role. If so, do not register for this role until 
after the node has joined the cluster and inherited the flow.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bc0b5fe1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bc0b5fe1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bc0b5fe1

Branch: refs/heads/master
Commit: bc0b5fe154c8473220c5a119434e79ca279be2c3
Parents: cfc8a96
Author: Mark Payne <[email protected]>
Authored: Fri Jul 29 09:14:17 2016 -0400
Committer: joewitt <[email protected]>
Committed: Fri Jul 29 09:42:38 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/controller/FlowController.java  | 44 ++++++++++++++------
 .../nifi/controller/StandardFlowService.java    |  4 +-
 .../election/CuratorLeaderElectionManager.java  |  3 +-
 3 files changed, 36 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bc0b5fe1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2649895..9ca8e30 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -571,7 +571,23 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         if (configuredForClustering) {
             leaderElectionManager = new CuratorLeaderElectionManager(4);
             heartbeater = new ClusterProtocolHeartbeater(protocolSender, 
properties);
-            registerForClusterCoordinator();
+
+            // Check if there is already a cluster coordinator elected. If 
not, go ahead
+            // and register for coordinator role. If there is already one 
elected, do not register until
+            // we have connected to the cluster. This allows us to avoid 
becoming the coordinator with a
+            // flow that is different from the rest of the cluster (especially 
an empty flow) and then
+            // kicking everyone out. This way, we instead inherit the cluster 
flow before we attempt to be
+            // the coordinator.
+            LOG.info("Checking if there is already a Cluster Coordinator 
Elected...");
+            final NodeIdentifier electedCoordinatorNodeId = 
clusterCoordinator.getElectedActiveCoordinatorNode();
+            if (electedCoordinatorNodeId == null) {
+                LOG.info("It appears that no Cluster Coordinator has been 
Elected yet. Registering for Cluster Coordinator Role.");
+                registerForClusterCoordinator();
+            } else {
+                LOG.info("The Elected Cluster Coordinator is {}. Will not 
register to be elected for this role until after connecting "
+                    + "to the cluster and inheriting the cluster's flow.", 
electedCoordinatorNodeId);
+            }
+
             leaderElectionManager.start();
         } else {
             leaderElectionManager = null;
@@ -3279,6 +3295,20 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         });
     }
 
+    private void registerForPrimaryNode() {
+        leaderElectionManager.register(ClusterRoles.PRIMARY_NODE, new 
LeaderElectionStateChangeListener() {
+            @Override
+            public void onLeaderElection() {
+                setPrimary(true);
+            }
+
+            @Override
+            public void onLeaderRelinquish() {
+                setPrimary(false);
+            }
+        });
+    }
+
     /**
      * Sets whether this instance is clustered. Clustered means that a node is 
either connected or trying to connect to the cluster.
      *
@@ -3315,17 +3345,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             // update the bulletin repository
             if (isChanging) {
                 if (clustered) {
-                    leaderElectionManager.register(ClusterRoles.PRIMARY_NODE, 
new LeaderElectionStateChangeListener() {
-                        @Override
-                        public void onLeaderElection() {
-                            setPrimary(true);
-                        }
-
-                        @Override
-                        public void onLeaderRelinquish() {
-                            setPrimary(false);
-                        }
-                    });
+                    registerForPrimaryNode();
 
                     // Participate in Leader Election for Heartbeat Monitor. 
Start the heartbeat monitor
                     // if/when we become leader and stop it when we lose 
leader role

http://git-wip-us.apache.org/repos/asf/nifi/blob/bc0b5fe1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 71d66b5..1b26e40 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -772,12 +772,12 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                 try {
                     response = 
senderListener.requestConnection(requestMsg).getConnectionResponse();
                     if (response.getRejectionReason() != null) {
-                        logger.warn("Connection request was blocked by cluster 
manager with the explanation: " + response.getRejectionReason());
+                        logger.warn("Connection request was blocked by cluster 
coordinator with the explanation: " + response.getRejectionReason());
                         // set response to null and treat a firewall blockage 
the same as getting no response from manager
                         response = null;
                         break;
                     } else if (response.shouldTryLater()) {
-                        logger.info("Flow controller requested by cluster 
manager to retry connection in " + response.getTryLaterSeconds() + " seconds.");
+                        logger.info("Flow controller requested by cluster 
coordinator to retry connection in " + response.getTryLaterSeconds() + " 
seconds.");
                         try {
                             Thread.sleep(response.getTryLaterSeconds() * 1000);
                         } catch (final InterruptedException ie) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/bc0b5fe1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
index bea643f..19fa225 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -90,7 +90,7 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
         logger.debug("{} Registering new Leader Selector for role {}", this, 
roleName);
 
         if (leaderRoles.containsKey(roleName)) {
-            logger.warn("{} Attempted to register Leader Election for role 
'{}' but this role is already registered", this, roleName);
+            logger.info("{} Attempted to register Leader Election for role 
'{}' but this role is already registered", this, roleName);
             return;
         }
 
@@ -130,6 +130,7 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
         }
 
         leaderSelector.close();
+        logger.info("This node is no longer registered to be elected as the 
Leader for Role '{}'", roleName);
     }
 
     @Override

Reply via email to