NIFI-2566: Refactored to allow just the Leader Election Manager to be responsible for determining who is the Cluster Coordinator
NIFI-2566: Removed storage of cluster roles from heartbeats and NodeConnectionStatus; use LeaderElectionManager to determine roles instead NIFI-2566: Updated Heartbeats so that if a node is out-of-sync with cluster topology, cluster coordinator will provide updated information back to the nodes NIFI-2566: Fixed issue that prevented standalone instance from starting by creating a standalone-instance version of the Leader Election Manager. Also added Controller Service enabled/disabled state to fingerprint rather than attempting to update the state when joining the cluster, as the implementation was incorrect and the correct implementation will be a rather significant effort that doesn't have to happen for 1.0.0 release This closes #866 Signed-off-by: jpercivall <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e42ea9ad Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e42ea9ad Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e42ea9ad Branch: refs/heads/master Commit: e42ea9ad457c5b5a1d2da5fa3d3494aacb0cb8d4 Parents: b3f3648 Author: Mark Payne <[email protected]> Authored: Sat Aug 13 19:38:07 2016 -0400 Committer: jpercivall <[email protected]> Committed: Tue Aug 16 20:39:04 2016 -0400 ---------------------------------------------------------------------- .../coordination/ClusterCoordinator.java | 44 ++- .../heartbeat/HeartbeatMonitor.java | 5 + .../coordination/heartbeat/NodeHeartbeat.java | 7 - .../cluster/coordination/node/ClusterRoles.java | 10 + .../coordination/node/NodeConnectionStatus.java | 36 +- .../protocol/AbstractNodeProtocolSender.java | 24 +- .../apache/nifi/cluster/protocol/Heartbeat.java | 11 +- .../nifi/cluster/protocol/HeartbeatPayload.java | 128 ++++++++ .../cluster/protocol/NodeProtocolSender.java | 5 +- .../impl/NodeProtocolSenderListener.java | 5 +- .../protocol/jaxb/message/AdaptedHeartbeat.java | 11 - .../message/AdaptedNodeConnectionStatus.java | 11 - .../protocol/jaxb/message/HeartbeatAdapter.java | 5 +- .../message/NodeConnectionStatusAdapter.java | 4 +- .../protocol/jaxb/message/ObjectFactory.java | 4 + .../message/HeartbeatResponseMessage.java | 45 +++ .../protocol/message/ProtocolMessage.java | 1 + .../resources/nifi-cluster-protocol-context.xml | 7 + .../jaxb/message/TestJaxbProtocolUtils.java | 28 ++ .../heartbeat/AbstractHeartbeatMonitor.java | 2 - .../ClusterProtocolHeartbeatMonitor.java | 137 +++----- .../heartbeat/StandardNodeHeartbeat.java | 18 +- .../node/CuratorNodeProtocolSender.java | 124 ------- .../node/LeaderElectionNodeProtocolSender.java | 80 +++++ .../node/NodeClusterCoordinator.java | 325 +++++------------- .../NodeClusterCoordinatorFactoryBean.java | 4 +- .../resources/nifi-cluster-manager-context.xml | 6 + .../heartbeat/TestAbstractHeartbeatMonitor.java | 31 +- .../TestThreadPoolRequestReplicator.java | 5 +- .../node/TestNodeClusterCoordinator.java | 105 ++---- .../nifi/cluster/integration/Cluster.java | 34 +- .../integration/ClusterConnectionIT.java | 327 +++++++++---------- .../nifi/cluster/integration/ClusterUtils.java | 10 +- .../apache/nifi/cluster/integration/Node.java | 51 ++- .../apache/nifi/cluster/HeartbeatPayload.java | 118 ------- .../apache/nifi/controller/FlowController.java | 46 ++- .../nifi/controller/StandardFlowService.java | 5 +- .../controller/StandardFlowSynchronizer.java | 83 ++--- .../cluster/ClusterProtocolHeartbeater.java | 107 +++--- .../election/CuratorLeaderElectionManager.java | 66 +++- .../leader/election/LeaderElectionManager.java | 24 +- .../StandaloneLeaderElectionManager.java | 65 ++++ .../nifi/fingerprint/FingerprintFactory.java | 1 + .../nifi/spring/FlowControllerFactoryBean.java | 14 +- .../LeaderElectionManagerFactoryBean.java | 57 ++++ .../src/main/resources/nifi-context.xml | 1 + .../nifi/cluster/HeartbeatPayloadTest.java | 1 + .../java/org/apache/nifi/nar/NarCloseable.java | 29 ++ .../nifi/web/StandardNiFiServiceFacade.java | 42 ++- .../src/main/resources/nifi-web-api-context.xml | 1 + 50 files changed, 1170 insertions(+), 1140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java index 4894fc5..49c6142 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java @@ -107,13 +107,20 @@ public interface ClusterCoordinator { Set<NodeIdentifier> getNodeIdentifiers(NodeConnectionState... states); /** - * Returns a Map of NodeConnectionStatus to all Node Identifiers that have that status. + * Returns a Map of NodeConnectionStates to all Node Identifiers that have that state. * - * @return the NodeConnectionStatus for each Node in the cluster, grouped by the Connection Status + * @return the NodeConnectionState for each Node in the cluster, grouped by the Connection State */ Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates(); /** + * Returns a List of the NodeConnectionStatus for each node in the cluster + * + * @return a List of the NodeConnectionStatus for each node in the cluster + */ + List<NodeConnectionStatus> getConnectionStatuses(); + + /** * Checks if the given hostname is blocked by the configured firewall, returning * <code>true</code> if the node is blocked, <code>false</code> if the node is * allowed through the firewall or if there is no firewall configured @@ -135,14 +142,6 @@ public interface ClusterCoordinator { void reportEvent(NodeIdentifier nodeId, Severity severity, String event); /** - * Updates the roles held by the given node - * - * @param nodeId the id of the node to update - * @param roles the new roles that the node possesses - */ - void updateNodeRoles(NodeIdentifier nodeId, Set<String> roles); - - /** * Returns the NodeIdentifier that exists that has the given UUID, or <code>null</code> if no NodeIdentifier * exists for the given UUID * @@ -197,6 +196,17 @@ public interface ClusterCoordinator { void resetNodeStatuses(Map<NodeIdentifier, NodeConnectionStatus> statusMap); /** + * Resets the status of the node to be in accordance with the given NodeConnectionStatus if and only if the + * currently held status for this node has an Update ID equal to the given <code>qualifyingUpdateId</code> + * + * @param connectionStatus the new status of the node + * @param qualifyingUpdateId the Update ID to compare the current ID with. If the current ID for the node described by the provided + * NodeConnectionStatus is not equal to this value, the value will not be updated + * @return <code>true</code> if the node status was updated, <code>false</code> if the <code>qualifyingUpdateId</code> is out of date. + */ + boolean resetNodeStatus(NodeConnectionStatus connectionStatus, long qualifyingUpdateId); + + /** * Notifies the Cluster Coordinator of the Node Identifier that the coordinator is currently running on * * @param nodeId the ID of the current node @@ -216,18 +226,4 @@ public interface ClusterCoordinator { * @return <code>true</code> if connected, <code>false</code> otherwise */ boolean isConnected(); - - /** - * Notifies the cluster coordinator that this node has been granted the given role - * - * @param clusterRole the role that this node has been granted - */ - void addRole(String clusterRole); - - /** - * Notifies the cluster coordinator that this node is no longer responsible for the given role - * - * @param clusterRole the role that this node is no longer responsible for - */ - void removeRole(String clusterRole); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java index c151382..988ba75 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java @@ -52,4 +52,9 @@ public interface HeartbeatMonitor { * @param nodeId the id of the node whose heartbeat should be removed */ void removeHeartbeat(NodeIdentifier nodeId); + + /** + * @return the address that heartbeats should be sent to when this node is elected coordinator. + */ + String getHeartbeatAddress(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java index 2ddda79..c4413a2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java @@ -17,8 +17,6 @@ package org.apache.nifi.cluster.coordination.heartbeat; -import java.util.Set; - import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.NodeIdentifier; @@ -40,11 +38,6 @@ public interface NodeHeartbeat { NodeConnectionStatus getConnectionStatus(); /** - * @return the set of Roles that the node currently possesses. - */ - Set<String> getRoles(); - - /** * @return the number of FlowFiles that are queued up on the node */ int getFlowFileCount(); http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterRoles.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterRoles.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterRoles.java index 611faa4..1425c9d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterRoles.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterRoles.java @@ -17,9 +17,19 @@ package org.apache.nifi.cluster.coordination.node; +import java.util.HashSet; +import java.util.Set; + public class ClusterRoles { public static final String PRIMARY_NODE = "Primary Node"; public static final String CLUSTER_COORDINATOR = "Cluster Coordinator"; + + public static Set<String> getAllRoles() { + final Set<String> roles = new HashSet<>(); + roles.add(PRIMARY_NODE); + roles.add(CLUSTER_COORDINATOR); + return roles; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java index 23e509d..34bd127 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java @@ -17,10 +17,7 @@ package org.apache.nifi.cluster.coordination.node; -import java.util.Collections; -import java.util.HashSet; import java.util.Objects; -import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; @@ -41,40 +38,38 @@ public class NodeConnectionStatus { private final DisconnectionCode disconnectCode; private final String disconnectReason; private final Long connectionRequestTime; - private final Set<String> roles; - public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final Set<String> roles) { - this(nodeId, state, null, null, null, roles); + public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state) { + this(nodeId, state, null, null, null); } public NodeConnectionStatus(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode) { - this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionCode.name(), null, null); + this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionCode.toString(), null); } public NodeConnectionStatus(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String disconnectionExplanation) { - this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionExplanation, null, null); + this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, disconnectionExplanation, null); } - public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final DisconnectionCode disconnectionCode, final Set<String> roles) { - this(nodeId, state, disconnectionCode, disconnectionCode.name(), null, roles); + public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final DisconnectionCode disconnectionCode) { + this(nodeId, state, disconnectionCode, disconnectionCode == null ? null : disconnectionCode.toString(), null); } - public NodeConnectionStatus(final NodeConnectionStatus status, final Set<String> roles) { - this(status.getNodeIdentifier(), status.getState(), status.getDisconnectCode(), status.getDisconnectReason(), status.getConnectionRequestTime(), roles); + public NodeConnectionStatus(final NodeConnectionStatus status) { + this(status.getNodeIdentifier(), status.getState(), status.getDisconnectCode(), status.getDisconnectReason(), status.getConnectionRequestTime()); } public NodeConnectionStatus(final NodeIdentifier nodeId, final NodeConnectionState state, final DisconnectionCode disconnectCode, - final String disconnectReason, final Long connectionRequestTime, final Set<String> roles) { - this(idGenerator.getAndIncrement(), nodeId, state, disconnectCode, disconnectReason, connectionRequestTime, roles); + final String disconnectReason, final Long connectionRequestTime) { + this(idGenerator.getAndIncrement(), nodeId, state, disconnectCode, disconnectReason, connectionRequestTime); } public NodeConnectionStatus(final long updateId, final NodeIdentifier nodeId, final NodeConnectionState state, final DisconnectionCode disconnectCode, - final String disconnectReason, final Long connectionRequestTime, final Set<String> roles) { + final String disconnectReason, final Long connectionRequestTime) { this.updateId = updateId; this.nodeId = nodeId; this.state = state; - this.roles = roles == null ? Collections.emptySet() : Collections.unmodifiableSet(new HashSet<>(roles)); if (state == NodeConnectionState.DISCONNECTED && disconnectCode == null) { this.disconnectCode = DisconnectionCode.UNKNOWN; this.disconnectReason = this.disconnectCode.toString(); @@ -90,10 +85,6 @@ public class NodeConnectionStatus { return updateId; } - public Set<String> getRoles() { - return roles; - } - public NodeIdentifier getNodeIdentifier() { return nodeId; } @@ -118,11 +109,10 @@ public class NodeConnectionStatus { public String toString() { final StringBuilder sb = new StringBuilder(); final NodeConnectionState state = getState(); - sb.append("NodeConnectionStatus[state=").append(state); + sb.append("NodeConnectionStatus[nodeId=").append(nodeId).append(", state=").append(state); if (state == NodeConnectionState.DISCONNECTED || state == NodeConnectionState.DISCONNECTING) { sb.append(", Disconnect Code=").append(getDisconnectCode()).append(", Disconnect Reason=").append(getDisconnectReason()); } - sb.append(", roles=").append(getRoles()); sb.append(", updateId=").append(getUpdateIdentifier()); sb.append("]"); return sb.toString(); @@ -142,7 +132,6 @@ public class NodeConnectionStatus { final int prime = 31; int result = 1; result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode()); - result = prime * result + ((roles == null) ? 0 : roles.hashCode()); result = prime * result + ((state == null) ? 0 : state.hashCode()); return result; } @@ -163,7 +152,6 @@ public class NodeConnectionStatus { NodeConnectionStatus other = (NodeConnectionStatus) obj; return Objects.deepEquals(getNodeIdentifier(), other.getNodeIdentifier()) - && Objects.deepEquals(getRoles(), other.getRoles()) && Objects.deepEquals(getState(), other.getState()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java index 4131dc5..22d6ebc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java @@ -24,6 +24,7 @@ import java.net.Socket; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.io.socket.SocketConfiguration; @@ -74,7 +75,7 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender { } @Override - public void heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException { + public HeartbeatResponseMessage heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException { final String hostname; final int port; try { @@ -85,7 +86,12 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender { throw new IllegalArgumentException("Cannot send heartbeat to address [" + address + "]. Address must be in <hostname>:<port> format"); } - sendProtocolMessage(msg, hostname, port); + final ProtocolMessage responseMessage = sendProtocolMessage(msg, hostname, port); + if (MessageType.HEARTBEAT_RESPONSE == responseMessage.getType()) { + return (HeartbeatResponseMessage) responseMessage; + } + + throw new ProtocolException("Expected message type '" + MessageType.HEARTBEAT_RESPONSE + "' but found '" + responseMessage.getType() + "'"); } @@ -108,7 +114,7 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender { return socketConfiguration; } - private void sendProtocolMessage(final ProtocolMessage msg, final String hostname, final int port) { + private ProtocolMessage sendProtocolMessage(final ProtocolMessage msg, final String hostname, final int port) { Socket socket = null; try { try { @@ -124,6 +130,18 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender { } catch (final IOException ioe) { throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); } + + final ProtocolMessage response; + try { + // unmarshall response and return + final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); + response = unmarshaller.unmarshal(socket.getInputStream()); + } catch (final IOException ioe) { + throw new ProtocolException("Failed unmarshalling '" + MessageType.CONNECTION_RESPONSE + "' protocol message from " + + socket.getRemoteSocketAddress() + " due to: " + ioe, ioe); + } + + return response; } finally { SocketUtils.closeQuietly(socket); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java index 2135f20..a028c8e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java @@ -16,10 +16,7 @@ */ package org.apache.nifi.cluster.protocol; -import java.util.Collections; import java.util.Date; -import java.util.HashSet; -import java.util.Set; import javax.xml.bind.annotation.XmlTransient; import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; @@ -35,17 +32,15 @@ import org.apache.nifi.cluster.protocol.jaxb.message.HeartbeatAdapter; public class Heartbeat { private final NodeIdentifier nodeIdentifier; - private final Set<String> roles; private final NodeConnectionStatus connectionStatus; private final long createdTimestamp; private final byte[] payload; - public Heartbeat(final NodeIdentifier nodeIdentifier, final Set<String> roles, final NodeConnectionStatus connectionStatus, final byte[] payload) { + public Heartbeat(final NodeIdentifier nodeIdentifier, final NodeConnectionStatus connectionStatus, final byte[] payload) { if (nodeIdentifier == null) { throw new IllegalArgumentException("Node Identifier may not be null."); } this.nodeIdentifier = nodeIdentifier; - this.roles = roles == null ? Collections.emptySet() : Collections.unmodifiableSet(new HashSet<>(roles)); this.connectionStatus = connectionStatus; this.payload = payload; this.createdTimestamp = new Date().getTime(); @@ -59,10 +54,6 @@ public class Heartbeat { return payload; } - public Set<String> getRoles() { - return roles; - } - public NodeConnectionStatus getConnectionStatus() { return connectionStatus; } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/HeartbeatPayload.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/HeartbeatPayload.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/HeartbeatPayload.java new file mode 100644 index 0000000..8363a20 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/HeartbeatPayload.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.Unmarshaller; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; + +/** + * The payload of the heartbeat. The payload contains status to inform the cluster manager the current workload of this node. + * + */ +@XmlRootElement +public class HeartbeatPayload { + + private static final JAXBContext JAXB_CONTEXT; + + static { + try { + JAXB_CONTEXT = JAXBContext.newInstance(HeartbeatPayload.class); + } catch (JAXBException e) { + throw new RuntimeException("Unable to create JAXBContext."); + } + } + + private int activeThreadCount; + private long totalFlowFileCount; + private long totalFlowFileBytes; + private long systemStartTime; + private List<NodeConnectionStatus> clusterStatus; + + public int getActiveThreadCount() { + return activeThreadCount; + } + + public void setActiveThreadCount(final int activeThreadCount) { + this.activeThreadCount = activeThreadCount; + } + + public long getTotalFlowFileCount() { + return totalFlowFileCount; + } + + public void setTotalFlowFileCount(final long totalFlowFileCount) { + this.totalFlowFileCount = totalFlowFileCount; + } + + public long getTotalFlowFileBytes() { + return totalFlowFileBytes; + } + + public void setTotalFlowFileBytes(final long totalFlowFileBytes) { + this.totalFlowFileBytes = totalFlowFileBytes; + } + + public long getSystemStartTime() { + return systemStartTime; + } + + public void setSystemStartTime(final long systemStartTime) { + this.systemStartTime = systemStartTime; + } + + public List<NodeConnectionStatus> getClusterStatus() { + return clusterStatus; + } + + public void setClusterStatus(final List<NodeConnectionStatus> clusterStatus) { + this.clusterStatus = clusterStatus; + } + + public byte[] marshal() throws ProtocolException { + final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream(); + marshal(this, payloadBytes); + return payloadBytes.toByteArray(); + } + + public static void marshal(final HeartbeatPayload payload, final OutputStream os) throws ProtocolException { + try { + final Marshaller marshaller = JAXB_CONTEXT.createMarshaller(); + marshaller.marshal(payload, os); + } catch (final JAXBException je) { + throw new ProtocolException(je); + } + } + + public static HeartbeatPayload unmarshal(final InputStream is) throws ProtocolException { + try { + final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); + return (HeartbeatPayload) unmarshaller.unmarshal(is); + } catch (final JAXBException je) { + throw new ProtocolException(je); + } + } + + public static HeartbeatPayload unmarshal(final byte[] bytes) throws ProtocolException { + try { + final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); + return (HeartbeatPayload) unmarshaller.unmarshal(new ByteArrayInputStream(bytes)); + } catch (final JAXBException je) { + throw new ProtocolException(je); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java index 432a03d..fcf5195 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java @@ -19,6 +19,7 @@ package org.apache.nifi.cluster.protocol; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage; /** * An interface for sending protocol messages from a node to the cluster @@ -44,6 +45,8 @@ public interface NodeProtocolSender { * @param msg the heartbeat message to send * @param address the address of the Cluster Coordinator in <hostname>:<port> format * @throws ProtocolException if unable to send the heartbeat + * + * @return the response from the Cluster Coordinator */ - void heartbeat(HeartbeatMessage msg, String address) throws ProtocolException; + HeartbeatResponseMessage heartbeat(HeartbeatMessage msg, String address) throws ProtocolException; } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java index 0fd2517..1b0aeea 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java @@ -27,6 +27,7 @@ import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage; import org.apache.nifi.reporting.BulletinRepository; public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolListener { @@ -92,7 +93,7 @@ public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolL } @Override - public void heartbeat(HeartbeatMessage msg, String address) throws ProtocolException { - sender.heartbeat(msg, address); + public HeartbeatResponseMessage heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException { + return sender.heartbeat(msg, address); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java index f1eba52..6f718fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.cluster.protocol.jaxb.message; -import java.util.Set; - import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; @@ -29,7 +27,6 @@ public class AdaptedHeartbeat { private NodeIdentifier nodeIdentifier; private byte[] payload; - private Set<String> roles; private NodeConnectionStatus connectionStatus; public AdaptedHeartbeat() { @@ -44,14 +41,6 @@ public class AdaptedHeartbeat { this.nodeIdentifier = nodeIdentifier; } - public Set<String> getRoles() { - return roles; - } - - public void setRoles(Set<String> roles) { - this.roles = roles; - } - public void setConnectionStatus(NodeConnectionStatus connectionStatus) { this.connectionStatus = connectionStatus; } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java index 9cfac2c..c8c4acf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java @@ -17,8 +17,6 @@ package org.apache.nifi.cluster.protocol.jaxb.message; -import java.util.Set; - import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.protocol.NodeIdentifier; @@ -30,7 +28,6 @@ public class AdaptedNodeConnectionStatus { private DisconnectionCode disconnectCode; private String disconnectReason; private Long connectionRequestTime; - private Set<String> roles; public Long getUpdateId() { return updateId; @@ -79,12 +76,4 @@ public class AdaptedNodeConnectionStatus { public void setConnectionRequestTime(Long connectionRequestTime) { this.connectionRequestTime = connectionRequestTime; } - - public Set<String> getRoles() { - return roles; - } - - public void setRoles(Set<String> roles) { - this.roles = roles; - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java index 94d26ce..58e675b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java @@ -34,9 +34,6 @@ public class HeartbeatAdapter extends XmlAdapter<AdaptedHeartbeat, Heartbeat> { // set payload aHb.setPayload(hb.getPayload()); - // set leader flag - aHb.setRoles(hb.getRoles()); - // set connected flag aHb.setConnectionStatus(hb.getConnectionStatus()); } @@ -46,7 +43,7 @@ public class HeartbeatAdapter extends XmlAdapter<AdaptedHeartbeat, Heartbeat> { @Override public Heartbeat unmarshal(final AdaptedHeartbeat aHb) { - return new Heartbeat(aHb.getNodeIdentifier(), aHb.getRoles(), aHb.getConnectionStatus(), aHb.getPayload()); + return new Heartbeat(aHb.getNodeIdentifier(), aHb.getConnectionStatus(), aHb.getPayload()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java index 21d0bda..ec209de 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java @@ -30,8 +30,7 @@ public class NodeConnectionStatusAdapter extends XmlAdapter<AdaptedNodeConnectio adapted.getState(), adapted.getDisconnectCode(), adapted.getDisconnectReason(), - adapted.getConnectionRequestTime(), - adapted.getRoles()); + adapted.getConnectionRequestTime()); } @Override @@ -44,7 +43,6 @@ public class NodeConnectionStatusAdapter extends XmlAdapter<AdaptedNodeConnectio adapted.setDisconnectCode(toAdapt.getDisconnectCode()); adapted.setDisconnectReason(toAdapt.getDisconnectReason()); adapted.setState(toAdapt.getState()); - adapted.setRoles(toAdapt.getRoles()); } return adapted; } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java index da13b02..afa87b9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java @@ -24,6 +24,7 @@ import org.apache.nifi.cluster.protocol.message.DisconnectMessage; import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage; import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage; @@ -97,4 +98,7 @@ public class ObjectFactory { return new NodeConnectionStatusResponseMessage(); } + public HeartbeatResponseMessage createHeartbeatResponse() { + return new HeartbeatResponseMessage(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java new file mode 100644 index 0000000..cbb8b48 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.protocol.message; + +import java.util.ArrayList; +import java.util.List; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; + +@XmlRootElement(name = "heartbeatResponse") +public class HeartbeatResponseMessage extends ProtocolMessage { + + private List<NodeConnectionStatus> updatedNodeStatuses = new ArrayList<>(); + + + @Override + public MessageType getType() { + return MessageType.HEARTBEAT_RESPONSE; + } + + public List<NodeConnectionStatus> getUpdatedNodeStatuses() { + return updatedNodeStatuses; + } + + public void setUpdatedNodeStatuses(final List<NodeConnectionStatus> nodeStatuses) { + this.updatedNodeStatuses = new ArrayList<>(nodeStatuses); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java index 2e74689..1d0d115 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java @@ -32,6 +32,7 @@ public abstract class ProtocolMessage { RECONNECTION_RESPONSE, SERVICE_BROADCAST, HEARTBEAT, + HEARTBEAT_RESPONSE, NODE_CONNECTION_STATUS_REQUEST, NODE_CONNECTION_STATUS_RESPONSE, NODE_STATUS_CHANGE; http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml index 04332a5..032081b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/resources/nifi-cluster-protocol-context.xml @@ -54,11 +54,18 @@ </bean> <!-- node protocol sender --> + <!-- <bean id="nodeProtocolSender" class="org.apache.nifi.cluster.coordination.node.CuratorNodeProtocolSender"> <constructor-arg ref="protocolSocketConfiguration"/> <constructor-arg ref="protocolContext"/> <constructor-arg ref="nifiProperties"/> </bean> + --> + <bean id="nodeProtocolSender" class="org.apache.nifi.cluster.coordination.node.LeaderElectionNodeProtocolSender"> + <constructor-arg ref="protocolSocketConfiguration"/> + <constructor-arg ref="protocolContext"/> + <constructor-arg ref="leaderElectionManager"/> + </bean> <!-- protocol listener --> <bean id="protocolListener" class="org.apache.nifi.cluster.protocol.impl.SocketProtocolListener"> http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java index 21636a4..4fa53e8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java @@ -32,9 +32,12 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.ComponentRevision; import org.apache.nifi.cluster.protocol.ConnectionResponse; import org.apache.nifi.cluster.protocol.DataFlow; +import org.apache.nifi.cluster.protocol.Heartbeat; +import org.apache.nifi.cluster.protocol.HeartbeatPayload; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.StandardDataFlow; import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; +import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage; import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage; import org.apache.nifi.web.Revision; @@ -96,4 +99,29 @@ public class TestJaxbProtocolUtils { final NodeConnectionStatus unmarshalledStatus = unmarshalledMsg.getNodeConnectionStatus(); assertEquals(nodeStatus, unmarshalledStatus); } + + @Test + public void testRoundTripHeartbeat() throws JAXBException { + final NodeIdentifier nodeId = new NodeIdentifier("id", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, true); + final NodeConnectionStatus nodeStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED); + + final HeartbeatPayload payload = new HeartbeatPayload(); + payload.setActiveThreadCount(1); + payload.setSystemStartTime(System.currentTimeMillis()); + payload.setTotalFlowFileBytes(83L); + payload.setTotalFlowFileCount(4); + + final List<NodeConnectionStatus> clusterStatus = Collections.singletonList(nodeStatus); + payload.setClusterStatus(clusterStatus); + + final Heartbeat heartbeat = new Heartbeat(nodeId, nodeStatus, payload.marshal()); + + final HeartbeatMessage msg = new HeartbeatMessage(); + msg.setHeartbeat(heartbeat); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos); + final Object unmarshalled = JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new ByteArrayInputStream(baos.toByteArray())); + assertTrue(unmarshalled instanceof HeartbeatMessage); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index c216ed3..e304c23 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -256,8 +256,6 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { clusterCoordinator.finishNodeConnection(nodeId); clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected."); } - - clusterCoordinator.updateNodeRoles(nodeId, heartbeat.getRoles()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java index d2d81d1..b955bd0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java @@ -17,39 +17,34 @@ package org.apache.nifi.cluster.coordination.heartbeat; -import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; import javax.xml.bind.JAXBContext; import javax.xml.bind.Unmarshaller; -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.RetryForever; -import org.apache.nifi.cluster.HeartbeatPayload; import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.Heartbeat; +import org.apache.nifi.cluster.protocol.HeartbeatPayload; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.ProtocolListener; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; -import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; import org.apache.nifi.util.NiFiProperties; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.KeeperException.NodeExistsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,13 +54,6 @@ import org.slf4j.LoggerFactory; */ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor implements HeartbeatMonitor, ProtocolHandler { protected static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeatMonitor.class); - private static final String COORDINATOR_ZNODE_NAME = "coordinator"; - - private final ZooKeeperClientConfig zkClientConfig; - private final String clusterNodesPath; - - private volatile Map<String, NodeIdentifier> clusterNodeIds = new HashMap<>(); - private volatile CuratorFramework curatorClient; private final String heartbeatAddress; private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeatMessages = new ConcurrentHashMap<>(); @@ -86,8 +74,6 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im super(clusterCoordinator, properties); protocolListener.addHandler(this); - this.zkClientConfig = ZooKeeperClientConfig.createConfig(properties); - this.clusterNodesPath = zkClientConfig.resolvePath("cluster/nodes"); String hostname = properties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS); if (hostname == null || hostname.trim().isEmpty()) { @@ -111,17 +97,12 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im } @Override - public void onStart() { - final RetryPolicy retryPolicy = new RetryForever(5000); - curatorClient = CuratorFrameworkFactory.builder() - .connectString(zkClientConfig.getConnectString()) - .sessionTimeoutMs(zkClientConfig.getSessionTimeoutMillis()) - .connectionTimeoutMs(zkClientConfig.getConnectionTimeoutMillis()) - .retryPolicy(retryPolicy) - .defaultData(new byte[0]) - .build(); - curatorClient.start(); + public String getHeartbeatAddress() { + return heartbeatAddress; + } + @Override + public void onStart() { // We don't know what the heartbeats look like for the nodes, since we were just elected to monitoring // them. However, the map may be filled with old heartbeats. So we clear the heartbeats and populate the // map with new heartbeats set to the current time and using the currently known status. We do this so @@ -130,58 +111,13 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im heartbeatMessages.clear(); for (final NodeIdentifier nodeId : clusterCoordinator.getNodeIdentifiers()) { final NodeHeartbeat heartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), - clusterCoordinator.getConnectionStatus(nodeId), Collections.emptySet(), 0, 0L, 0, System.currentTimeMillis()); + clusterCoordinator.getConnectionStatus(nodeId), 0, 0L, 0, System.currentTimeMillis()); heartbeatMessages.put(nodeId, heartbeat); } - - final Thread publishAddress = new Thread(new Runnable() { - @Override - public void run() { - while (!isStopped()) { - final String path = clusterNodesPath + "/" + COORDINATOR_ZNODE_NAME; - try { - try { - curatorClient.setData().forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8)); - logger.info("Successfully published Cluster Heartbeat Monitor Address of {} to ZooKeeper", heartbeatAddress); - return; - } catch (final NoNodeException nne) { - // ensure that parents are created, using a wide-open ACL because the parents contain no data - // and the path is not in any way sensitive. - try { - curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); - } catch (final NodeExistsException nee) { - // This is okay. Node already exists. - } - - curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8)); - logger.info("Successfully published address as heartbeat monitor address at path {} with value {}", path, heartbeatAddress); - - return; - } - } catch (Exception e) { - logger.warn("Failed to update ZooKeeper to notify nodes of the heartbeat address. Will continue to retry."); - - try { - Thread.sleep(2000L); - } catch (final InterruptedException ie) { - Thread.currentThread().interrupt(); - return; - } - } - } - } - }); - - publishAddress.setName("Publish Heartbeat Address"); - publishAddress.setDaemon(true); - publishAddress.start(); } @Override public void onStop() { - if (curatorClient != null) { - curatorClient.close(); - } } @Override @@ -195,10 +131,6 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im heartbeatMessages.remove(nodeId); } - protected Set<NodeIdentifier> getClusterNodeIds() { - return new HashSet<>(clusterNodeIds.values()); - } - @Override public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException { @@ -211,7 +143,6 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im final NodeIdentifier nodeId = heartbeat.getNodeIdentifier(); final NodeConnectionStatus connectionStatus = heartbeat.getConnectionStatus(); - final Set<String> roles = heartbeat.getRoles(); final byte[] payloadBytes = heartbeat.getPayload(); final HeartbeatPayload payload = HeartbeatPayload.unmarshal(payloadBytes); final int activeThreadCount = payload.getActiveThreadCount(); @@ -220,11 +151,49 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im final long systemStartTime = payload.getSystemStartTime(); final NodeHeartbeat nodeHeartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), - connectionStatus, roles, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime); + connectionStatus, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime); heartbeatMessages.put(heartbeat.getNodeIdentifier(), nodeHeartbeat); logger.debug("Received new heartbeat from {}", nodeId); - return null; + // Formulate a List of differences between our view of the cluster topology and the node's view + // and send that back to the node so that it is in-sync with us + final List<NodeConnectionStatus> nodeStatusList = payload.getClusterStatus(); + final List<NodeConnectionStatus> updatedStatuses = getUpdatedStatuses(nodeStatusList); + + final HeartbeatResponseMessage responseMessage = new HeartbeatResponseMessage(); + responseMessage.setUpdatedNodeStatuses(updatedStatuses); + return responseMessage; + } + + + private List<NodeConnectionStatus> getUpdatedStatuses(final List<NodeConnectionStatus> nodeStatusList) { + // Map node's statuses by NodeIdentifier for quick & easy lookup + final Map<NodeIdentifier, NodeConnectionStatus> nodeStatusMap = nodeStatusList.stream() + .collect(Collectors.toMap(status -> status.getNodeIdentifier(), Function.identity())); + + // Check if our connection status is the same for each Node Identifier and if not, add our version of the status + // to a List of updated statuses. + final List<NodeConnectionStatus> currentStatuses = clusterCoordinator.getConnectionStatuses(); + final List<NodeConnectionStatus> updatedStatuses = new ArrayList<>(); + for (final NodeConnectionStatus currentStatus : currentStatuses) { + final NodeConnectionStatus nodeStatus = nodeStatusMap.get(currentStatus.getNodeIdentifier()); + if (!currentStatus.equals(nodeStatus)) { + updatedStatuses.add(currentStatus); + } + } + + // If the node has any statuses that we do not have, add a REMOVED status to the update list + final Set<NodeIdentifier> nodeIds = currentStatuses.stream().map(status -> status.getNodeIdentifier()).collect(Collectors.toSet()); + for (final NodeConnectionStatus nodeStatus : nodeStatusList) { + if (!nodeIds.contains(nodeStatus.getNodeIdentifier())) { + updatedStatuses.add(new NodeConnectionStatus(nodeStatus.getNodeIdentifier(), NodeConnectionState.REMOVED, null)); + } + } + + logger.debug("\n\nCalculated diff between current cluster status and node cluster status as follows:\nNode: {}\nSelf: {}\nDifference: {}\n\n", + nodeStatusList, currentStatuses, updatedStatuses); + + return updatedStatuses; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java index e455a76..a63db44 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java @@ -17,13 +17,9 @@ package org.apache.nifi.cluster.coordination.heartbeat; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import org.apache.nifi.cluster.HeartbeatPayload; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.protocol.Heartbeat; +import org.apache.nifi.cluster.protocol.HeartbeatPayload; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; @@ -32,18 +28,16 @@ public class StandardNodeHeartbeat implements NodeHeartbeat { private final NodeIdentifier nodeId; private final long timestamp; private final NodeConnectionStatus connectionStatus; - private final Set<String> roles; private final int flowFileCount; private final long flowFileBytes; private final int activeThreadCount; private final long systemStartTime; public StandardNodeHeartbeat(final NodeIdentifier nodeId, final long timestamp, final NodeConnectionStatus connectionStatus, - final Set<String> roles, final int flowFileCount, final long flowFileBytes, final int activeThreadCount, final long systemStartTime) { + final int flowFileCount, final long flowFileBytes, final int activeThreadCount, final long systemStartTime) { this.timestamp = timestamp; this.nodeId = nodeId; this.connectionStatus = connectionStatus; - this.roles = roles == null ? Collections.emptySet() : Collections.unmodifiableSet(new HashSet<>(roles)); this.flowFileCount = flowFileCount; this.flowFileBytes = flowFileBytes; this.activeThreadCount = activeThreadCount; @@ -66,11 +60,6 @@ public class StandardNodeHeartbeat implements NodeHeartbeat { } @Override - public Set<String> getRoles() { - return roles; - } - - @Override public int getFlowFileCount() { return flowFileCount; } @@ -85,7 +74,6 @@ public class StandardNodeHeartbeat implements NodeHeartbeat { return activeThreadCount; } - @Override public long getSystemStartTime() { return systemStartTime; @@ -96,7 +84,7 @@ public class StandardNodeHeartbeat implements NodeHeartbeat { final HeartbeatPayload payload = HeartbeatPayload.unmarshal(heartbeat.getPayload()); return new StandardNodeHeartbeat(heartbeat.getNodeIdentifier(), timestamp, heartbeat.getConnectionStatus(), - heartbeat.getRoles(), (int) payload.getTotalFlowFileCount(), payload.getTotalFlowFileBytes(), + (int) payload.getTotalFlowFileCount(), payload.getTotalFlowFileBytes(), payload.getActiveThreadCount(), payload.getSystemStartTime()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java deleted file mode 100644 index daa3e5c..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.cluster.coordination.node; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.charset.StandardCharsets; -import java.util.Properties; - -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.RetryNTimes; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; -import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; -import org.apache.nifi.io.socket.SocketConfiguration; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Uses Apache Curator to determine the address of the current cluster coordinator - */ -public class CuratorNodeProtocolSender extends AbstractNodeProtocolSender { - private static final Logger logger = LoggerFactory.getLogger(CuratorNodeProtocolSender.class); - - private final String coordinatorPath; - private final ZooKeeperClientConfig zkConfig; - private InetSocketAddress coordinatorAddress; - - - public CuratorNodeProtocolSender(final SocketConfiguration socketConfig, final ProtocolContext<ProtocolMessage> protocolContext, final Properties properties) { - super(socketConfig, protocolContext); - zkConfig = ZooKeeperClientConfig.createConfig(properties); - coordinatorPath = zkConfig.resolvePath("cluster/nodes/coordinator"); - } - - @Override - protected synchronized InetSocketAddress getServiceAddress() throws IOException { - if (coordinatorAddress != null) { - return coordinatorAddress; - } - - final RetryPolicy retryPolicy = new RetryNTimes(0, 0); - final CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(), - zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy); - curatorClient.start(); - - try { - // Get coordinator address and add watcher to change who we are heartbeating to if the value changes. - final byte[] coordinatorAddressBytes = curatorClient.getData().usingWatcher(new Watcher() { - @Override - public void process(final WatchedEvent event) { - coordinatorAddress = null; - } - }).forPath(coordinatorPath); - - if (coordinatorAddressBytes == null || coordinatorAddressBytes.length == 0) { - throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet."); - } - - final String address = new String(coordinatorAddressBytes, StandardCharsets.UTF_8); - - final String[] splits = address.split(":"); - if (splits.length != 2) { - final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates " - + "that address is %s, but this is not in the expected format of <hostname>:<port>", address); - logger.error(message); - throw new ProtocolException(message); - } - - logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address); - - final String hostname = splits[0]; - final int port; - try { - port = Integer.parseInt(splits[1]); - if (port < 1 || port > 65535) { - throw new NumberFormatException("Port must be in the range of 1 - 65535 but got " + port); - } - } catch (final NumberFormatException nfe) { - final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates " - + "that address is %s, but the port is not a valid port number", address); - logger.error(message); - throw new ProtocolException(message); - } - - final InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(hostname, port); - coordinatorAddress = socketAddress; - return socketAddress; - } catch (final NoNodeException nne) { - logger.info("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet."); - throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet."); - } catch (final NoClusterCoordinatorException ncce) { - throw ncce; - } catch (Exception e) { - throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e); - } finally { - curatorClient.close(); - } - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java new file mode 100644 index 0000000..03de329 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.node; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; +import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.controller.leader.election.LeaderElectionManager; +import org.apache.nifi.io.socket.SocketConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LeaderElectionNodeProtocolSender extends AbstractNodeProtocolSender { + private static final Logger logger = LoggerFactory.getLogger(LeaderElectionNodeProtocolSender.class); + + private final LeaderElectionManager electionManager; + + public LeaderElectionNodeProtocolSender(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext, final LeaderElectionManager electionManager) { + super(socketConfiguration, protocolContext); + this.electionManager = electionManager; + } + + @Override + protected InetSocketAddress getServiceAddress() throws IOException { + final String address = electionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR); + + if (StringUtils.isEmpty(address)) { + throw new NoClusterCoordinatorException("No node has yet been elected Cluster Coordinator. Cannot establish connection to cluster yet."); + } + + final String[] splits = address.split(":"); + if (splits.length != 2) { + final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates " + + "that address is %s, but this is not in the expected format of <hostname>:<port>", address); + logger.error(message); + throw new ProtocolException(message); + } + + logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address); + + final String hostname = splits[0]; + final int port; + try { + port = Integer.parseInt(splits[1]); + if (port < 1 || port > 65535) { + throw new NumberFormatException("Port must be in the range of 1 - 65535 but got " + port); + } + } catch (final NumberFormatException nfe) { + final String message = String.format("Attempted to determine Cluster Coordinator address. Zookeeper indicates " + + "that address is %s, but the port is not a valid port number", address); + logger.error(message); + throw new ProtocolException(message); + } + + final InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(hostname, port); + return socketAddress; + } + +}
