http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java new file mode 100644 index 0000000..1edcb91 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.nifi.cluster.protocol;

import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;

/**
 * Node-side sender: each method delivers one kind of protocol message from a
 * node to the cluster manager.
 *
 * @author unattributed
 */
public interface NodeProtocolSender {

    /**
     * Sends a "connection request" message to the cluster manager and returns
     * the reply.
     *
     * @param msg the connection request to send
     * @return the response
     * @throws UnknownServiceAddressException if the cluster manager's address is not known
     * @throws ProtocolException if communication failed
     */
    ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException;

    /**
     * Sends a "heartbeat" message to the cluster manager.
     *
     * @param msg the heartbeat to send
     * @throws UnknownServiceAddressException if the cluster manager's address is not known
     * @throws ProtocolException if communication failed
     */
    void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException;

    /**
     * Sends a bulletins message to the cluster manager.
     *
     * @param msg the bulletins message to send
     * @throws ProtocolException if communication failed
     * @throws UnknownServiceAddressException if the cluster manager's address is not known
     */
    void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException;

    /**
     * Sends a failure notification if the controller was unable to start.
     *
     * @param msg the failure notification to send
     * @throws UnknownServiceAddressException if the cluster manager's address is not known
     * @throws ProtocolException if communication failed
     */
    void notifyControllerStartupFailure(ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException;

    /**
     * Sends a failure notification if the node was unable to reconnect to the
     * cluster.
     *
     * @param msg the failure notification to send
     * @throws UnknownServiceAddressException if the cluster manager's address is not known
     * @throws ProtocolException if communication failed
     */
    void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException;

}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java new file mode 100644 index 0000000..b614e76 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.cluster.protocol;

/**
 * Entry point for the internal cluster protocol's serialization machinery: a
 * factory for the marshaller/unmarshaller pair used to write and read
 * protocol messages.
 *
 * @param <T> The type of protocol message.
 *
 * @author unattributed
 */
public interface ProtocolContext<T> {

    /**
     * Creates a marshaller for serializing protocol messages.
     *
     * @return a marshaller
     */
    ProtocolMessageMarshaller<T> createMarshaller();

    /**
     * Creates an unmarshaller for deserializing protocol messages.
     *
     * @return an unmarshaller
     */
    ProtocolMessageUnmarshaller<T> createUnmarshaller();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java new file mode 100644 index 0000000..f11ad84 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.cluster.protocol;

/**
 * Root of the exception hierarchy for problems encountered while
 * communicating within the cluster. It is unchecked (extends
 * {@link RuntimeException}).
 *
 * @author unattributed
 */
public class ProtocolException extends RuntimeException {

    /** Creates an exception with neither a message nor a cause. */
    public ProtocolException() {
        super();
    }

    /**
     * @param msg the detail message
     */
    public ProtocolException(String msg) {
        super(msg);
    }

    /**
     * @param cause the underlying failure
     */
    public ProtocolException(Throwable cause) {
        super(cause);
    }

