http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml deleted file mode 100644 index bad5a29..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml +++ /dev/null @@ -1,69 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-framework-parent</artifactId> - <version>0.0.1-incubating-SNAPSHOT</version> - </parent> - <artifactId>framework-cluster-protocol</artifactId> - <packaging>jar</packaging> - <name>NiFi Framework Cluster Protocol</name> - <description>The messaging protocol for clustered NiFi</description> - <dependencies> - - <!-- application dependencies --> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-properties</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-logging-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-socket-utils</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-security</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>core-api</artifactId> - </dependency> - - <!-- spring dependencies --> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-core</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-beans</artifactId> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-context</artifactId> - </dependency> - </dependencies> -</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java deleted file mode 100644 index fa1547f..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import org.apache.nifi.cluster.protocol.message.DisconnectMessage; -import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; -import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; -import org.apache.nifi.reporting.BulletinRepository; - -/** - * An interface for sending protocol messages from the cluster manager to nodes. - * - * @author unattributed - */ -public interface ClusterManagerProtocolSender { - - /** - * Sends a "flow request" message to a node. - * @param msg a message - * @return the response - * @throws ProtocolException if communication failed - */ - FlowResponseMessage requestFlow(FlowRequestMessage msg) throws ProtocolException; - - /** - * Sends a "reconnection request" message to a node. - * @param msg a message - * @return - * @throws ProtocolException if communication failed - */ - ReconnectionResponseMessage requestReconnection(ReconnectionRequestMessage msg) throws ProtocolException; - - /** - * Sends a "disconnection request" message to a node. - * @param msg a message - * @throws ProtocolException if communication failed - */ - void disconnect(DisconnectMessage msg) throws ProtocolException; - - /** - * Sends an "assign primary role" message to a node. - * @param msg a message - * @throws ProtocolException if communication failed - */ - void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException; - - /** - * Sets the {@link BulletinRepository} that can be used to report bulletins - * @param bulletinRepository - */ - void setBulletinRepository(final BulletinRepository bulletinRepository); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java deleted file mode 100644 index 1b5d007..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; -import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionRequestAdapter; - -/** - * A node's request to connect to the cluster. The request contains a proposed - * identifier. - * - * @author unattributed - */ -@XmlJavaTypeAdapter(ConnectionRequestAdapter.class) -public class ConnectionRequest { - - private final NodeIdentifier proposedNodeIdentifier; - - public ConnectionRequest(final NodeIdentifier proposedNodeIdentifier) { - if(proposedNodeIdentifier == null) { - throw new IllegalArgumentException("Proposed node identifier may not be null."); - } - this.proposedNodeIdentifier = proposedNodeIdentifier; - } - - public NodeIdentifier getProposedNodeIdentifier() { - return proposedNodeIdentifier; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java deleted file mode 100644 index 7a5ff2b..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; - -import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionResponseAdapter; - -/** - * The cluster manager's response to a node's connection request. If the manager - * has a current copy of the data flow, then it is returned with a node identifier - * to the node. Otherwise, the manager will provide a "try again in X seconds" - * response to the node in hopes that a current data flow will be available upon - * subsequent requests. - * - * @author unattributed - */ -@XmlJavaTypeAdapter(ConnectionResponseAdapter.class) -public class ConnectionResponse { - - private final boolean blockedByFirewall; - private final int tryLaterSeconds; - private final NodeIdentifier nodeIdentifier; - private final StandardDataFlow dataFlow; - private final boolean primary; - private final Integer managerRemoteInputPort; - private final Boolean managerRemoteCommsSecure; - private final String instanceId; - - private volatile String clusterManagerDN; - - public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow, final boolean primary, - final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId) { - if(nodeIdentifier == null) { - throw new IllegalArgumentException("Node identifier may not be empty or null."); - } else if(dataFlow == null) { - throw new IllegalArgumentException("DataFlow may not be null."); - } - this.nodeIdentifier = nodeIdentifier; - this.dataFlow = dataFlow; - this.tryLaterSeconds = 0; - this.blockedByFirewall = false; - this.primary = primary; - this.managerRemoteInputPort = managerRemoteInputPort; - this.managerRemoteCommsSecure = managerRemoteCommsSecure; - this.instanceId = instanceId; - } - - public ConnectionResponse(final int tryLaterSeconds) { - if(tryLaterSeconds <= 0) { - throw new IllegalArgumentException("Try-Later seconds may not be nonnegative: " + tryLaterSeconds); - } - this.dataFlow = null; - this.nodeIdentifier = null; - this.tryLaterSeconds = tryLaterSeconds; - this.blockedByFirewall = false; - this.primary = false; - this.managerRemoteInputPort = null; - this.managerRemoteCommsSecure = null; - this.instanceId = null; - } - - private ConnectionResponse() { - this.dataFlow = null; - this.nodeIdentifier = null; - this.tryLaterSeconds = 0; - this.blockedByFirewall = true; - this.primary = false; - this.managerRemoteInputPort = null; - this.managerRemoteCommsSecure = null; - this.instanceId = null; - } - - public static ConnectionResponse createBlockedByFirewallResponse() { - return new ConnectionResponse(); - } - - public boolean isPrimary() { - return primary; - } - - public boolean shouldTryLater() { - return tryLaterSeconds > 0; - } - - public boolean isBlockedByFirewall() { - return blockedByFirewall; - } - - public int getTryLaterSeconds() { - return tryLaterSeconds; - } - - public StandardDataFlow getDataFlow() { - return dataFlow; - } - - public NodeIdentifier getNodeIdentifier() { - return nodeIdentifier; - } - - public Integer getManagerRemoteInputPort() { - return managerRemoteInputPort; - } - - public Boolean isManagerRemoteCommsSecure() { - return managerRemoteCommsSecure; - } - - public String getInstanceId() { - return instanceId; - } - - public void setClusterManagerDN(final String dn) { - this.clusterManagerDN = dn; - } - - /** - * Returns the DN of the NCM, if it is available or <code>null</code> otherwise. - * - * @return - */ - public String getClusterManagerDN() { - return clusterManagerDN; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java deleted file mode 100644 index 67324a1..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import java.util.Date; -import javax.xml.bind.annotation.XmlTransient; -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; -import org.apache.nifi.cluster.protocol.jaxb.message.HeartbeatAdapter; - -/** - * A heartbeat for indicating the status of a node to the cluster. - * @author unattributed - */ -@XmlJavaTypeAdapter(HeartbeatAdapter.class) -public class Heartbeat { - - private final NodeIdentifier nodeIdentifier; - private final boolean primary; - private final boolean connected; - private final long createdTimestamp; - private final byte[] payload; - - public Heartbeat(final NodeIdentifier nodeIdentifier, final boolean primary, final boolean connected, final byte[] payload) { - if(nodeIdentifier == null) { - throw new IllegalArgumentException("Node Identifier may not be null."); - } - this.nodeIdentifier = nodeIdentifier; - this.primary = primary; - this.connected = connected; - this.payload = payload; - this.createdTimestamp = new Date().getTime(); - } - - public NodeIdentifier getNodeIdentifier() { - return nodeIdentifier; - } - - public byte[] getPayload() { - return payload; - } - - public boolean isPrimary() { - return primary; - } - - public boolean isConnected() { - return connected; - } - - @XmlTransient - public long getCreatedTimestamp() { - return createdTimestamp; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java deleted file mode 100644 index a120524..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; -import org.apache.nifi.cluster.protocol.jaxb.message.NodeBulletinsAdapter; - -/** - * - */ -@XmlJavaTypeAdapter(NodeBulletinsAdapter.class) -public class NodeBulletins { - - private final NodeIdentifier nodeIdentifier; - private final byte[] payload; - - public NodeBulletins(NodeIdentifier nodeIdentifier, byte[] payload) { - this.nodeIdentifier = nodeIdentifier; - this.payload = payload; - } - - public NodeIdentifier getNodeIdentifier() { - return nodeIdentifier; - } - - public byte[] getPayload() { - return payload; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java deleted file mode 100644 index 1893186..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import org.apache.commons.lang3.StringUtils; - -/** - * A node identifier denoting the coordinates of a flow controller that is connected - * to a cluster. Nodes provide an external public API interface and an internal private - * interface for communicating with the cluster. - * - * The external API interface and internal protocol each require an IP or hostname - * as well as a port for communicating. - * - * This class overrides hashCode and equals and considers two instances to be - * equal if they have the equal IDs. - * - * @author unattributed - * @Immutable - * @Threadsafe - */ -public class NodeIdentifier { - - /** the unique identifier for the node */ - private final String id; - - /** the IP or hostname to use for sending requests to the node's external interface */ - private final String apiAddress; - - /** the port to use use for sending requests to the node's external interface */ - private final int apiPort; - - /** the IP or hostname to use for sending requests to the node's internal interface */ - private final String socketAddress; - - /** the port to use use for sending requests to the node's internal interface */ - private final int socketPort; - - private final String nodeDn; - - public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort) { - this(id, apiAddress, apiPort, socketAddress, socketPort, null); - } - - public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort, final String dn) { - - if(StringUtils.isBlank(id)) { - throw new IllegalArgumentException("Node ID may not be empty or null."); - } else if(StringUtils.isBlank(apiAddress)) { - throw new IllegalArgumentException("Node API address may not be empty or null."); - } else if(StringUtils.isBlank(socketAddress)) { - throw new IllegalArgumentException("Node socket address may not be empty or null."); - } - - validatePort(apiPort); - validatePort(socketPort); - - this.id = id; - this.apiAddress = apiAddress; - this.apiPort = apiPort; - this.socketAddress = socketAddress; - this.socketPort = socketPort; - this.nodeDn = dn; - } - - public String getId() { - return id; - } - - public String getDN() { - return nodeDn; - } - - public String getApiAddress() { - return apiAddress; - } - - public int getApiPort() { - return apiPort; - } - - public String getSocketAddress() { - return socketAddress; - } - - public int getSocketPort() { - return socketPort; - } - - private void validatePort(final int port) { - if(port < 1 || port > 65535) { - throw new IllegalArgumentException("Port must be inclusively in the range [1, 65535]. Port given: " + port); - } - } - - /** - * Compares the id of two node identifiers for equality. - * - * @param obj a node identifier - * - * @return true if the id is equal; false otherwise - */ - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final NodeIdentifier other = (NodeIdentifier) obj; - if ((this.id == null) ? (other.id != null) : !this.id.equals(other.id)) { - return false; - } - return true; - } - - /** - * Compares API address/port and socket address/port for equality. The - * id is not used for comparison. - * - * @param other a node identifier - * - * @return true if API address/port and socket address/port are equal; false - * otherwise - */ - public boolean logicallyEquals(final NodeIdentifier other) { - if(other == null) { - return false; - } - if ((this.apiAddress == null) ? (other.apiAddress != null) : !this.apiAddress.equals(other.apiAddress)) { - return false; - } - if(this.apiPort != other.apiPort) { - return false; - } - if ((this.socketAddress == null) ? (other.socketAddress != null) : !this.socketAddress.equals(other.socketAddress)) { - return false; - } - if(this.socketPort != other.socketPort) { - return false; - } - return true; - } - - @Override - public int hashCode() { - int hash = 7; - hash = 31 * hash + (this.id != null ? this.id.hashCode() : 0); - return hash; - } - - @Override - public String toString() { - return "[" + "id=" + id + ", apiAddress=" + apiAddress + ", apiPort=" + apiPort + ", socketAddress=" + socketAddress + ", socketPort=" + socketPort + ']'; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java deleted file mode 100644 index 1edcb91..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; -import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; -import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; - -/** - * An interface for sending protocol messages from a node to the cluster manager. - * @author unattributed - */ -public interface NodeProtocolSender { - - /** - * Sends a "connection request" message to the cluster manager. - * @param msg a message - * @return the response - * @throws UnknownServiceAddressException if the cluster manager's address is not known - * @throws ProtocolException if communication failed - */ - ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException; - - /** - * Sends a "heartbeat" message to the cluster manager. - * @param msg a message - * @throws UnknownServiceAddressException if the cluster manager's address is not known - * @throws ProtocolException if communication failed - */ - void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException; - - /** - * Sends a bulletins message to the cluster manager. - * @param msg - * @throws ProtocolException - * @throws UnknownServiceAddressException - */ - void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException; - - /** - * Sends a failure notification if the controller was unable start. - * @param msg a message - * @throws UnknownServiceAddressException if the cluster manager's address is not known - * @throws ProtocolException if communication failed - */ - void notifyControllerStartupFailure(ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException; - - /** - * Sends a failure notification if the node was unable to reconnect to the cluster - * @param msg a message - * @throws UnknownServiceAddressException if the cluster manager's address is not known - * @throws ProtocolException if communication failed - */ - void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException; - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java deleted file mode 100644 index b614e76..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -/** - * The context for communicating using the internal cluster protocol. - * - * @param <T> The type of protocol message. - * - * @author unattributed - */ -public interface ProtocolContext<T> { - - /** - * Creates a marshaller for serializing protocol messages. - * @return a marshaller - */ - ProtocolMessageMarshaller<T> createMarshaller(); - - /** - * Creates an unmarshaller for deserializing protocol messages. - * @return a unmarshaller - */ - ProtocolMessageUnmarshaller<T> createUnmarshaller(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java deleted file mode 100644 index f11ad84..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -/** - * The base exception for problems encountered while communicating within the - * cluster. - * @author unattributed - */ -public class ProtocolException extends RuntimeException { - - public ProtocolException() { - } - - public ProtocolException(String msg) { - super(msg); - } - - public ProtocolException(Throwable cause) { - super(cause); - } - - public ProtocolException(String msg, Throwable cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java deleted file mode 100644 index 6de87db..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; - -/** - * A handler for processing protocol messages. - * @author unattributed - */ -public interface ProtocolHandler { - - /** - * Handles the given protocol message or throws an exception if it cannot - * handle the message. If no response is needed by the protocol, then null - * should be returned. - * - * @param msg a message - * @return a response or null, if no response is necessary - * - * @throws ProtocolException if the message could not be processed - */ - ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException; - - /** - * @param msg - * @return true if the handler can process the given message; false otherwise - */ - boolean canHandle(ProtocolMessage msg); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java deleted file mode 100644 index 32f0f5d..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import java.io.IOException; -import java.util.Collection; - -import org.apache.nifi.reporting.BulletinRepository; - -/** - * Defines the interface for a listener to process protocol messages. - * @author unattributed - */ -public interface ProtocolListener { - - /** - * Starts the instance for listening for messages. Start may only be called - * if the instance is not running. - * @throws java.io.IOException - */ - void start() throws IOException; - - /** - * Stops the instance from listening for messages. Stop may only be called - * if the instance is running. - * @throws java.io.IOException - */ - void stop() throws IOException; - - /** - * @return true if the instance is started; false otherwise. - */ - boolean isRunning(); - - /** - * @return the handlers registered with the listener - */ - Collection<ProtocolHandler> getHandlers(); - - /** - * Registers a handler with the listener. - * @param handler a handler - */ - void addHandler(ProtocolHandler handler); - - /** - * Sets the BulletinRepository that can be used to report bulletins - * @param bulletinRepository - */ - void setBulletinRepository(BulletinRepository bulletinRepository); - - /** - * Unregisters the handler with the listener. - * @param handler a handler - * @return true if the handler was removed; false otherwise - */ - boolean removeHandler(ProtocolHandler handler); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java deleted file mode 100644 index bb436e0..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * Defines a marshaller for serializing protocol messages. - * - * @param <T> The type of protocol message. - * - * @author unattributed - */ -public interface ProtocolMessageMarshaller<T> { - - /** - * Serializes the given message to the given output stream. - * @param msg a message - * @param os an output stream - * @throws IOException if the message could not be serialized to the stream - */ - void marshal(T msg, OutputStream os) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java deleted file mode 100644 index c690e7b..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import java.io.IOException; -import java.io.InputStream; - -/** - * Defines an unmarshaller for deserializing protocol messages. - * - * @param <T> The type of protocol message. - * - * @author unattributed - */ -public interface ProtocolMessageUnmarshaller<T> { - - /** - * Deserializes a message on the given input stream. - * @param is an input stream - * @return - * @throws IOException if the message could not be deserialized from the stream - */ - T unmarshal(InputStream is) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java deleted file mode 100644 index c2d16fc..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -import org.apache.nifi.cluster.protocol.DataFlow; -import java.io.Serializable; -import java.util.Arrays; - -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; - -import org.apache.nifi.cluster.protocol.jaxb.message.DataFlowAdapter; - -/** - * Represents a dataflow, which includes the raw bytes of the flow.xml and - * whether processors should be started automatically at application startup. - */ -@XmlJavaTypeAdapter(DataFlowAdapter.class) -public class StandardDataFlow implements Serializable, DataFlow { - - private final byte[] flow; - private final byte[] templateBytes; - private final byte[] snippetBytes; - - private boolean autoStartProcessors; - - /** - * Constructs an instance. - * - * @param flow a valid flow as bytes, which cannot be null - * @param templateBytes an XML representation of templates - * @param snippetBytes an XML representation of snippets - * - * @throws NullPointerException if any argument is null - */ - public StandardDataFlow(final byte[] flow, final byte[] templateBytes, final byte[] snippetBytes) { - this.flow = flow; - this.templateBytes = templateBytes; - this.snippetBytes = snippetBytes; - } - - public StandardDataFlow(final DataFlow toCopy) { - this.flow = copy(toCopy.getFlow()); - this.templateBytes = copy(toCopy.getTemplates()); - this.snippetBytes = copy(toCopy.getSnippets()); - this.autoStartProcessors = toCopy.isAutoStartProcessors(); - } - - private static byte[] copy(final byte[] bytes) { - return bytes == null ? null : Arrays.copyOf(bytes, bytes.length); - } - - /** - * @return the raw byte array of the flow - */ - public byte[] getFlow() { - return flow; - } - - /** - * @return the raw byte array of the templates - */ - public byte[] getTemplates() { - return templateBytes; - } - - /** - * @return the raw byte array of the snippets - */ - public byte[] getSnippets() { - return snippetBytes; - } - - /** - * @return true if processors should be automatically started at application - * startup; false otherwise - */ - public boolean isAutoStartProcessors() { - return autoStartProcessors; - } - - /** - * - * Sets the flag to automatically start processors at application startup. - * - * @param autoStartProcessors true if processors should be automatically - * started at application startup; false otherwise - */ - public void setAutoStartProcessors(final boolean autoStartProcessors) { - this.autoStartProcessors = autoStartProcessors; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java deleted file mode 100644 index 41c74eb..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol; - -/** - * Represents the exceptional case when a service's address is not known. - * @author unattributed - */ -public class UnknownServiceAddressException extends RuntimeException { - - public UnknownServiceAddressException() { - } - - public UnknownServiceAddressException(String msg) { - super(msg); - } - - public UnknownServiceAddressException(Throwable cause) { - super(cause); - } - - public UnknownServiceAddressException(String msg, Throwable cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java deleted file mode 100644 index ceb3fcb..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; -import org.apache.nifi.cluster.protocol.message.DisconnectMessage; -import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; -import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; -import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; -import org.apache.nifi.io.socket.SocketConfiguration; -import org.apache.nifi.io.socket.SocketUtils; -import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.util.FormatUtils; - -/** - * A protocol sender for sending protocol messages from the cluster manager to - * nodes. - * - * Connection-type requests (e.g., reconnection, disconnection) by nature of - * starting/stopping flow controllers take longer than other types of protocol - * messages. Therefore, a handshake timeout may be specified to lengthen the - * allowable time for communication with the node. - * - * @author unattributed - */ -public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolSender { - - - private final ProtocolContext<ProtocolMessage> protocolContext; - private final SocketConfiguration socketConfiguration; - private int handshakeTimeoutSeconds; - private volatile BulletinRepository bulletinRepository; - - public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) { - if(socketConfiguration == null) { - throw new IllegalArgumentException("Socket configuration may not be null."); - } else if(protocolContext == null) { - throw new IllegalArgumentException("Protocol Context may not be null."); - } - this.socketConfiguration = socketConfiguration; - this.protocolContext = protocolContext; - this.handshakeTimeoutSeconds = -1; // less than zero denotes variable not configured - } - - @Override - public void setBulletinRepository(final BulletinRepository bulletinRepository) { - this.bulletinRepository = bulletinRepository; - } - - /** - * Requests the data flow from a node. - * @param msg a message - * @return the message response - * @throws @throws ProtocolException if the message failed to be sent or the response was malformed - */ - @Override - public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException { - Socket socket = null; - try { - socket = createSocket(msg.getNodeId(), false); - - try { - // marshal message to output stream - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - - final ProtocolMessage response; - try { - // unmarshall response and return - final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); - response = unmarshaller.unmarshal(socket.getInputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed unmarshalling '" + MessageType.FLOW_RESPONSE + "' protocol message due to: " + ioe, ioe); - } - - if(MessageType.FLOW_RESPONSE == response.getType()) { - return (FlowResponseMessage) response; - } else { - throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'"); - } - - } finally { - SocketUtils.closeQuietly(socket); - } - } - - /** - * Requests a node to reconnect to the cluster. The configured value for - * handshake timeout is applied to the socket before making the request. - * @param msg a message - * @return the response - * @throws ProtocolException if the message failed to be sent or the response was malformed - */ - @Override - public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException { - Socket socket = null; - try { - socket = createSocket(msg.getNodeId(), true); - - // marshal message to output stream - try { - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - - - final ProtocolMessage response; - try { - // unmarshall response and return - final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); - response = unmarshaller.unmarshal(socket.getInputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed unmarshalling '" + MessageType.RECONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe); - } - - if(MessageType.RECONNECTION_RESPONSE == response.getType()) { - return (ReconnectionResponseMessage) response; - } else { - throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'"); - } - } finally { - SocketUtils.closeQuietly(socket); - } - } - - /** - * Requests a node to disconnect from the cluster. The configured value for - * handshake timeout is applied to the socket before making the request. - * @param msg a message - * @throws ProtocolException if the message failed to be sent - */ - @Override - public void disconnect(final DisconnectMessage msg) throws ProtocolException { - Socket socket = null; - try { - socket = createSocket(msg.getNodeId(), true); - - // marshal message to output stream - try { - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - } finally { - SocketUtils.closeQuietly(socket); - } - } - - /** - * Assigns the primary role to a node. - * - * @param msg a message - * - * @throws ProtocolException if the message failed to be sent - */ - @Override - public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException { - Socket socket = null; - try { - socket = createSocket(msg.getNodeId(), true); - - try { - // marshal message to output stream - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - } finally { - SocketUtils.closeQuietly(socket); - } - } - - - private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException { - // update socket timeout, if handshake timeout was set; otherwise use socket's current timeout - if(handshakeTimeoutSeconds >= 0) { - socket.setSoTimeout(handshakeTimeoutSeconds * 1000); - } - } - - public SocketConfiguration getSocketConfiguration() { - return socketConfiguration; - } - - public int getHandshakeTimeoutSeconds() { - return handshakeTimeoutSeconds; - } - - public void setHandshakeTimeout(final String handshakeTimeout) { - this.handshakeTimeoutSeconds = (int) FormatUtils.getTimeDuration(handshakeTimeout, TimeUnit.SECONDS); - } - - private Socket createSocket(final NodeIdentifier nodeId, final boolean applyHandshakeTimeout) { - return createSocket(nodeId.getSocketAddress(), nodeId.getSocketPort(), applyHandshakeTimeout); - } - - private Socket createSocket(final String host, final int port, final boolean applyHandshakeTimeout) { - try { - // create a socket - final Socket socket = SocketUtils.createSocket(InetSocketAddress.createUnresolved(host, port), socketConfiguration); - if ( applyHandshakeTimeout ) { - setConnectionHandshakeTimeoutOnSocket(socket); - } - return socket; - } catch(final IOException ioe) { - throw new ProtocolException("Failed to create socket due to: " + ioe, ioe); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java deleted file mode 100644 index 933e5fa..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.util.Collection; - -import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.ProtocolListener; -import org.apache.nifi.cluster.protocol.message.DisconnectMessage; -import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; -import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; -import org.apache.nifi.reporting.BulletinRepository; - -/** - * A wrapper class for consolidating a protocol sender and listener for the cluster - * manager. - * - * @author unattributed - */ -public class ClusterManagerProtocolSenderListener implements ClusterManagerProtocolSender, ProtocolListener { - - private final ClusterManagerProtocolSender sender; - - private final ProtocolListener listener; - - public ClusterManagerProtocolSenderListener(final ClusterManagerProtocolSender sender, final ProtocolListener listener) { - if(sender == null) { - throw new IllegalArgumentException("ClusterManagerProtocolSender may not be null."); - } else if(listener == null) { - throw new IllegalArgumentException("ProtocolListener may not be null."); - } - this.sender = sender; - this.listener = listener; - } - - @Override - public void stop() throws IOException { - if(!isRunning()) { - throw new IllegalStateException("Instance is already stopped."); - } - listener.stop(); - } - - @Override - public void start() throws IOException { - if(isRunning()) { - throw new IllegalStateException("Instance is already started."); - } - listener.start(); - } - - @Override - public boolean isRunning() { - return listener.isRunning(); - } - - @Override - public boolean removeHandler(final ProtocolHandler handler) { - return listener.removeHandler(handler); - } - - @Override - public Collection<ProtocolHandler> getHandlers() { - return listener.getHandlers(); - } - - @Override - public void addHandler(final ProtocolHandler handler) { - listener.addHandler(handler); - } - - @Override - public void setBulletinRepository(final BulletinRepository bulletinRepository) { - listener.setBulletinRepository(bulletinRepository); - sender.setBulletinRepository(bulletinRepository); - } - - @Override - public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException { - return sender.requestFlow(msg); - } - - @Override - public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException { - return sender.requestReconnection(msg); - } - - @Override - public void disconnect(DisconnectMessage msg) throws ProtocolException { - sender.disconnect(msg); - } - - @Override - public void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException { - sender.assignPrimaryRole(msg); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java deleted file mode 100644 index 24e51e0..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.Collections; -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; -import org.apache.nifi.io.socket.multicast.MulticastConfiguration; -import org.apache.nifi.io.socket.multicast.MulticastServiceDiscovery; -import org.apache.nifi.reporting.BulletinRepository; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.ProtocolListener; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; -import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An implementation for discovering services by way of "service broadcast" type - * protocol messages over multicast. - * - * The client caller is responsible for starting and stopping the service - * discovery. The instance must be stopped before termination of the JVM to - * ensure proper resource clean-up. - * - * @author unattributed - */ -public class ClusterServiceDiscovery implements MulticastServiceDiscovery, ProtocolListener { - - private static final Logger logger = LoggerFactory.getLogger(ClusterServiceDiscovery.class); - private final String serviceName; - private final MulticastConfiguration multicastConfiguration; - private final MulticastProtocolListener listener; - private volatile BulletinRepository bulletinRepository; - - /* - * guarded by this - */ - private DiscoverableService service; - - - public ClusterServiceDiscovery(final String serviceName, final InetSocketAddress multicastAddress, - final MulticastConfiguration multicastConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) { - - if (StringUtils.isBlank(serviceName)) { - throw new IllegalArgumentException("Service name may not be null or empty."); - } else if (multicastAddress == null) { - throw new IllegalArgumentException("Multicast address may not be null."); - } else if (multicastAddress.getAddress().isMulticastAddress() == false) { - throw new IllegalArgumentException("Multicast group must be a Class D address."); - } else if (protocolContext == null) { - throw new IllegalArgumentException("Protocol Context may not be null."); - } else if (multicastConfiguration == null) { - throw new IllegalArgumentException("Multicast configuration may not be null."); - } - - this.serviceName = serviceName; - this.multicastConfiguration = multicastConfiguration; - this.listener = new MulticastProtocolListener(1, multicastAddress, multicastConfiguration, protocolContext); - listener.addHandler(new ClusterManagerServiceBroadcastHandler()); - } - - @Override - public void setBulletinRepository(final BulletinRepository bulletinRepository) { - this.bulletinRepository = bulletinRepository; - } - - @Override - public synchronized DiscoverableService getService() { - return service; - } - - @Override - public InetSocketAddress getMulticastAddress() { - return listener.getMulticastAddress(); - } - - @Override - public Collection<ProtocolHandler> getHandlers() { - return Collections.unmodifiableCollection(listener.getHandlers()); - } - - @Override - public void addHandler(ProtocolHandler handler) { - listener.addHandler(handler); - } - - @Override - public boolean removeHandler(ProtocolHandler handler) { - return listener.removeHandler(handler); - } - - @Override - public boolean isRunning() { - return listener.isRunning(); - } - - @Override - public void start() throws IOException { - if (isRunning()) { - throw new IllegalStateException("Instance is already running."); - } - listener.start(); - } - - @Override - public void stop() throws IOException { - if (isRunning() == false) { - throw new IllegalStateException("Instance is already stopped."); - } - listener.stop(); - } - - public String getServiceName() { - return serviceName; - } - - public MulticastConfiguration getMulticastConfiguration() { - return multicastConfiguration; - } - - private class ClusterManagerServiceBroadcastHandler implements ProtocolHandler { - - @Override - public boolean canHandle(final ProtocolMessage msg) { - return MessageType.SERVICE_BROADCAST == msg.getType(); - } - - @Override - public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException { - synchronized (ClusterServiceDiscovery.this) { - if (canHandle(msg) == false) { - throw new ProtocolException("Handler cannot handle message type: " + msg.getType()); - } else { - final ServiceBroadcastMessage broadcastMsg = (ServiceBroadcastMessage) msg; - if (serviceName.equals(broadcastMsg.getServiceName())) { - final DiscoverableService oldService = service; - if (oldService == null - || broadcastMsg.getAddress().equalsIgnoreCase(oldService.getServiceAddress().getHostName()) == false - || broadcastMsg.getPort() != oldService.getServiceAddress().getPort()) { - service = new DiscoverableServiceImpl(serviceName, InetSocketAddress.createUnresolved(broadcastMsg.getAddress(), broadcastMsg.getPort())); - final InetSocketAddress oldServiceAddress = (oldService == null) ? null : oldService.getServiceAddress(); - logger.info(String.format("Updating cluster service address for '%s' from '%s' to '%s'", serviceName, prettyPrint(oldServiceAddress), prettyPrint(service.getServiceAddress()))); - } - } - return null; - } - } - } - } - - private String prettyPrint(final InetSocketAddress address) { - if (address == null) { - return "0.0.0.0:0"; - } else { - return address.getHostName() + ":" + address.getPort(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java deleted file mode 100644 index bebfde8..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; -import org.apache.nifi.io.socket.multicast.ServiceDiscovery; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements the ServiceLocator interface for locating the socket address - * of a cluster service. Depending on configuration, the address may be located - * using service discovery. If using service discovery, then the service methods - * must be used for starting and stopping discovery. - * - * Service discovery may be used in conjunction with a fixed port. In this case, - * the service discovery will yield the service IP/host while the fixed port will - * be used for the port. - * - * Alternatively, the instance may be configured with exact service location, in - * which case, no service discovery occurs and the caller will always receive the - * configured service. - * - * @author unattributed - */ -public class ClusterServiceLocator implements ServiceDiscovery { - - private static final Logger logger = LoggerFactory.getLogger(ClusterServiceLocator.class); - - private final String serviceName; - - private final ClusterServiceDiscovery serviceDiscovery; - - private final DiscoverableService fixedService; - - private final int fixedServicePort; - - private final AttemptsConfig attemptsConfig = new AttemptsConfig(); - - private final AtomicBoolean running = new AtomicBoolean(false); - - public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery) { - if(serviceDiscovery == null) { - throw new IllegalArgumentException("ClusterServiceDiscovery may not be null."); - } - this.serviceDiscovery = serviceDiscovery; - this.fixedService = null; - this.fixedServicePort = 0; - this.serviceName = serviceDiscovery.getServiceName(); - } - - public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery, final int fixedServicePort) { - if(serviceDiscovery == null) { - throw new IllegalArgumentException("ClusterServiceDiscovery may not be null."); - } - this.serviceDiscovery = serviceDiscovery; - this.fixedService = null; - this.fixedServicePort = fixedServicePort; - this.serviceName = serviceDiscovery.getServiceName(); - } - - public ClusterServiceLocator(final DiscoverableService fixedService) { - if(fixedService == null) { - throw new IllegalArgumentException("Service may not be null."); - } - this.serviceDiscovery = null; - this.fixedService = fixedService; - this.fixedServicePort = 0; - this.serviceName = fixedService.getServiceName(); - } - - @Override - public DiscoverableService getService() { - - final int numAttemptsValue; - final int secondsBetweenAttempts; - synchronized(this) { - numAttemptsValue = attemptsConfig.numAttempts; - secondsBetweenAttempts = attemptsConfig.getTimeBetweenAttempts(); - } - - // try for a configured amount of attempts to retrieve the service address - for(int i = 0; i < numAttemptsValue; i++) { - - if(fixedService != null) { - return fixedService; - } else if(serviceDiscovery != null) { - - final DiscoverableService discoveredService = serviceDiscovery.getService(); - - // if we received an address - if(discoveredService != null) { - // if we were configured with a fixed port, then use the discovered host and fixed port; otherwise use the discovered address - if(fixedServicePort > 0) { - // create service using discovered service name and address with fixed service port - final InetSocketAddress addr = InetSocketAddress.createUnresolved(discoveredService.getServiceAddress().getHostName(), fixedServicePort); - final DiscoverableService result = new DiscoverableServiceImpl(discoveredService.getServiceName(), addr); - return result; - } else { - return discoveredService; - } - } - } - - // could not obtain service address, so sleep a bit - try { - logger.debug(String.format("Locating Cluster Service '%s' Attempt: %d of %d failed. Trying again in %d seconds.", - serviceName, (i + 1), numAttemptsValue, secondsBetweenAttempts)); - Thread.sleep(secondsBetweenAttempts * 1000); - } catch(final InterruptedException ie) { - break; - } - - } - - return null; - } - - public boolean isRunning() { - if(serviceDiscovery != null) { - return serviceDiscovery.isRunning(); - } else { - return running.get(); - } - } - - public void start() throws IOException { - - if(isRunning()) { - throw new IllegalStateException("Instance is already started."); - } - - if(serviceDiscovery != null) { - serviceDiscovery.start(); - } - running.set(true); - } - - public void stop() throws IOException { - - if(isRunning() == false) { - throw new IllegalStateException("Instance is already stopped."); - } - - if(serviceDiscovery != null) { - serviceDiscovery.stop(); - } - running.set(false); - } - - public synchronized void setAttemptsConfig(final AttemptsConfig config) { - if(config == null) { - throw new IllegalArgumentException("Attempts configuration may not be null."); - } - this.attemptsConfig.numAttempts = config.numAttempts; - this.attemptsConfig.timeBetweenAttempts = config.timeBetweenAttempts; - this.attemptsConfig.timeBetweenAttempsUnit = config.timeBetweenAttempsUnit; - } - - public synchronized AttemptsConfig getAttemptsConfig() { - final AttemptsConfig config = new AttemptsConfig(); - config.numAttempts = this.attemptsConfig.numAttempts; - config.timeBetweenAttempts = this.attemptsConfig.timeBetweenAttempts; - config.timeBetweenAttempsUnit = this.attemptsConfig.timeBetweenAttempsUnit; - return config; - } - - public static class AttemptsConfig { - - private int numAttempts = 1; - - private int timeBetweenAttempts = 1; - - private TimeUnit timeBetweenAttempsUnit = TimeUnit.SECONDS; - - public int getNumAttempts() { - return numAttempts; - } - - public void setNumAttempts(int numAttempts) { - if(numAttempts <= 0) { - throw new IllegalArgumentException("Number of attempts must be positive: " + numAttempts); - } - this.numAttempts = numAttempts; - } - - public TimeUnit getTimeBetweenAttemptsUnit() { - return timeBetweenAttempsUnit; - } - - public void setTimeBetweenAttempsUnit(TimeUnit timeBetweenAttempsUnit) { - if(timeBetweenAttempts <= 0) { - throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts); - } - this.timeBetweenAttempsUnit = timeBetweenAttempsUnit; - } - - public int getTimeBetweenAttempts() { - return timeBetweenAttempts; - } - - public void setTimeBetweenAttempts(int timeBetweenAttempts) { - if(timeBetweenAttempts <= 0) { - throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts); - } - this.timeBetweenAttempts = timeBetweenAttempts; - } - - } -}