http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java new file mode 100644 index 0000000..dc86d24 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.security.cert.Certificate;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.SSLSocket;
+
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
+import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
+import org.apache.nifi.io.socket.SocketConfiguration;
+import org.apache.nifi.io.socket.SocketUtils;
+import org.apache.nifi.io.socket.multicast.DiscoverableService;
+
+/**
+ * Sends protocol messages to the cluster manager. Every call opens a new
+ * socket to the address reported by the service locator, writes the message
+ * with the marshaller from the protocol context, and closes the socket again.
+ */
+public class NodeProtocolSenderImpl implements NodeProtocolSender {
+
+    private final SocketConfiguration socketConfiguration;
+    private final ClusterServiceLocator clusterManagerProtocolServiceLocator;
+    private final ProtocolContext<ProtocolMessage> protocolContext;
+
+    /**
+     * @param clusterManagerProtocolServiceLocator locator used to find the cluster manager's address
+     * @param socketConfiguration configuration handed to {@code SocketUtils.createSocket}
+     * @param protocolContext source of marshallers and unmarshallers
+     *
+     * @throws IllegalArgumentException if any argument is null
+     */
+    public NodeProtocolSenderImpl(final ClusterServiceLocator clusterManagerProtocolServiceLocator,
+            final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
+        if (clusterManagerProtocolServiceLocator == null) {
+            throw new IllegalArgumentException("Protocol Service Locator may not be null.");
+        } else if (socketConfiguration == null) {
+            throw new IllegalArgumentException("Socket configuration may not be null.");
+        } else if (protocolContext == null) {
+            throw new IllegalArgumentException("Protocol Context may not be null.");
+        }
+
+        this.clusterManagerProtocolServiceLocator = clusterManagerProtocolServiceLocator;
+        this.socketConfiguration = socketConfiguration;
+        this.protocolContext = protocolContext;
+    }
+
+    /**
+     * Sends a connection request and waits for the response on the same socket.
+     * When the socket is an {@link SSLSocket}, the subject DN of the first peer
+     * certificate is stored on the response as the cluster manager DN; otherwise
+     * the DN is set to null.
+     *
+     * @param msg the connection request
+     * @return the connection response
+     *
+     * @throws ProtocolException if the socket cannot be created, the peer
+     * certificate cannot be read, marshalling or unmarshalling fails, or the
+     * response is missing or not of type CONNECTION_RESPONSE
+     * @throws UnknownServiceAddressException if the locator returns no service
+     */
+    @Override
+    public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        Socket socket = null;
+        try {
+            socket = createSocket();
+
+            final String ncmDn = getPeerDn(socket);
+
+            marshal(msg, socket);
+
+            final ProtocolMessage response;
+            try {
+                // unmarshall response and return
+                final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
+                response = unmarshaller.unmarshal(socket.getInputStream());
+            } catch (final IOException ioe) {
+                throw new ProtocolException("Failed unmarshalling '" + MessageType.CONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe);
+            }
+
+            // report a missing response as a protocol failure rather than a NullPointerException
+            if (response == null) {
+                throw new ProtocolException("Expected message type '" + MessageType.CONNECTION_RESPONSE + "' but received no message");
+            }
+
+            if (MessageType.CONNECTION_RESPONSE == response.getType()) {
+                final ConnectionResponseMessage connectionResponse = (ConnectionResponseMessage) response;
+                connectionResponse.setClusterManagerDN(ncmDn);
+                return connectionResponse;
+            } else {
+                throw new ProtocolException("Expected message type '" + MessageType.CONNECTION_RESPONSE + "' but found '" + response.getType() + "'");
+            }
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+
+    @Override
+    public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sendProtocolMessage(msg);
+    }
+
+    @Override
+    public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sendProtocolMessage(msg);
+    }
+
+    @Override
+    public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sendProtocolMessage(msg);
+    }
+
+    @Override
+    public void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sendProtocolMessage(msg);
+    }
+
+    /** Opens a socket to the cluster manager address reported by the service locator. */
+    private Socket createSocket() {
+        // determine the cluster manager's address
+        final DiscoverableService service = clusterManagerProtocolServiceLocator.getService();
+        if (service == null) {
+            throw new UnknownServiceAddressException("Cluster Manager's service is not known. Verify a cluster manager is running.");
+        }
+
+        try {
+            // create a socket
+            return SocketUtils.createSocket(service.getServiceAddress(), socketConfiguration);
+        } catch (final IOException ioe) {
+            throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
+        }
+    }
+
+    /** Returns the subject DN of the peer's first certificate, or null when the socket is not an SSL socket. */
+    private static String getPeerDn(final Socket socket) {
+        if (!(socket instanceof SSLSocket)) {
+            return null;
+        }
+
+        try {
+            // getPeerCertificates() replaces the deprecated getPeerCertificateChain(), whose
+            // implementation was retired in JDK 15 (it now throws UnsupportedOperationException).
+            final Certificate[] certChain = ((SSLSocket) socket).getSession().getPeerCertificates();
+            if (certChain != null && certChain.length > 0 && certChain[0] instanceof X509Certificate) {
+                // getSubjectDN() is kept on purpose so the DN string has the same format as before
+                return ((X509Certificate) certChain[0]).getSubjectDN().getName();
+            }
+            return null;
+        } catch (final Exception e) {
+            throw new ProtocolException(e);
+        }
+    }
+
+    /** Writes the message to the socket's output stream using a marshaller from the protocol context. */
+    private void marshal(final ProtocolMessage msg, final Socket socket) {
+        try {
+            // marshal message to output stream
+            final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+            marshaller.marshal(msg, socket.getOutputStream());
+        } catch (final IOException ioe) {
+            throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
+        }
+    }
+
+    /** Sends a message for which no response is read: open socket, marshal, close. */
+    private void sendProtocolMessage(final ProtocolMessage msg) {
+        Socket socket = null;
+        try {
+            socket = createSocket();
+            marshal(msg, socket);
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+
+    public SocketConfiguration getSocketConfiguration() {
+        return socketConfiguration;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java new file mode 100644 index 0000000..4b359f4 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.cluster.protocol.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.ProtocolListener;
+import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
+import org.apache.nifi.reporting.BulletinRepository;
+
+/**
+ * Exposes a {@link NodeProtocolSender} and a {@link ProtocolListener} through
+ * one object. Sender operations are forwarded to the wrapped sender, listener
+ * operations to the wrapped listener; the only logic added here is the
+ * running-state check in {@link #start()} and {@link #stop()}.
+ */
+public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolListener {
+
+    private final NodeProtocolSender sender;
+    private final ProtocolListener listener;
+
+    /**
+     * @param sender delegate for all sender operations
+     * @param listener delegate for all listener operations
+     *
+     * @throws IllegalArgumentException if either argument is null
+     */
+    public NodeProtocolSenderListener(final NodeProtocolSender sender, final ProtocolListener listener) {
+        if (sender == null) {
+            throw new IllegalArgumentException("NodeProtocolSender may not be null.");
+        }
+        if (listener == null) {
+            throw new IllegalArgumentException("ProtocolListener may not be null.");
+        }
+        this.sender = sender;
+        this.listener = listener;
+    }
+
+    /**
+     * Stops the wrapped listener.
+     *
+     * @throws IllegalStateException if the listener is not running
+     */
+    @Override
+    public void stop() throws IOException {
+        final boolean running = isRunning();
+        if (running == false) {
+            throw new IllegalStateException("Instance is already stopped.");
+        }
+        listener.stop();
+    }
+
+    /**
+     * Starts the wrapped listener.
+     *
+     * @throws IllegalStateException if the listener is already running
+     */
+    @Override
+    public void start() throws IOException {
+        final boolean running = isRunning();
+        if (running) {
+            throw new IllegalStateException("Instance is already started.");
+        }
+        listener.start();
+    }
+
+    @Override
+    public boolean isRunning() {
+        return listener.isRunning();
+    }
+
+    @Override
+    public boolean removeHandler(final ProtocolHandler handler) {
+        final boolean removed = listener.removeHandler(handler);
+        return removed;
+    }
+
+    @Override
+    public Collection<ProtocolHandler> getHandlers() {
+        return listener.getHandlers();
+    }
+
+    @Override
+    public void addHandler(final ProtocolHandler handler) {
+        listener.addHandler(handler);
+    }
+
+    @Override
+    public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sender.heartbeat(msg);
+    }
+
+    @Override
+    public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        final ConnectionResponseMessage response = sender.requestConnection(msg);
+        return response;
+    }
+
+    @Override
+    public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sender.notifyControllerStartupFailure(msg);
+    }
+
+    @Override
+    public void notifyReconnectionFailure(final ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sender.notifyReconnectionFailure(msg);
+    }
+
+    @Override
+    public void sendBulletins(final NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
+        sender.sendBulletins(msg);
+    }
+
+    @Override
+    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
+        listener.setBulletinRepository(bulletinRepository);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
new file mode 100644
index 0000000..ca30d9b
--- /dev/null
+++ 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLSocket; +import javax.security.cert.X509Certificate; + +import org.apache.nifi.cluster.protocol.ProtocolContext; +import org.apache.nifi.cluster.protocol.ProtocolException; +import org.apache.nifi.cluster.protocol.ProtocolHandler; +import org.apache.nifi.cluster.protocol.ProtocolListener; +import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; +import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; +import org.apache.nifi.cluster.protocol.message.ProtocolMessage; +import org.apache.nifi.events.BulletinFactory; +import org.apache.nifi.io.socket.ServerSocketConfiguration; +import org.apache.nifi.io.socket.SocketListener; +import org.apache.nifi.logging.NiFiLog; 
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.util.StopWatch;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a listener for protocol messages sent over unicast socket.
+ *
+ * Each accepted socket is handed to {@link #dispatchRequest(Socket)}, which
+ * unmarshals one message, passes it to the first registered handler that
+ * accepts it, and writes the handler's response (if any) back to the socket.
+ *
+ * @author unattributed
+ */
+public class SocketProtocolListener extends SocketListener implements ProtocolListener {
+
+    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketProtocolListener.class));
+    private final ProtocolContext<ProtocolMessage> protocolContext;
+    private final Collection<ProtocolHandler> handlers = new CopyOnWriteArrayList<>();
+    private volatile BulletinRepository bulletinRepository;
+
+    /**
+     * @param numThreads passed to the {@code SocketListener} constructor
+     * @param port passed to the {@code SocketListener} constructor
+     * @param configuration passed to the {@code SocketListener} constructor
+     * @param protocolContext source of marshallers and unmarshallers
+     *
+     * @throws IllegalArgumentException if the protocol context is null
+     */
+    public SocketProtocolListener(
+            final int numThreads,
+            final int port,
+            final ServerSocketConfiguration configuration,
+            final ProtocolContext<ProtocolMessage> protocolContext) {
+
+        super(numThreads, port, configuration);
+
+        if (protocolContext == null) {
+            throw new IllegalArgumentException("Protocol Context may not be null.");
+        }
+
+        this.protocolContext = protocolContext;
+    }
+
+    @Override
+    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
+        this.bulletinRepository = bulletinRepository;
+    }
+
+    /**
+     * @throws IllegalStateException if the listener is already running
+     */
+    @Override
+    public void start() throws IOException {
+
+        if (super.isRunning()) {
+            throw new IllegalStateException("Instance is already started.");
+        }
+
+        super.start();
+    }
+
+    /**
+     * @throws IOException if the listener is not running. Note that this differs
+     * from {@link #start()}, which reports a wrong state with
+     * IllegalStateException; the exception type is kept for existing callers.
+     */
+    @Override
+    public void stop() throws IOException {
+
+        if (super.isRunning() == false) {
+            throw new IOException("Instance is already stopped.");
+        }
+
+        super.stop();
+
+    }
+
+    @Override
+    public Collection<ProtocolHandler> getHandlers() {
+        return Collections.unmodifiableCollection(handlers);
+    }
+
+    @Override
+    public void addHandler(final ProtocolHandler handler) {
+        if (handler == null) {
+            throw new NullPointerException("Protocol handler may not be null.");
+        }
+        handlers.add(handler);
+    }
+
+    @Override
+    public boolean removeHandler(final ProtocolHandler handler) {
+        return handlers.remove(handler);
+    }
+
+    /**
+     * Processes one request from the given socket. IOException and
+     * ProtocolException are not propagated: they are logged at WARN level and,
+     * when a bulletin repository has been set, reported as a "Clustering"
+     * bulletin.
+     *
+     * @param socket the socket to read the request from
+     */
+    @Override
+    public void dispatchRequest(final Socket socket) {
+        byte[] receivedMessage = null;
+        String hostname = null;
+        final int maxMsgBuffer = 1024 * 1024; // don't buffer more than 1 MB of the message
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            hostname = socket.getInetAddress().getHostName();
+            final String requestId = UUID.randomUUID().toString();
+            logger.info("Received request {} from {}", requestId, hostname);
+
+            final String requestorDn = getRequestorDn(socket);
+
+            // unmarshall message
+            final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
+            final InputStream inStream = socket.getInputStream();
+            final CopyingInputStream copyingInputStream = new CopyingInputStream(inStream, maxMsgBuffer); // don't copy more than 1 MB
+
+            final ProtocolMessage request;
+            try {
+                request = unmarshaller.unmarshal(copyingInputStream);
+            } finally {
+                receivedMessage = copyingInputStream.getBytesRead();
+                // logged after the read: before unmarshalling nothing has passed through the copying stream yet
+                logger.debug("Request {} has a message length of {}", requestId, copyingInputStream.getNumberOfBytesCopied());
+            }
+
+            request.setRequestorDN(requestorDn);
+
+            // dispatch message to handler
+            ProtocolHandler desiredHandler = null;
+            for (final ProtocolHandler handler : getHandlers()) {
+                if (handler.canHandle(request)) {
+                    desiredHandler = handler;
+                    break;
+                }
+            }
+
+            // if no handler found, throw exception; otherwise handle request
+            if (desiredHandler == null) {
+                throw new ProtocolException("No handler assigned to handle message type: " + request.getType());
+            } else {
+                final ProtocolMessage response = desiredHandler.handle(request);
+                if (response != null) {
+                    try {
+                        logger.debug("Sending response for request {}", requestId);
+
+                        // marshal message to output stream
+                        final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
+                        marshaller.marshal(response, socket.getOutputStream());
+                    } catch (final IOException ioe) {
+                        throw new ProtocolException("Failed marshalling protocol message in response to message type: " + request.getType() + " due to " + ioe, ioe);
+                    }
+                }
+            }
+
+            stopWatch.stop();
+            logger.info("Finished processing request {} (type={}, length={} bytes) in {} millis", requestId, request.getType(), receivedMessage.length, stopWatch.getDuration(TimeUnit.MILLISECONDS));
+        } catch (final IOException | ProtocolException e) {
+            logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e);
+
+            // read the volatile field once so the null check and the call see the same repository
+            final BulletinRepository repository = bulletinRepository;
+            if (repository != null) {
+                final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", String.format("Failed to process protocol message from %s due to: %s", hostname, e.toString()));
+                repository.addBulletin(bulletin);
+            }
+        }
+    }
+
+    /** Returns the subject DN of the peer's first certificate, or null when the socket is not an SSL socket. */
+    private static String getRequestorDn(final Socket socket) {
+        if (!(socket instanceof SSLSocket)) {
+            return null;
+        }
+
+        try {
+            // getPeerCertificates() replaces the deprecated getPeerCertificateChain(), whose
+            // implementation was retired in JDK 15 (it now throws UnsupportedOperationException).
+            // Fully-qualified names are used because the simple name X509Certificate is already
+            // imported from javax.security.cert in this file.
+            final java.security.cert.Certificate[] certChain = ((SSLSocket) socket).getSession().getPeerCertificates();
+            if (certChain != null && certChain.length > 0 && certChain[0] instanceof java.security.cert.X509Certificate) {
+                // getSubjectDN() is kept on purpose so the DN string has the same format as before
+                return ((java.security.cert.X509Certificate) certChain[0]).getSubjectDN().getName();
+            }
+            return null;
+        } catch (final Exception e) {
+            throw new ProtocolException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java new file mode 100644 index 0000000..bc68630 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.cluster.protocol.jaxb;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+
+/**
+ * Implements a context for communicating internally amongst the cluster using
+ * JAXB.
+ *
+ * Wire format of one message: one sentinel byte (0x5A), a 4-byte big-endian
+ * length as written by {@link DataOutputStream#writeInt(int)}, then that many
+ * bytes of JAXB-marshalled content.
+ *
+ * @param <T> The type of protocol message.
+ *
+ * @author unattributed
+ */
+public class JaxbProtocolContext<T> implements ProtocolContext<T> {
+
+    /*
+     * A sentinel is used to detect corrupted messages. Relying on the integrity
+     * of the message size can cause memory issues if the value is corrupted
+     * and equal to a number larger than the memory size.
+     */
+    private static final byte MESSAGE_PROTOCOL_START_SENTINEL = 0x5A;
+
+    private final JAXBContext jaxbCtx;
+
+    public JaxbProtocolContext(final JAXBContext jaxbCtx) {
+        this.jaxbCtx = jaxbCtx;
+    }
+
+    /**
+     * @return a marshaller that writes sentinel, length and JAXB-marshalled
+     * message to the stream and flushes it; JAXB failures are rethrown as
+     * IOException
+     */
+    @Override
+    public ProtocolMessageMarshaller<T> createMarshaller() {
+        return new ProtocolMessageMarshaller<T>() {
+
+            @Override
+            public void marshal(final T msg, final OutputStream os) throws IOException {
+
+                try {
+
+                    // marshal message to a buffer first so its size is known before anything is written
+                    final Marshaller marshaller = jaxbCtx.createMarshaller();
+                    final ByteArrayOutputStream msgBytes = new ByteArrayOutputStream();
+                    marshaller.marshal(msg, msgBytes);
+
+                    final DataOutputStream dos = new DataOutputStream(os);
+
+                    // write message protocol sentinel
+                    dos.write(MESSAGE_PROTOCOL_START_SENTINEL);
+
+                    // write message size in bytes
+                    dos.writeInt(msgBytes.size());
+
+                    // write message (writeTo avoids the extra array copy made by toByteArray)
+                    msgBytes.writeTo(dos);
+
+                    dos.flush();
+
+                } catch (final JAXBException je) {
+                    throw new IOException("Failed marshalling protocol message due to: " + je, je);
+                }
+
+            }
+        };
+    }
+
+    /**
+     * @return an unmarshaller that reads one framed message from the stream.
+     * It throws EOFException when the stream ends before the sentinel or before
+     * the announced number of bytes has been read, and IOException for a wrong
+     * sentinel, a negative length, or a JAXB failure.
+     */
+    @Override
+    public ProtocolMessageUnmarshaller<T> createUnmarshaller() {
+        return new ProtocolMessageUnmarshaller<T>() {
+
+            @Override
+            public T unmarshal(final InputStream is) throws IOException {
+
+                try {
+
+                    final DataInputStream dis = new DataInputStream(is);
+
+                    // check for the presence of the message protocol sentinel; test for
+                    // end-of-stream on the int value so a 0xFF data byte is not mistaken for EOF
+                    final int sentinel = dis.read();
+                    if (sentinel < 0) {
+                        throw new EOFException();
+                    }
+
+                    if (MESSAGE_PROTOCOL_START_SENTINEL != sentinel) {
+                        throw new IOException("Failed reading protocol message due to malformed header");
+                    }
+
+                    // read the message size
+                    final int msgBytesSize = dis.readInt();
+                    if (msgBytesSize < 0) {
+                        throw new IOException("Failed reading protocol message due to invalid message size: " + msgBytesSize);
+                    }
+
+                    // read the message; readFully loops until the array is filled and
+                    // throws EOFException if the stream ends early
+                    final byte[] msg = new byte[msgBytesSize];
+                    dis.readFully(msg);
+
+                    // unmarshall message and return
+                    final Unmarshaller unmarshaller = jaxbCtx.createUnmarshaller();
+                    // the JAXB context decides the runtime type; the cast cannot be checked here
+                    @SuppressWarnings("unchecked")
+                    final T result = (T) unmarshaller.unmarshal(new ByteArrayInputStream(msg));
+                    return result;
+
+                } catch (final JAXBException je) {
+                    throw new IOException("Failed unmarshalling protocol message due to: " + je, je);
+                }
+
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
new file mode 100644
index 0000000..d9de24f
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * Mutable bean holding the node identifier of a connection request. The
+ * getter is annotated so that the identifier is converted through
+ * {@code NodeIdentifierAdapter}.
+ *
+ * @author unattributed
+ */
+public class AdaptedConnectionRequest {
+
+    private NodeIdentifier nodeIdentifier;
+
+    /** Creates an instance whose node identifier is null. */
+    public AdaptedConnectionRequest() {
+    }
+
+    /** @return the node identifier, or null if none has been set */
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeIdentifier() {
+        return this.nodeIdentifier;
+    }
+
+    /** @param nodeIdentifier the node identifier to store */
+    public void setNodeIdentifier(final NodeIdentifier nodeIdentifier) {
+        this.nodeIdentifier = nodeIdentifier;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
new file mode 100644
index 0000000..c7c783b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.jaxb.message;
+
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+
+/**
+ * Mutable bean holding the fields of a connection response. The data flow and
+ * node identifier getters are annotated so that they are converted through
+ * {@code DataFlowAdapter} and {@code NodeIdentifierAdapter}.
+ *
+ * The order of the members is left as it was; NOTE(review): the order may be
+ * visible to reflection-based binding, so confirm before rearranging.
+ *
+ * @author unattributed
+ */
+public class AdaptedConnectionResponse {
+
+    private StandardDataFlow dataFlow;
+    private NodeIdentifier nodeIdentifier;
+    private boolean blockedByFirewall;
+    private boolean primary;
+    private int tryLaterSeconds;
+    private Integer managerRemoteInputPort;
+    private Boolean managerRemoteCommsSecure;
+    private String instanceId;
+
+    /** Creates an instance with all fields at their Java default values. */
+    public AdaptedConnectionResponse() {
+    }
+
+    @XmlJavaTypeAdapter(DataFlowAdapter.class)
+    public StandardDataFlow getDataFlow() {
+        return this.dataFlow;
+    }
+
+    public void setDataFlow(final StandardDataFlow dataFlow) {
+        this.dataFlow = dataFlow;
+    }
+
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeIdentifier() {
+        return this.nodeIdentifier;
+    }
+
+    public void setNodeIdentifier(final NodeIdentifier nodeIdentifier) {
+        this.nodeIdentifier = nodeIdentifier;
+    }
+
+    public int getTryLaterSeconds() {
+        return this.tryLaterSeconds;
+    }
+
+    public void setTryLaterSeconds(final int tryLaterSeconds) {
+        this.tryLaterSeconds = tryLaterSeconds;
+    }
+
+    public boolean isBlockedByFirewall() {
+        return this.blockedByFirewall;
+    }
+
+    public void setBlockedByFirewall(final boolean blockedByFirewall) {
+        this.blockedByFirewall = blockedByFirewall;
+    }
+
+    public boolean isPrimary() {
+        return this.primary;
+    }
+
+    public void setPrimary(final boolean primary) {
+        this.primary = primary;
+    }
+
+    /** @return true when the try-later delay is greater than zero seconds */
+    public boolean shouldTryLater() {
+        final boolean hasDelay = this.tryLaterSeconds > 0;
+        return hasDelay;
+    }
+
+    public void setManagerRemoteInputPort(final Integer managerRemoteInputPort) {
+        this.managerRemoteInputPort = managerRemoteInputPort;
+    }
+
+    /** @return the manager's remote input port, or null if none has been set */
+    public Integer getManagerRemoteInputPort() {
+        return this.managerRemoteInputPort;
+    }
+
+    public void setManagerRemoteCommsSecure(final Boolean secure) {
+        this.managerRemoteCommsSecure = secure;
+    }
+
+    /** @return the stored flag, or null if none has been set */
+    public Boolean isManagerRemoteCommsSecure() {
+        return this.managerRemoteCommsSecure;
+    }
+
+    public void setInstanceId(final String instanceId) {
+        this.instanceId = instanceId;
+    }
+
+    public String getInstanceId() {
+        return this.instanceId;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
new file mode 100644
index 0000000..89d903b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
/**
 * Mutable bean describing a counter by its group name, its own name and its current value.
 *
 * @author unattributed
 */
public class AdaptedCounter {

    private String groupName;

    private String name;

    private long value;

    /** Creates a counter with {@code null} names and a value of zero. */
    public AdaptedCounter() {
    }

    /** @return the counter's group name, or {@code null} if none was set */
    public String getGroupName() {
        return groupName;
    }

    /** @param counterGroupName the counter's group name */
    public void setGroupName(final String counterGroupName) {
        groupName = counterGroupName;
    }

    /** @return the counter's name, or {@code null} if none was set */
    public String getName() {
        return name;
    }

    /** @param counterName the counter's name */
    public void setName(final String counterName) {
        name = counterName;
    }

    /** @return the counter's value */
    public long getValue() {
        return value;
    }

    /** @param value the counter's value */
    public void setValue(final long value) {
        this.value = value;
    }

}
/**
 * Mutable bean holding a data flow as three raw byte arrays (flow, templates, snippets)
 * plus an auto-start flag. Arrays are stored and returned by reference; no copies are made.
 *
 * @author unattributed
 */
public class AdaptedDataFlow {

    private byte[] flow;
    private byte[] templates;
    private byte[] snippets;

    private boolean autoStartProcessors;

    /** Creates a data flow with {@code null} arrays and the auto-start flag cleared. */
    public AdaptedDataFlow() {
    }

    /** @return the flow bytes (the same array instance that was set), or {@code null} */
    public byte[] getFlow() {
        return flow;
    }

    /** @param flow the flow bytes; stored by reference */
    public void setFlow(final byte[] flow) {
        this.flow = flow;
    }

    /** @return the template bytes (the same array instance that was set), or {@code null} */
    public byte[] getTemplates() {
        return templates;
    }

    /** @param templates the template bytes; stored by reference */
    public void setTemplates(final byte[] templates) {
        this.templates = templates;
    }

    /** @return the snippet bytes (the same array instance that was set), or {@code null} */
    public byte[] getSnippets() {
        return snippets;
    }

    /** @param snippets the snippet bytes; stored by reference */
    public void setSnippets(final byte[] snippets) {
        this.snippets = snippets;
    }

    /** @return the auto-start-processors flag */
    public boolean isAutoStartProcessors() {
        return autoStartProcessors;
    }

    /** @param runningAllProcessors the new value of the auto-start-processors flag */
    public void setAutoStartProcessors(final boolean runningAllProcessors) {
        autoStartProcessors = runningAllProcessors;
    }

}
a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java new file mode 100644 index 0000000..5b9d9b7 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedHeartbeat.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.protocol.jaxb.message; + +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.cluster.protocol.NodeIdentifier; + +/** + * @author unattributed + */ +public class AdaptedHeartbeat { + + private NodeIdentifier nodeIdentifier; + private byte[] payload; + private boolean primary; + private boolean connected; + + public AdaptedHeartbeat() {} + + @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) + public NodeIdentifier getNodeIdentifier() { + return nodeIdentifier; + } + + public void setNodeIdentifier(NodeIdentifier nodeIdentifier) { + this.nodeIdentifier = nodeIdentifier; + } + + public boolean isPrimary() { + return primary; + } + + public void setPrimary(boolean primary) { + this.primary = primary; + } + + public boolean isConnected() { + return connected; + } + + public void setConnected(boolean connected) { + this.connected = connected; + } + + public byte[] getPayload() { + return payload; + } + + public void setPayload(byte[] payload) { + this.payload = payload; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java new file mode 100644 index 0000000..98e2438 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeBulletins.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.jaxb.message; + +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.cluster.protocol.NodeIdentifier; + +/** + * @author unattributed + */ +public class AdaptedNodeBulletins { + + private NodeIdentifier nodeIdentifier; + + private byte[] payload; + + public AdaptedNodeBulletins() {} + + @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) + public NodeIdentifier getNodeIdentifier() { + return nodeIdentifier; + } + + public void setNodeIdentifier(NodeIdentifier nodeIdentifier) { + this.nodeIdentifier = nodeIdentifier; + } + + public byte[] getPayload() { + return payload; + } + + public void setPayload(byte[] payload) { + this.payload = payload; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java new file mode 100644 index 0000000..8134ea3 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeIdentifier.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Mutable bean describing a node by its id, its API address and port, and its socket
 * address and port.
 *
 * @author unattributed
 */
public class AdaptedNodeIdentifier {

    private String id;

    private String apiAddress;

    private int apiPort;

    private String socketAddress;

    private int socketPort;

    /** Creates an identifier with {@code null} strings and both ports set to zero. */
    public AdaptedNodeIdentifier() {
    }

    /** @return the API address, or {@code null} if none was set */
    public String getApiAddress() {
        return apiAddress;
    }

    /** @param apiAddress the API address */
    public void setApiAddress(final String apiAddress) {
        this.apiAddress = apiAddress;
    }

    /** @return the API port */
    public int getApiPort() {
        return apiPort;
    }

    /** @param apiPort the API port */
    public void setApiPort(final int apiPort) {
        this.apiPort = apiPort;
    }

    /** @return the node id, or {@code null} if none was set */
    public String getId() {
        return id;
    }

    /** @param id the node id */
    public void setId(final String id) {
        this.id = id;
    }

    /** @return the socket address, or {@code null} if none was set */
    public String getSocketAddress() {
        return socketAddress;
    }

    /** @param socketAddress the socket address */
    public void setSocketAddress(final String socketAddress) {
        this.socketAddress = socketAddress;
    }

    /** @return the socket port */
    public int getSocketPort() {
        return socketPort;
    }

    /** @param socketPort the socket port */
    public void setSocketPort(final int socketPort) {
        this.socketPort = socketPort;
    }

}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.jaxb.message; + +import javax.xml.bind.annotation.adapters.XmlAdapter; +import org.apache.nifi.cluster.protocol.ConnectionRequest; + +/** + * @author unattributed + */ +public class ConnectionRequestAdapter extends XmlAdapter<AdaptedConnectionRequest, ConnectionRequest> { + + @Override + public AdaptedConnectionRequest marshal(final ConnectionRequest cr) { + final AdaptedConnectionRequest aCr = new AdaptedConnectionRequest(); + if(cr != null) { + aCr.setNodeIdentifier(cr.getProposedNodeIdentifier()); + } + return aCr; + } + + @Override + public ConnectionRequest unmarshal(final AdaptedConnectionRequest aCr) { + return new ConnectionRequest(aCr.getNodeIdentifier()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java new file mode 100644 index 0000000..143bab0 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.protocol.jaxb.message; + +import javax.xml.bind.annotation.adapters.XmlAdapter; +import org.apache.nifi.cluster.protocol.ConnectionResponse; + +/** + * @author unattributed + */ +public class ConnectionResponseAdapter extends XmlAdapter<AdaptedConnectionResponse, ConnectionResponse> { + + @Override + public AdaptedConnectionResponse marshal(final ConnectionResponse cr) { + final AdaptedConnectionResponse aCr = new AdaptedConnectionResponse(); + if(cr != null) { + aCr.setDataFlow(cr.getDataFlow()); + aCr.setNodeIdentifier(cr.getNodeIdentifier()); + aCr.setTryLaterSeconds(cr.getTryLaterSeconds()); + aCr.setBlockedByFirewall(cr.isBlockedByFirewall()); + aCr.setPrimary(cr.isPrimary()); + aCr.setManagerRemoteInputPort(cr.getManagerRemoteInputPort()); + aCr.setManagerRemoteCommsSecure(cr.isManagerRemoteCommsSecure()); + aCr.setInstanceId(cr.getInstanceId()); + } + return aCr; + } + + @Override + public ConnectionResponse unmarshal(final AdaptedConnectionResponse aCr) { + if(aCr.shouldTryLater()) { + return new ConnectionResponse(aCr.getTryLaterSeconds()); + } else if(aCr.isBlockedByFirewall()) { + return ConnectionResponse.createBlockedByFirewallResponse(); + } else { + return new ConnectionResponse(aCr.getNodeIdentifier(), aCr.getDataFlow(), aCr.isPrimary(), + aCr.getManagerRemoteInputPort(), aCr.isManagerRemoteCommsSecure(), aCr.getInstanceId()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java new file mode 
100644 index 0000000..8d9467f --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.protocol.jaxb.message; + +import javax.xml.bind.annotation.adapters.XmlAdapter; + +import org.apache.nifi.cluster.protocol.StandardDataFlow; + +/** + * @author unattributed + */ +public class DataFlowAdapter extends XmlAdapter<AdaptedDataFlow, StandardDataFlow> { + + @Override + public AdaptedDataFlow marshal(final StandardDataFlow df) { + + final AdaptedDataFlow aDf = new AdaptedDataFlow(); + + if(df != null) { + aDf.setFlow(df.getFlow()); + aDf.setTemplates(df.getTemplates()); + aDf.setSnippets(df.getSnippets()); + aDf.setAutoStartProcessors(df.isAutoStartProcessors()); + } + + return aDf; + } + + @Override + public StandardDataFlow unmarshal(final AdaptedDataFlow aDf) { + final StandardDataFlow dataFlow = new StandardDataFlow(aDf.getFlow(), aDf.getTemplates(), aDf.getSnippets()); + dataFlow.setAutoStartProcessors(aDf.isAutoStartProcessors()); + return dataFlow; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java new file mode 100644 index 0000000..0e073b6 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/HeartbeatAdapter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.jaxb.message; + +import javax.xml.bind.annotation.adapters.XmlAdapter; +import org.apache.nifi.cluster.protocol.Heartbeat; + +/** + * @author unattributed + */ +public class HeartbeatAdapter extends XmlAdapter<AdaptedHeartbeat, Heartbeat> { + + @Override + public AdaptedHeartbeat marshal(final Heartbeat hb) { + + final AdaptedHeartbeat aHb = new AdaptedHeartbeat(); + + if(hb != null) { + // set node identifier + aHb.setNodeIdentifier(hb.getNodeIdentifier()); + + // set payload + aHb.setPayload(hb.getPayload()); + + // set leader flag + aHb.setPrimary(hb.isPrimary()); + + // set connected flag + aHb.setConnected(hb.isConnected()); + } + + return aHb; + } + + @Override + public Heartbeat unmarshal(final AdaptedHeartbeat aHb) { + return new Heartbeat(aHb.getNodeIdentifier(), aHb.isPrimary(), aHb.isConnected(), aHb.getPayload()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java new file mode 100644 index 0000000..c3a57f5 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/JaxbProtocolUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.jaxb.message; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; + +/** + * @author unattributed + */ +public final class JaxbProtocolUtils { + + public static final String JAXB_CONTEXT_PATH = ObjectFactory.class.getPackage().getName(); + + public static final JAXBContext JAXB_CONTEXT = initializeJaxbContext(); + + /** + * Load the JAXBContext version. 
+ */ + private static JAXBContext initializeJaxbContext() { + try { + return JAXBContext.newInstance(JAXB_CONTEXT_PATH); + } catch (JAXBException e) { + throw new RuntimeException("Unable to create JAXBContext.", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java new file mode 100644 index 0000000..1ae41f7 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeBulletinsAdapter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.protocol.jaxb.message; + +import javax.xml.bind.annotation.adapters.XmlAdapter; +import org.apache.nifi.cluster.protocol.NodeBulletins; + +/** + * @author unattributed + */ +public class NodeBulletinsAdapter extends XmlAdapter<AdaptedNodeBulletins, NodeBulletins> { + + @Override + public AdaptedNodeBulletins marshal(final NodeBulletins hb) { + + final AdaptedNodeBulletins adaptedBulletins = new AdaptedNodeBulletins(); + + if(hb != null) { + // set node identifier + adaptedBulletins.setNodeIdentifier(hb.getNodeIdentifier()); + + // set payload + adaptedBulletins.setPayload(hb.getPayload()); + } + + return adaptedBulletins; + } + + @Override + public NodeBulletins unmarshal(final AdaptedNodeBulletins adaptedBulletins) { + return new NodeBulletins(adaptedBulletins.getNodeIdentifier(), adaptedBulletins.getPayload()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java new file mode 100644 index 0000000..fe2d8a4 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeIdentifierAdapter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.jaxb.message; + +import javax.xml.bind.annotation.adapters.XmlAdapter; +import org.apache.nifi.cluster.protocol.NodeIdentifier; + +/** + * @author unattributed + */ +public class NodeIdentifierAdapter extends XmlAdapter<AdaptedNodeIdentifier, NodeIdentifier> { + + @Override + public AdaptedNodeIdentifier marshal(final NodeIdentifier ni) { + if(ni == null) { + return null; + } else { + final AdaptedNodeIdentifier aNi = new AdaptedNodeIdentifier(); + aNi.setId(ni.getId()); + aNi.setApiAddress(ni.getApiAddress()); + aNi.setApiPort(ni.getApiPort()); + aNi.setSocketAddress(ni.getSocketAddress()); + aNi.setSocketPort(ni.getSocketPort()); + return aNi; + } + } + + @Override + public NodeIdentifier unmarshal(final AdaptedNodeIdentifier aNi) { + if(aNi == null) { + return null; + } else { + return new NodeIdentifier(aNi.getId(), aNi.getApiAddress(), aNi.getApiPort(), aNi.getSocketAddress(), aNi.getSocketPort()); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java new file mode 100644 index 0000000..1613536 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.protocol.jaxb.message; + +import javax.xml.bind.annotation.XmlRegistry; + +import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; +import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; +import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; +import org.apache.nifi.cluster.protocol.message.DisconnectMessage; +import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; +import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; +import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage; +import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; +import org.apache.nifi.cluster.protocol.message.PingMessage; +import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; +import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; +import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; +import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; +import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage; + +/** + * @author unattributed + */ +@XmlRegistry +public class ObjectFactory { + + public ObjectFactory() {} + + public ReconnectionRequestMessage createReconnectionRequestMessage() { + return new ReconnectionRequestMessage(); + } + + public ReconnectionFailureMessage createReconnectionFailureMessage() { + return new ReconnectionFailureMessage(); + } + + public ReconnectionResponseMessage createReconnectionResponseMessage() { + return new ReconnectionResponseMessage(); + } + + public DisconnectMessage createDisconnectionMessage() { + return new DisconnectMessage(); + } + + public ConnectionRequestMessage createConnectionRequestMessage() { + return new ConnectionRequestMessage(); + } + + public ConnectionResponseMessage createConnectionResponseMessage() { + return new 
ConnectionResponseMessage(); + } + + public ServiceBroadcastMessage createServiceBroadcastMessage() { + return new ServiceBroadcastMessage(); + } + + public HeartbeatMessage createHeartbeatMessage() { + return new HeartbeatMessage(); + } + + public FlowRequestMessage createFlowRequestMessage() { + return new FlowRequestMessage(); + } + + public FlowResponseMessage createFlowResponseMessage() { + return new FlowResponseMessage(); + } + + public PingMessage createPingMessage() { + return new PingMessage(); + } + + public MulticastProtocolMessage createMulticastProtocolMessage() { + return new MulticastProtocolMessage(); + } + + public ControllerStartupFailureMessage createControllerStartupFailureMessage() { + return new ControllerStartupFailureMessage(); + } + + public PrimaryRoleAssignmentMessage createPrimaryRoleAssignmentMessage() { + return new PrimaryRoleAssignmentMessage(); + } + + public NodeBulletinsMessage createBulletinsMessage() { + return new NodeBulletinsMessage(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionRequestMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionRequestMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionRequestMessage.java new file mode 100644 index 0000000..344de4e --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionRequestMessage.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.cluster.protocol.ConnectionRequest; + +/** + * @author unattributed + */ +@XmlRootElement(name = "connectionRequestMessage") +public class ConnectionRequestMessage extends ProtocolMessage { + + private ConnectionRequest connectionRequest; + + public ConnectionRequestMessage() {} + + public ConnectionRequest getConnectionRequest() { + return connectionRequest; + } + + public void setConnectionRequest(ConnectionRequest connectionRequest) { + this.connectionRequest = connectionRequest; + } + + @Override + public MessageType getType() { + return MessageType.CONNECTION_REQUEST; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java 
new file mode 100644 index 0000000..a262d7a --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ConnectionResponseMessage.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; +import org.apache.nifi.cluster.protocol.ConnectionResponse; + +/** + * @author unattributed + */ +@XmlRootElement(name = "connectionResponseMessage") +public class ConnectionResponseMessage extends ProtocolMessage { + + private ConnectionResponse connectionResponse; + private String clusterManagerDN; + + public ConnectionResponseMessage() {} + + public ConnectionResponse getConnectionResponse() { + return connectionResponse; + } + + public void setConnectionResponse(final ConnectionResponse connectionResponse) { + this.connectionResponse = connectionResponse; + + if ( clusterManagerDN != null ) { + this.connectionResponse.setClusterManagerDN(clusterManagerDN); + } + } + + public void setClusterManagerDN(final String dn) { + if ( connectionResponse != null ) { + connectionResponse.setClusterManagerDN(dn); + } + this.clusterManagerDN = dn; + } + + /** + * Returns the DN of the NCM, if it is available or <code>null</code> otherwise. 
+ * + * @return + */ + public String getClusterManagerDN() { + return clusterManagerDN; + } + + @Override + public MessageType getType() { + return MessageType.CONNECTION_RESPONSE; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java new file mode 100644 index 0000000..ebc1cae --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ControllerStartupFailureMessage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter; + +/** + * @author unattributed + */ +@XmlRootElement(name = "controllerStartupFailureMessage") +public class ControllerStartupFailureMessage extends ExceptionMessage { + + private NodeIdentifier nodeId; + + public ControllerStartupFailureMessage() {} + + @Override + public MessageType getType() { + return MessageType.CONTROLLER_STARTUP_FAILURE; + } + + @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) + public NodeIdentifier getNodeId() { + return nodeId; + } + + public void setNodeId(NodeIdentifier nodeId) { + this.nodeId = nodeId; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/DisconnectMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/DisconnectMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/DisconnectMessage.java new file mode 100644 index 0000000..8aa7a40 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/DisconnectMessage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter; + +/** + * @author unattributed + */ +@XmlRootElement(name = "disconnectionMessage") +public class DisconnectMessage extends ProtocolMessage { + + private NodeIdentifier nodeId; + private String explanation; + + @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) + public NodeIdentifier getNodeId() { + return nodeId; + } + + public void setNodeId(NodeIdentifier nodeId) { + this.nodeId = nodeId; + } + + public String getExplanation() { + return explanation; + } + + public void setExplanation(String explanation) { + this.explanation = explanation; + } + + @Override + public MessageType getType() { + return MessageType.DISCONNECTION_REQUEST; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ExceptionMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ExceptionMessage.java 
b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ExceptionMessage.java new file mode 100644 index 0000000..99a6dee --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ExceptionMessage.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; + +/** + * @author unattributed + */ +@XmlRootElement(name = "exceptionMessage") +public class ExceptionMessage extends ProtocolMessage { + + private String exceptionMessage; + + public ExceptionMessage() {} + + public String getExceptionMessage() { + return exceptionMessage; + } + + public void setExceptionMessage(String exceptionMessage) { + this.exceptionMessage = exceptionMessage; + } + + @Override + public MessageType getType() { + return MessageType.EXCEPTION; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowRequestMessage.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowRequestMessage.java b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowRequestMessage.java new file mode 100644 index 0000000..4a10538 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/FlowRequestMessage.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.protocol.message; + +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter; + +/** + * @author unattributed + */ +@XmlRootElement(name = "flowRequestMessage") +public class FlowRequestMessage extends ProtocolMessage { + + private NodeIdentifier nodeId; + + @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) + public NodeIdentifier getNodeId() { + return nodeId; + } + + public void setNodeId(NodeIdentifier nodeId) { + this.nodeId = nodeId; + } + + @Override + public MessageType getType() { + return MessageType.FLOW_REQUEST; + } + +}