    /**
     * @param msg the detail message
     * @param cause the underlying failure
     */
    public ProtocolException(String msg, Throwable cause) {
        super(msg, cause);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java new file mode 100644 index 0000000..6de87db --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.nifi.cluster.protocol;

import org.apache.nifi.cluster.protocol.message.ProtocolMessage;

/**
 * Processes protocol messages. A handler first reports whether it accepts a
 * message ({@link #canHandle}) and then processes it ({@link #handle}).
 *
 * @author unattributed
 */
public interface ProtocolHandler {

    /**
     * Handles the given protocol message or throws an exception if it cannot
     * handle the message. If no response is needed by the protocol, then null
     * should be returned.
     *
     * @param msg the message to process
     * @return a response or null, if no response is necessary
     *
     * @throws ProtocolException if the message could not be processed
     */
    ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException;

    /**
     * Reports whether this handler is able to process a message.
     *
     * @param msg the message to check
     * @return true if the handler can process the given message; false otherwise
     */
    boolean canHandle(ProtocolMessage msg);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java new file mode 100644 index 0000000..32f0f5d --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.cluster.protocol;

import java.io.IOException;
import java.util.Collection;

import org.apache.nifi.reporting.BulletinRepository;

/**
 * A listener that processes protocol messages. It can be started and stopped,
 * and it keeps a collection of registered {@link ProtocolHandler}s.
 *
 * @author unattributed
 */
public interface ProtocolListener {

    /**
     * Starts the instance for listening for messages. Start may only be called
     * if the instance is not running.
     *
     * @throws java.io.IOException if an I/O error occurs
     */
    void start() throws IOException;

    /**
     * Stops the instance from listening for messages. Stop may only be called
     * if the instance is running.
     *
     * @throws java.io.IOException if an I/O error occurs
     */
    void stop() throws IOException;

    /**
     * @return true if the instance is started; false otherwise.
     */
    boolean isRunning();

    /**
     * @return the handlers registered with the listener
     */
    Collection<ProtocolHandler> getHandlers();

    /**
     * Registers a handler with the listener.
     *
     * @param handler the handler to register
     */
    void addHandler(ProtocolHandler handler);

    /**
     * Sets the BulletinRepository that can be used to report bulletins.
     *
     * @param bulletinRepository the repository to report bulletins to
     */
    void setBulletinRepository(BulletinRepository bulletinRepository);

    /**
     * Unregisters the handler with the listener.
     *
     * @param handler the handler to unregister
     * @return true if the handler was removed; false otherwise
     */
    boolean removeHandler(ProtocolHandler handler);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java new file mode 100644 index 0000000..bb436e0 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.cluster.protocol;

import java.io.IOException;
import java.io.OutputStream;

/**
 * Writes protocol messages to an output stream in serialized form.
 *
 * @param <T> The type of protocol message.
 *
 * @author unattributed
 */
public interface ProtocolMessageMarshaller<T> {

    /**
     * Serializes the given message to the given output stream.
     *
     * @param msg the message to serialize
     * @param os the stream that receives the serialized form
     * @throws IOException if the message could not be serialized to the stream
     */
    void marshal(T msg, OutputStream os) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java new file mode 100644 index 0000000..c690e7b --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.nifi.cluster.protocol;

import java.io.IOException;
import java.io.InputStream;

/**
 * Reads protocol messages back from their serialized form on an input stream.
 *
 * @param <T> The type of protocol message.
 *
 * @author unattributed
 */
public interface ProtocolMessageUnmarshaller<T> {

    /**
     * Deserializes a message on the given input stream.
     *
     * @param is the stream to read the serialized message from
     * @return the deserialized message
     * @throws IOException if the message could not be deserialized from the stream
     */
    T unmarshal(InputStream is) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java new file mode 100644 index 0000000..c2d16fc --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.cluster.protocol;

import org.apache.nifi.cluster.protocol.DataFlow;
import java.io.Serializable;
import java.util.Arrays;

import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;

import org.apache.nifi.cluster.protocol.jaxb.message.DataFlowAdapter;

/**
 * Represents a dataflow, which includes the raw bytes of the flow.xml and
 * whether processors should be started automatically at application startup.
 *
 * The byte arrays are fixed at construction; only the auto-start flag can be
 * changed afterwards. The accessors return the internal arrays themselves,
 * not copies.
 */
@XmlJavaTypeAdapter(DataFlowAdapter.class)
public class StandardDataFlow implements Serializable, DataFlow {

    private final byte[] flow;
    private final byte[] templateBytes;
    private final byte[] snippetBytes;

    private boolean autoStartProcessors;

    /**
     * Constructs an instance that holds the given arrays by reference.
     *
     * No null check and no defensive copy is performed here. The auto-start
     * flag starts out as false.
     *
     * @param flow a valid flow as bytes; expected to be non-null, but this is
     * not enforced by this constructor
     * @param templateBytes an XML representation of templates
     * @param snippetBytes an XML representation of snippets
     */
    public StandardDataFlow(final byte[] flow, final byte[] templateBytes, final byte[] snippetBytes) {
        this.flow = flow;
        this.templateBytes = templateBytes;
        this.snippetBytes = snippetBytes;
    }

    /**
     * Copy constructor. Each byte array of the source is duplicated (a null
     * array stays null) and the auto-start flag is carried over.
     *
     * @param toCopy the dataflow to copy
     *
     * @throws NullPointerException if {@code toCopy} is null
     */
    public StandardDataFlow(final DataFlow toCopy) {
        this.flow = copy(toCopy.getFlow());
        this.templateBytes = copy(toCopy.getTemplates());
        this.snippetBytes = copy(toCopy.getSnippets());
        this.autoStartProcessors = toCopy.isAutoStartProcessors();
    }

    /** Returns a same-length copy of the array, or null when given null. */
    private static byte[] copy(final byte[] bytes) {
        return bytes == null ? null : Arrays.copyOf(bytes, bytes.length);
    }

    /**
     * @return the raw byte array of the flow
     */
    @Override
    public byte[] getFlow() {
        return flow;
    }

    /**
     * @return the raw byte array of the templates
     */
    @Override
    public byte[] getTemplates() {
        return templateBytes;
    }

    /**
     * @return the raw byte array of the snippets
     */
    @Override
    public byte[] getSnippets() {
        return snippetBytes;
    }

    /**
     * @return true if processors should be automatically started at application
     * startup; false otherwise
     */
    @Override
    public boolean isAutoStartProcessors() {
        return autoStartProcessors;
    }

    /**
     * Sets the flag to automatically start processors at application startup.
     *
     * @param autoStartProcessors true if processors should be automatically
     * started at application startup; false otherwise
     */
    public void setAutoStartProcessors(final boolean autoStartProcessors) {
        this.autoStartProcessors = autoStartProcessors;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java new file mode 100644 index 0000000..41c74eb --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.cluster.protocol;

/**
 * Signals that the address of a service is not known. It is unchecked
 * (extends {@link RuntimeException}).
 *
 * @author unattributed
 */
public class UnknownServiceAddressException extends RuntimeException {

    /** Creates an exception with neither a message nor a cause. */
    public UnknownServiceAddressException() {
        super();
    }

    /**
     * @param msg the detail message
     */
    public UnknownServiceAddressException(String msg) {
        super(msg);
    }

    /**
     * @param cause the underlying failure
     */
    public UnknownServiceAddressException(Throwable cause) {
        super(cause);
    }

    /**
     * @param msg the detail message
     * @param cause the underlying failure
     */
    public UnknownServiceAddressException(String msg, Throwable cause) {
        super(msg, cause);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java new file mode 100644 index 0000000..ceb3fcb --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.cluster.protocol.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.util.FormatUtils;

/**
 * A protocol sender for sending protocol messages from the cluster manager to
 * nodes. Every request opens a new socket to the target node, marshals the
 * message onto it, reads a response where the protocol defines one, and
 * closes the socket again.
 *
 * Connection-type requests (e.g., reconnection, disconnection) by nature of
 * starting/stopping flow controllers take longer than other types of protocol
 * messages. Therefore, a handshake timeout may be specified to lengthen the
 * allowable time for communication with the node.
 *
 * @author unattributed
 */
public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolSender {

    private final ProtocolContext<ProtocolMessage> protocolContext;
    private final SocketConfiguration socketConfiguration;
    // a negative value means "not configured": sockets keep the timeout they were created with
    private int handshakeTimeoutSeconds;
    // stored by setBulletinRepository; not read anywhere in this class
    private volatile BulletinRepository bulletinRepository;

    /**
     * Creates a sender with no handshake timeout configured.
     *
     * @param socketConfiguration the configuration used for every socket this sender opens
     * @param protocolContext the source of marshallers and unmarshallers
     *
     * @throws IllegalArgumentException if either argument is null
     */
    public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
        if (socketConfiguration == null) {
            throw new IllegalArgumentException("Socket configuration may not be null.");
        } else if (protocolContext == null) {
            throw new IllegalArgumentException("Protocol Context may not be null.");
        }
        this.socketConfiguration = socketConfiguration;
        this.protocolContext = protocolContext;
        this.handshakeTimeoutSeconds = -1; // less than zero denotes variable not configured
    }

    @Override
    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
        this.bulletinRepository = bulletinRepository;
    }

    /**
     * Requests the data flow from a node. The handshake timeout is not applied
     * to this request.
     *
     * @param msg a message
     * @return the message response
     * @throws ProtocolException if the message failed to be sent or the response was malformed
     */
    @Override
    public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException {
        Socket socket = null;
        try {
            socket = createSocket(msg.getNodeId(), false);
            marshalTo(socket, msg);
            return (FlowResponseMessage) unmarshalExpected(socket, MessageType.FLOW_RESPONSE);
        } finally {
            SocketUtils.closeQuietly(socket);
        }
    }

    /**
     * Requests a node to reconnect to the cluster. The configured value for
     * handshake timeout is applied to the socket before making the request.
     *
     * @param msg a message
     * @return the response
     * @throws ProtocolException if the message failed to be sent or the response was malformed
     */
    @Override
    public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException {
        Socket socket = null;
        try {
            socket = createSocket(msg.getNodeId(), true);
            marshalTo(socket, msg);
            return (ReconnectionResponseMessage) unmarshalExpected(socket, MessageType.RECONNECTION_RESPONSE);
        } finally {
            SocketUtils.closeQuietly(socket);
        }
    }

    /**
     * Requests a node to disconnect from the cluster. The configured value for
     * handshake timeout is applied to the socket before making the request.
     * No response is read.
     *
     * @param msg a message
     * @throws ProtocolException if the message failed to be sent
     */
    @Override
    public void disconnect(final DisconnectMessage msg) throws ProtocolException {
        Socket socket = null;
        try {
            socket = createSocket(msg.getNodeId(), true);
            marshalTo(socket, msg);
        } finally {
            SocketUtils.closeQuietly(socket);
        }
    }

    /**
     * Assigns the primary role to a node. The configured value for handshake
     * timeout is applied to the socket before making the request. No response
     * is read.
     *
     * @param msg a message
     *
     * @throws ProtocolException if the message failed to be sent
     */
    @Override
    public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException {
        Socket socket = null;
        try {
            socket = createSocket(msg.getNodeId(), true);
            marshalTo(socket, msg);
        } finally {
            SocketUtils.closeQuietly(socket);
        }
    }

    /** Marshals the message onto the socket's output stream, wrapping any I/O failure in a ProtocolException. */
    private void marshalTo(final Socket socket, final ProtocolMessage msg) {
        try {
            final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
            marshaller.marshal(msg, socket.getOutputStream());
        } catch (final IOException ioe) {
            throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
        }
    }

    /** Unmarshals one response from the socket and rejects it unless it has the expected type. */
    private ProtocolMessage unmarshalExpected(final Socket socket, final MessageType expectedType) {
        final ProtocolMessage response;
        try {
            final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
            response = unmarshaller.unmarshal(socket.getInputStream());
        } catch (final IOException ioe) {
            throw new ProtocolException("Failed unmarshalling '" + expectedType + "' protocol message due to: " + ioe, ioe);
        }

        if (expectedType != response.getType()) {
            // the expected type is named once, by the caller, so the message cannot report a different type
            throw new ProtocolException("Expected message type '" + expectedType + "' but found '" + response.getType() + "'");
        }
        return response;
    }

    private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException {
        // update socket timeout, if handshake timeout was set; otherwise use socket's current timeout
        if (handshakeTimeoutSeconds >= 0) {
            // convert in long and clamp: "seconds * 1000" in int arithmetic overflows for large values
            final long timeoutMillis = TimeUnit.SECONDS.toMillis(handshakeTimeoutSeconds);
            socket.setSoTimeout((int) Math.min(timeoutMillis, Integer.MAX_VALUE));
        }
    }

    /**
     * @return the socket configuration given at construction
     */
    public SocketConfiguration getSocketConfiguration() {
        return socketConfiguration;
    }

    /**
     * @return the handshake timeout in seconds; negative if it was never configured
     */
    public int getHandshakeTimeoutSeconds() {
        return handshakeTimeoutSeconds;
    }

    /**
     * Sets the handshake timeout from a duration string. The string is parsed
     * by {@code FormatUtils.getTimeDuration} and the result, in seconds, is
     * narrowed to an int.
     *
     * @param handshakeTimeout the duration string to parse
     */
    public void setHandshakeTimeout(final String handshakeTimeout) {
        this.handshakeTimeoutSeconds = (int) FormatUtils.getTimeDuration(handshakeTimeout, TimeUnit.SECONDS);
    }

    /** Opens a socket to the node's protocol address and port. */
    private Socket createSocket(final NodeIdentifier nodeId, final boolean applyHandshakeTimeout) {
        return createSocket(nodeId.getSocketAddress(), nodeId.getSocketPort(), applyHandshakeTimeout);
    }

    /** Opens a socket to host:port, optionally applying the handshake timeout; I/O failures become ProtocolException. */
    private Socket createSocket(final String host, final int port, final boolean applyHandshakeTimeout) {
        try {
            // create a socket
            final Socket socket = SocketUtils.createSocket(InetSocketAddress.createUnresolved(host, port), socketConfiguration);
            if (applyHandshakeTimeout) {
                setConnectionHandshakeTimeoutOnSocket(socket);
            }
            return socket;
        } catch (final IOException ioe) {
            throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java new file mode 100644 index 0000000..933e5fa --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.impl; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.ProtocolListener; +import org.apache.nifi.cluster.protocol.message.DisconnectMessage; +import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; +import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; +import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; +import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; +import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; +import org.apache.nifi.reporting.BulletinRepository; + +/** + * A wrapper class for consolidating a protocol sender and listener for the cluster + * manager. 
+ * + * @author unattributed + */ +public class ClusterManagerProtocolSenderListener implements ClusterManagerProtocolSender, ProtocolListener { + + private final ClusterManagerProtocolSender sender; + + private final ProtocolListener listener; + + public ClusterManagerProtocolSenderListener(final ClusterManagerProtocolSender sender, final ProtocolListener listener) { + if(sender == null) { + throw new IllegalArgumentException("ClusterManagerProtocolSender may not be null."); + } else if(listener == null) { + throw new IllegalArgumentException("ProtocolListener may not be null."); + } + this.sender = sender; + this.listener = listener; + } + + @Override + public void stop() throws IOException { + if(!isRunning()) { + throw new IllegalStateException("Instance is already stopped."); + } + listener.stop(); + } + + @Override + public void start() throws IOException { + if(isRunning()) { + throw new IllegalStateException("Instance is already started."); + } + listener.start(); + } + + @Override + public boolean isRunning() { + return listener.isRunning(); + } + + @Override + public boolean removeHandler(final ProtocolHandler handler) { + return listener.removeHandler(handler); + } + + @Override + public Collection<ProtocolHandler> getHandlers() { + return listener.getHandlers(); + } + + @Override + public void addHandler(final ProtocolHandler handler) { + listener.addHandler(handler); + } + + @Override + public void setBulletinRepository(final BulletinRepository bulletinRepository) { + listener.setBulletinRepository(bulletinRepository); + sender.setBulletinRepository(bulletinRepository); + } + + @Override + public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException { + return sender.requestFlow(msg); + } + + @Override + public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException { + return sender.requestReconnection(msg); + } + + @Override + public void 
disconnect(DisconnectMessage msg) throws ProtocolException { + sender.disconnect(msg); + } + + @Override + public void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException { + sender.assignPrimaryRole(msg); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java new file mode 100644 index 0000000..24e51e0 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.protocol.impl; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.Collections; +import org.apache.nifi.io.socket.multicast.DiscoverableService; +import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; +import org.apache.nifi.io.socket.multicast.MulticastConfiguration; +import org.apache.nifi.io.socket.multicast.MulticastServiceDiscovery; +import org.apache.nifi.reporting.BulletinRepository; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.ProtocolListener; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; +import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation for discovering services by way of "service broadcast" type + * protocol messages over multicast. + * + * The client caller is responsible for starting and stopping the service + * discovery. The instance must be stopped before termination of the JVM to + * ensure proper resource clean-up. 
+ * + * @author unattributed + */ +public class ClusterServiceDiscovery implements MulticastServiceDiscovery, ProtocolListener { + + private static final Logger logger = LoggerFactory.getLogger(ClusterServiceDiscovery.class); + private final String serviceName; + private final MulticastConfiguration multicastConfiguration; + private final MulticastProtocolListener listener; + private volatile BulletinRepository bulletinRepository; + + /* + * guarded by this + */ + private DiscoverableService service; + + + public ClusterServiceDiscovery(final String serviceName, final InetSocketAddress multicastAddress, + final MulticastConfiguration multicastConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) { + + if (StringUtils.isBlank(serviceName)) { + throw new IllegalArgumentException("Service name may not be null or empty."); + } else if (multicastAddress == null) { + throw new IllegalArgumentException("Multicast address may not be null."); + } else if (multicastAddress.getAddress().isMulticastAddress() == false) { + throw new IllegalArgumentException("Multicast group must be a Class D address."); + } else if (protocolContext == null) { + throw new IllegalArgumentException("Protocol Context may not be null."); + } else if (multicastConfiguration == null) { + throw new IllegalArgumentException("Multicast configuration may not be null."); + } + + this.serviceName = serviceName; + this.multicastConfiguration = multicastConfiguration; + this.listener = new MulticastProtocolListener(1, multicastAddress, multicastConfiguration, protocolContext); + listener.addHandler(new ClusterManagerServiceBroadcastHandler()); + } + + @Override + public void setBulletinRepository(final BulletinRepository bulletinRepository) { + this.bulletinRepository = bulletinRepository; + } + + @Override + public synchronized DiscoverableService getService() { + return service; + } + + @Override + public InetSocketAddress getMulticastAddress() { + return listener.getMulticastAddress(); + } 
+ + @Override + public Collection<ProtocolHandler> getHandlers() { + return Collections.unmodifiableCollection(listener.getHandlers()); + } + + @Override + public void addHandler(ProtocolHandler handler) { + listener.addHandler(handler); + } + + @Override + public boolean removeHandler(ProtocolHandler handler) { + return listener.removeHandler(handler); + } + + @Override + public boolean isRunning() { + return listener.isRunning(); + } + + @Override + public void start() throws IOException { + if (isRunning()) { + throw new IllegalStateException("Instance is already running."); + } + listener.start(); + } + + @Override + public void stop() throws IOException { + if (isRunning() == false) { + throw new IllegalStateException("Instance is already stopped."); + } + listener.stop(); + } + + public String getServiceName() { + return serviceName; + } + + public MulticastConfiguration getMulticastConfiguration() { + return multicastConfiguration; + } + + private class ClusterManagerServiceBroadcastHandler implements ProtocolHandler { + + @Override + public boolean canHandle(final ProtocolMessage msg) { + return MessageType.SERVICE_BROADCAST == msg.getType(); + } + + @Override + public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException { + synchronized (ClusterServiceDiscovery.this) { + if (canHandle(msg) == false) { + throw new ProtocolException("Handler cannot handle message type: " + msg.getType()); + } else { + final ServiceBroadcastMessage broadcastMsg = (ServiceBroadcastMessage) msg; + if (serviceName.equals(broadcastMsg.getServiceName())) { + final DiscoverableService oldService = service; + if (oldService == null + || broadcastMsg.getAddress().equalsIgnoreCase(oldService.getServiceAddress().getHostName()) == false + || broadcastMsg.getPort() != oldService.getServiceAddress().getPort()) { + service = new DiscoverableServiceImpl(serviceName, InetSocketAddress.createUnresolved(broadcastMsg.getAddress(), broadcastMsg.getPort())); + final 
InetSocketAddress oldServiceAddress = (oldService == null) ? null : oldService.getServiceAddress(); + logger.info(String.format("Updating cluster service address for '%s' from '%s' to '%s'", serviceName, prettyPrint(oldServiceAddress), prettyPrint(service.getServiceAddress()))); + } + } + return null; + } + } + } + } + + private String prettyPrint(final InetSocketAddress address) { + if (address == null) { + return "0.0.0.0:0"; + } else { + return address.getHostName() + ":" + address.getPort(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java new file mode 100644 index 0000000..bebfde8 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.impl; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.nifi.io.socket.multicast.DiscoverableService; +import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; +import org.apache.nifi.io.socket.multicast.ServiceDiscovery; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements the ServiceLocator interface for locating the socket address + * of a cluster service. Depending on configuration, the address may be located + * using service discovery. If using service discovery, then the service methods + * must be used for starting and stopping discovery. + * + * Service discovery may be used in conjunction with a fixed port. In this case, + * the service discovery will yield the service IP/host while the fixed port will + * be used for the port. + * + * Alternatively, the instance may be configured with exact service location, in + * which case, no service discovery occurs and the caller will always receive the + * configured service. 
+ * + * @author unattributed + */ +public class ClusterServiceLocator implements ServiceDiscovery { + + private static final Logger logger = LoggerFactory.getLogger(ClusterServiceLocator.class); + + private final String serviceName; + + private final ClusterServiceDiscovery serviceDiscovery; + + private final DiscoverableService fixedService; + + private final int fixedServicePort; + + private final AttemptsConfig attemptsConfig = new AttemptsConfig(); + + private final AtomicBoolean running = new AtomicBoolean(false); + + public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery) { + if(serviceDiscovery == null) { + throw new IllegalArgumentException("ClusterServiceDiscovery may not be null."); + } + this.serviceDiscovery = serviceDiscovery; + this.fixedService = null; + this.fixedServicePort = 0; + this.serviceName = serviceDiscovery.getServiceName(); + } + + public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery, final int fixedServicePort) { + if(serviceDiscovery == null) { + throw new IllegalArgumentException("ClusterServiceDiscovery may not be null."); + } + this.serviceDiscovery = serviceDiscovery; + this.fixedService = null; + this.fixedServicePort = fixedServicePort; + this.serviceName = serviceDiscovery.getServiceName(); + } + + public ClusterServiceLocator(final DiscoverableService fixedService) { + if(fixedService == null) { + throw new IllegalArgumentException("Service may not be null."); + } + this.serviceDiscovery = null; + this.fixedService = fixedService; + this.fixedServicePort = 0; + this.serviceName = fixedService.getServiceName(); + } + + @Override + public DiscoverableService getService() { + + final int numAttemptsValue; + final int secondsBetweenAttempts; + synchronized(this) { + numAttemptsValue = attemptsConfig.numAttempts; + secondsBetweenAttempts = attemptsConfig.getTimeBetweenAttempts(); + } + + // try for a configured amount of attempts to retrieve the service address + for(int i = 0; i < 
numAttemptsValue; i++) { + + if(fixedService != null) { + return fixedService; + } else if(serviceDiscovery != null) { + + final DiscoverableService discoveredService = serviceDiscovery.getService(); + + // if we received an address + if(discoveredService != null) { + // if we were configured with a fixed port, then use the discovered host and fixed port; otherwise use the discovered address + if(fixedServicePort > 0) { + // create service using discovered service name and address with fixed service port + final InetSocketAddress addr = InetSocketAddress.createUnresolved(discoveredService.getServiceAddress().getHostName(), fixedServicePort); + final DiscoverableService result = new DiscoverableServiceImpl(discoveredService.getServiceName(), addr); + return result; + } else { + return discoveredService; + } + } + } + + // could not obtain service address, so sleep a bit + try { + logger.debug(String.format("Locating Cluster Service '%s' Attempt: %d of %d failed. Trying again in %d seconds.", + serviceName, (i + 1), numAttemptsValue, secondsBetweenAttempts)); + Thread.sleep(secondsBetweenAttempts * 1000); + } catch(final InterruptedException ie) { + break; + } + + } + + return null; + } + + public boolean isRunning() { + if(serviceDiscovery != null) { + return serviceDiscovery.isRunning(); + } else { + return running.get(); + } + } + + public void start() throws IOException { + + if(isRunning()) { + throw new IllegalStateException("Instance is already started."); + } + + if(serviceDiscovery != null) { + serviceDiscovery.start(); + } + running.set(true); + } + + public void stop() throws IOException { + + if(isRunning() == false) { + throw new IllegalStateException("Instance is already stopped."); + } + + if(serviceDiscovery != null) { + serviceDiscovery.stop(); + } + running.set(false); + } + + public synchronized void setAttemptsConfig(final AttemptsConfig config) { + if(config == null) { + throw new IllegalArgumentException("Attempts configuration may not be 
null."); + } + this.attemptsConfig.numAttempts = config.numAttempts; + this.attemptsConfig.timeBetweenAttempts = config.timeBetweenAttempts; + this.attemptsConfig.timeBetweenAttempsUnit = config.timeBetweenAttempsUnit; + } + + public synchronized AttemptsConfig getAttemptsConfig() { + final AttemptsConfig config = new AttemptsConfig(); + config.numAttempts = this.attemptsConfig.numAttempts; + config.timeBetweenAttempts = this.attemptsConfig.timeBetweenAttempts; + config.timeBetweenAttempsUnit = this.attemptsConfig.timeBetweenAttempsUnit; + return config; + } + + public static class AttemptsConfig { + + private int numAttempts = 1; + + private int timeBetweenAttempts = 1; + + private TimeUnit timeBetweenAttempsUnit = TimeUnit.SECONDS; + + public int getNumAttempts() { + return numAttempts; + } + + public void setNumAttempts(int numAttempts) { + if(numAttempts <= 0) { + throw new IllegalArgumentException("Number of attempts must be positive: " + numAttempts); + } + this.numAttempts = numAttempts; + } + + public TimeUnit getTimeBetweenAttemptsUnit() { + return timeBetweenAttempsUnit; + } + + public void setTimeBetweenAttempsUnit(TimeUnit timeBetweenAttempsUnit) { + if(timeBetweenAttempts <= 0) { + throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts); + } + this.timeBetweenAttempsUnit = timeBetweenAttempsUnit; + } + + public int getTimeBetweenAttempts() { + return timeBetweenAttempts; + } + + public void setTimeBetweenAttempts(int timeBetweenAttempts) { + if(timeBetweenAttempts <= 0) { + throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts); + } + this.timeBetweenAttempts = timeBetweenAttempts; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java ---------------------------------------------------------------------- diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java new file mode 100644 index 0000000..e9e7d5b --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.protocol.impl; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.util.*; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.io.socket.multicast.DiscoverableService; +import org.apache.nifi.io.socket.multicast.MulticastConfiguration; +import org.apache.nifi.io.socket.multicast.MulticastServicesBroadcaster; +import org.apache.nifi.io.socket.multicast.MulticastUtils; +import org.apache.nifi.logging.NiFiLog; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Broadcasts services used by the clustering software using multicast communication. + * A configurable delay occurs after broadcasting the collection of services. + * + * The client caller is responsible for starting and stopping the broadcasting. + * The instance must be stopped before termination of the JVM to ensure proper + * resource clean-up. 
+ * + * @author unattributed + */ +public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster { + + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ClusterServicesBroadcaster.class)); + + private final Set<DiscoverableService> services = new CopyOnWriteArraySet<>(); + + private final InetSocketAddress multicastAddress; + + private final MulticastConfiguration multicastConfiguration; + + private final ProtocolContext<ProtocolMessage> protocolContext; + + private final int broadcastDelayMs; + + private Timer broadcaster; + + private MulticastSocket multicastSocket; + + public ClusterServicesBroadcaster(final InetSocketAddress multicastAddress, + final MulticastConfiguration multicastConfiguration, + final ProtocolContext<ProtocolMessage> protocolContext, final String broadcastDelay) { + + if(multicastAddress == null) { + throw new IllegalArgumentException("Multicast address may not be null."); + } else if(multicastAddress.getAddress().isMulticastAddress() == false) { + throw new IllegalArgumentException("Multicast group address is not a Class D IP address."); + } else if(protocolContext == null) { + throw new IllegalArgumentException("Protocol Context may not be null."); + } else if(multicastConfiguration == null) { + throw new IllegalArgumentException("Multicast configuration may not be null."); + } + + this.services.addAll(services); + this.multicastAddress = multicastAddress; + this.multicastConfiguration = multicastConfiguration; + this.protocolContext = protocolContext; + this.broadcastDelayMs = (int) FormatUtils.getTimeDuration(broadcastDelay, TimeUnit.MILLISECONDS); + } + + public void start() throws IOException { + + if(isRunning()) { + throw new IllegalStateException("Instance is already started."); + } + + // setup socket + multicastSocket = MulticastUtils.createMulticastSocket(multicastConfiguration); + + // setup broadcaster + broadcaster = new Timer("Cluster Services Broadcaster", /* is daemon */ true); + 
broadcaster.schedule(new TimerTask() { + @Override + public void run() { + for(final DiscoverableService service : services) { + try { + + final InetSocketAddress serviceAddress = service.getServiceAddress(); + logger.debug(String.format("Broadcasting Cluster Service '%s' at address %s:%d", + service.getServiceName(), serviceAddress.getHostName(), serviceAddress.getPort())); + + // create message + final ServiceBroadcastMessage msg = new ServiceBroadcastMessage(); + msg.setServiceName(service.getServiceName()); + msg.setAddress(serviceAddress.getHostName()); + msg.setPort(serviceAddress.getPort()); + + // marshal message to output stream + final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + marshaller.marshal(msg, baos); + final byte[] packetBytes = baos.toByteArray(); + + // send message + final DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, multicastAddress); + multicastSocket.send(packet); + + } catch(final Exception ex) { + logger.warn(String.format("Cluster Services Broadcaster failed broadcasting service '%s' due to: %s", service.getServiceName(), ex), ex); + } + } + } + }, 0, broadcastDelayMs); + } + + public boolean isRunning() { + return (broadcaster != null); + } + + public void stop() { + + if(isRunning() == false) { + throw new IllegalStateException("Instance is already stopped."); + } + + broadcaster.cancel(); + broadcaster = null; + + // close socket + MulticastUtils.closeQuietly(multicastSocket); + + } + + @Override + public int getBroadcastDelayMs() { + return broadcastDelayMs; + } + + @Override + public Set<DiscoverableService> getServices() { + return Collections.unmodifiableSet(services); + } + + @Override + public InetSocketAddress getMulticastAddress() { + return multicastAddress; + } + + @Override + public boolean addService(final DiscoverableService service) { + return services.add(service); + } + + 
@Override + public boolean removeService(final String serviceName) { + for(final DiscoverableService service : services) { + if(service.getServiceName().equals(serviceName)) { + return services.remove(service); + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java new file mode 100644 index 0000000..680df65 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * An input stream filter that keeps a copy of the leading bytes read through it.
 *
 * Every byte returned by the wrapped stream is handed to the caller unchanged; in addition, the
 * first {@code maxBytesToCopy} bytes are retained in an in-memory buffer that can be retrieved
 * with {@link #getBytesRead()} or {@link #writeBytes(OutputStream)}. A limit of zero or less
 * retains nothing.
 */
public class CopyingInputStream extends FilterInputStream {

    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    private final int maxBytesToCopy;

    /**
     * @param in the stream to read from
     * @param maxBytesToCopy the maximum number of bytes to retain; values of zero or less
     *            disable copying
     */
    public CopyingInputStream(final InputStream in, final int maxBytesToCopy) {
        super(in);
        this.maxBytesToCopy = maxBytesToCopy;
    }

    @Override
    public int read() throws IOException {
        final int value = in.read();
        if (value != -1 && remainingCapacity() > 0) {
            baos.write(value);
        }

        return value;
    }

    @Override
    public int read(byte[] b) throws IOException {
        final int count = in.read(b);
        copy(b, 0, count);
        return count;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        final int count = in.read(b, off, len);
        copy(b, off, count);
        return count;
    }

    /**
     * @return a copy of the bytes retained so far
     */
    public byte[] getBytesRead() {
        return baos.toByteArray();
    }

    /**
     * Writes the bytes retained so far to the given stream.
     *
     * @param out the destination stream
     * @throws IOException if writing to {@code out} fails
     */
    public void writeBytes(final OutputStream out) throws IOException {
        baos.writeTo(out);
    }

    /**
     * @return the number of bytes retained so far
     */
    public int getNumberOfBytesCopied() {
        return baos.size();
    }

    /**
     * Number of bytes that may still be retained. Clamped at zero so that a negative limit
     * behaves like "copy nothing" rather than producing a negative write length.
     */
    private int remainingCapacity() {
        return Math.max(0, maxBytesToCopy - baos.size());
    }

    /**
     * Retains as many of the {@code count} bytes just read into {@code buffer} as the limit
     * still allows. A {@code count} of zero or -1 (end of stream) retains nothing.
     */
    private void copy(final byte[] buffer, final int offset, final int count) {
        if (count <= 0) {
            return;
        }

        final int toCopy = Math.min(count, remainingCapacity());
        if (toCopy > 0) {
            baos.write(buffer, offset, toCopy);
        }
    }
}
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java new file mode 100644 index 0000000..d3764b3 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.protocol.impl; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.nifi.io.socket.multicast.MulticastConfiguration; +import org.apache.nifi.io.socket.multicast.MulticastListener; +import org.apache.nifi.logging.NiFiLog; +import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.ProtocolListener; +import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; +import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; +import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.events.BulletinFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements a listener for protocol messages sent over multicast. If a message + * is of type MulticastProtocolMessage, then the underlying protocol message is + * passed to the handler. If the receiving handler produces a message response, + * then the message is wrapped with a MulticastProtocolMessage before being + * sent to the originator. + * + * The client caller is responsible for starting and stopping the listener. + * The instance must be stopped before termination of the JVM to ensure proper + * resource clean-up. 
+ * + * @author unattributed + */ +public class MulticastProtocolListener extends MulticastListener implements ProtocolListener { + + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(MulticastProtocolListener.class)); + + // immutable members + private final Collection<ProtocolHandler> handlers = new CopyOnWriteArrayList<>(); + private final String listenerId = UUID.randomUUID().toString(); + private final ProtocolContext<ProtocolMessage> protocolContext; + private volatile BulletinRepository bulletinRepository; + + public MulticastProtocolListener( + final int numThreads, + final InetSocketAddress multicastAddress, + final MulticastConfiguration configuration, + final ProtocolContext<ProtocolMessage> protocolContext) { + + super(numThreads, multicastAddress, configuration); + + if (protocolContext == null) { + throw new IllegalArgumentException("Protocol Context may not be null."); + } + this.protocolContext = protocolContext; + } + + @Override + public void setBulletinRepository(final BulletinRepository bulletinRepository) { + this.bulletinRepository = bulletinRepository; + } + + @Override + public void start() throws IOException { + + if(super.isRunning()) { + throw new IllegalStateException("Instance is already started."); + } + + super.start(); + + } + + @Override + public void stop() throws IOException { + + if(super.isRunning() == false) { + throw new IllegalStateException("Instance is already stopped."); + } + + // shutdown listener + super.stop(); + + } + + @Override + public Collection<ProtocolHandler> getHandlers() { + return Collections.unmodifiableCollection(handlers); + } + + @Override + public void addHandler(final ProtocolHandler handler) { + if(handler == null) { + throw new NullPointerException("Protocol handler may not be null."); + } + handlers.add(handler); + } + + @Override + public boolean removeHandler(final ProtocolHandler handler) { + return handlers.remove(handler); + } + + @Override + public void 
dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet) { + + try { + + // unmarshall message + final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); + final ProtocolMessage request = unmarshaller.unmarshal(new ByteArrayInputStream(packet.getData(), 0, packet.getLength())); + + // unwrap multicast message, if necessary + final ProtocolMessage unwrappedRequest; + if(request instanceof MulticastProtocolMessage) { + final MulticastProtocolMessage multicastRequest = (MulticastProtocolMessage) request; + // don't process a message we sent + if(listenerId.equals(multicastRequest.getId())) { + return; + } else { + unwrappedRequest = multicastRequest.getProtocolMessage(); + } + } else { + unwrappedRequest = request; + } + + // dispatch message to handler + ProtocolHandler desiredHandler = null; + for (final ProtocolHandler handler : getHandlers()) { + if (handler.canHandle(unwrappedRequest)) { + desiredHandler = handler; + break; + } + } + + // if no handler found, throw exception; otherwise handle request + if (desiredHandler == null) { + throw new ProtocolException("No handler assigned to handle message type: " + request.getType()); + } else { + final ProtocolMessage response = desiredHandler.handle(request); + if(response != null) { + try { + + // wrap with listener id + final MulticastProtocolMessage multicastResponse = new MulticastProtocolMessage(listenerId, response); + + // marshal message + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); + marshaller.marshal(multicastResponse, baos); + final byte[] responseBytes = baos.toByteArray(); + + final int maxPacketSizeBytes = getMaxPacketSizeBytes(); + if(responseBytes.length > maxPacketSizeBytes) { + logger.warn("Cluster protocol handler '" + desiredHandler.getClass() + + "' produced a multicast response with length greater than 
configured max packet size '" + maxPacketSizeBytes + "'"); + } + + // create and send packet + final DatagramPacket responseDatagram = new DatagramPacket(responseBytes, responseBytes.length, getMulticastAddress().getAddress(), getMulticastAddress().getPort()); + multicastSocket.send(responseDatagram); + + } catch (final IOException ioe) { + throw new ProtocolException("Failed marshalling protocol message in response to message type: " + request.getType() + " due to: " + ioe, ioe); + } + } + } + + } catch (final Throwable t) { + logger.warn("Failed processing protocol message due to " + t, t); + + if ( bulletinRepository != null ) { + final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", "Failed to process Protocol Message due to " + t.toString()); + bulletinRepository.addBulletin(bulletin); + } + } + } +}