http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java deleted file mode 100644 index 933e5fa..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.util.Collection; - -import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.ProtocolListener; -import org.apache.nifi.cluster.protocol.message.DisconnectMessage; -import org.apache.nifi.cluster.protocol.message.FlowRequestMessage; -import org.apache.nifi.cluster.protocol.message.FlowResponseMessage; -import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; -import org.apache.nifi.reporting.BulletinRepository; - -/** - * A wrapper class for consolidating a protocol sender and listener for the cluster - * manager. - * - * @author unattributed - */ -public class ClusterManagerProtocolSenderListener implements ClusterManagerProtocolSender, ProtocolListener { - - private final ClusterManagerProtocolSender sender; - - private final ProtocolListener listener; - - public ClusterManagerProtocolSenderListener(final ClusterManagerProtocolSender sender, final ProtocolListener listener) { - if(sender == null) { - throw new IllegalArgumentException("ClusterManagerProtocolSender may not be null."); - } else if(listener == null) { - throw new IllegalArgumentException("ProtocolListener may not be null."); - } - this.sender = sender; - this.listener = listener; - } - - @Override - public void stop() throws IOException { - if(!isRunning()) { - throw new IllegalStateException("Instance is already stopped."); - } - listener.stop(); - } - - @Override - public void start() throws IOException { - if(isRunning()) { - throw new IllegalStateException("Instance is already started."); - } - listener.start(); - } - - @Override - public boolean isRunning() { 
- return listener.isRunning(); - } - - @Override - public boolean removeHandler(final ProtocolHandler handler) { - return listener.removeHandler(handler); - } - - @Override - public Collection<ProtocolHandler> getHandlers() { - return listener.getHandlers(); - } - - @Override - public void addHandler(final ProtocolHandler handler) { - listener.addHandler(handler); - } - - @Override - public void setBulletinRepository(final BulletinRepository bulletinRepository) { - listener.setBulletinRepository(bulletinRepository); - sender.setBulletinRepository(bulletinRepository); - } - - @Override - public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException { - return sender.requestFlow(msg); - } - - @Override - public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException { - return sender.requestReconnection(msg); - } - - @Override - public void disconnect(DisconnectMessage msg) throws ProtocolException { - sender.disconnect(msg); - } - - @Override - public void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException { - sender.assignPrimaryRole(msg); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java deleted file mode 100644 index 24e51e0..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.Collections; -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; -import org.apache.nifi.io.socket.multicast.MulticastConfiguration; -import org.apache.nifi.io.socket.multicast.MulticastServiceDiscovery; -import org.apache.nifi.reporting.BulletinRepository; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.ProtocolListener; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; -import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An implementation for discovering services by way of "service broadcast" type - * protocol messages over multicast. - * - * The client caller is responsible for starting and stopping the service - * discovery. The instance must be stopped before termination of the JVM to - * ensure proper resource clean-up. 
- * - * @author unattributed - */ -public class ClusterServiceDiscovery implements MulticastServiceDiscovery, ProtocolListener { - - private static final Logger logger = LoggerFactory.getLogger(ClusterServiceDiscovery.class); - private final String serviceName; - private final MulticastConfiguration multicastConfiguration; - private final MulticastProtocolListener listener; - private volatile BulletinRepository bulletinRepository; - - /* - * guarded by this - */ - private DiscoverableService service; - - - public ClusterServiceDiscovery(final String serviceName, final InetSocketAddress multicastAddress, - final MulticastConfiguration multicastConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) { - - if (StringUtils.isBlank(serviceName)) { - throw new IllegalArgumentException("Service name may not be null or empty."); - } else if (multicastAddress == null) { - throw new IllegalArgumentException("Multicast address may not be null."); - } else if (multicastAddress.getAddress().isMulticastAddress() == false) { - throw new IllegalArgumentException("Multicast group must be a Class D address."); - } else if (protocolContext == null) { - throw new IllegalArgumentException("Protocol Context may not be null."); - } else if (multicastConfiguration == null) { - throw new IllegalArgumentException("Multicast configuration may not be null."); - } - - this.serviceName = serviceName; - this.multicastConfiguration = multicastConfiguration; - this.listener = new MulticastProtocolListener(1, multicastAddress, multicastConfiguration, protocolContext); - listener.addHandler(new ClusterManagerServiceBroadcastHandler()); - } - - @Override - public void setBulletinRepository(final BulletinRepository bulletinRepository) { - this.bulletinRepository = bulletinRepository; - } - - @Override - public synchronized DiscoverableService getService() { - return service; - } - - @Override - public InetSocketAddress getMulticastAddress() { - return listener.getMulticastAddress(); - } 
- - @Override - public Collection<ProtocolHandler> getHandlers() { - return Collections.unmodifiableCollection(listener.getHandlers()); - } - - @Override - public void addHandler(ProtocolHandler handler) { - listener.addHandler(handler); - } - - @Override - public boolean removeHandler(ProtocolHandler handler) { - return listener.removeHandler(handler); - } - - @Override - public boolean isRunning() { - return listener.isRunning(); - } - - @Override - public void start() throws IOException { - if (isRunning()) { - throw new IllegalStateException("Instance is already running."); - } - listener.start(); - } - - @Override - public void stop() throws IOException { - if (isRunning() == false) { - throw new IllegalStateException("Instance is already stopped."); - } - listener.stop(); - } - - public String getServiceName() { - return serviceName; - } - - public MulticastConfiguration getMulticastConfiguration() { - return multicastConfiguration; - } - - private class ClusterManagerServiceBroadcastHandler implements ProtocolHandler { - - @Override - public boolean canHandle(final ProtocolMessage msg) { - return MessageType.SERVICE_BROADCAST == msg.getType(); - } - - @Override - public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException { - synchronized (ClusterServiceDiscovery.this) { - if (canHandle(msg) == false) { - throw new ProtocolException("Handler cannot handle message type: " + msg.getType()); - } else { - final ServiceBroadcastMessage broadcastMsg = (ServiceBroadcastMessage) msg; - if (serviceName.equals(broadcastMsg.getServiceName())) { - final DiscoverableService oldService = service; - if (oldService == null - || broadcastMsg.getAddress().equalsIgnoreCase(oldService.getServiceAddress().getHostName()) == false - || broadcastMsg.getPort() != oldService.getServiceAddress().getPort()) { - service = new DiscoverableServiceImpl(serviceName, InetSocketAddress.createUnresolved(broadcastMsg.getAddress(), broadcastMsg.getPort())); - final 
InetSocketAddress oldServiceAddress = (oldService == null) ? null : oldService.getServiceAddress(); - logger.info(String.format("Updating cluster service address for '%s' from '%s' to '%s'", serviceName, prettyPrint(oldServiceAddress), prettyPrint(service.getServiceAddress()))); - } - } - return null; - } - } - } - } - - private String prettyPrint(final InetSocketAddress address) { - if (address == null) { - return "0.0.0.0:0"; - } else { - return address.getHostName() + ":" + address.getPort(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java deleted file mode 100644 index bebfde8..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; -import org.apache.nifi.io.socket.multicast.ServiceDiscovery; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements the ServiceLocator interface for locating the socket address - * of a cluster service. Depending on configuration, the address may be located - * using service discovery. If using service discovery, then the service methods - * must be used for starting and stopping discovery. - * - * Service discovery may be used in conjunction with a fixed port. In this case, - * the service discovery will yield the service IP/host while the fixed port will - * be used for the port. - * - * Alternatively, the instance may be configured with exact service location, in - * which case, no service discovery occurs and the caller will always receive the - * configured service. 
- * - * @author unattributed - */ -public class ClusterServiceLocator implements ServiceDiscovery { - - private static final Logger logger = LoggerFactory.getLogger(ClusterServiceLocator.class); - - private final String serviceName; - - private final ClusterServiceDiscovery serviceDiscovery; - - private final DiscoverableService fixedService; - - private final int fixedServicePort; - - private final AttemptsConfig attemptsConfig = new AttemptsConfig(); - - private final AtomicBoolean running = new AtomicBoolean(false); - - public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery) { - if(serviceDiscovery == null) { - throw new IllegalArgumentException("ClusterServiceDiscovery may not be null."); - } - this.serviceDiscovery = serviceDiscovery; - this.fixedService = null; - this.fixedServicePort = 0; - this.serviceName = serviceDiscovery.getServiceName(); - } - - public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery, final int fixedServicePort) { - if(serviceDiscovery == null) { - throw new IllegalArgumentException("ClusterServiceDiscovery may not be null."); - } - this.serviceDiscovery = serviceDiscovery; - this.fixedService = null; - this.fixedServicePort = fixedServicePort; - this.serviceName = serviceDiscovery.getServiceName(); - } - - public ClusterServiceLocator(final DiscoverableService fixedService) { - if(fixedService == null) { - throw new IllegalArgumentException("Service may not be null."); - } - this.serviceDiscovery = null; - this.fixedService = fixedService; - this.fixedServicePort = 0; - this.serviceName = fixedService.getServiceName(); - } - - @Override - public DiscoverableService getService() { - - final int numAttemptsValue; - final int secondsBetweenAttempts; - synchronized(this) { - numAttemptsValue = attemptsConfig.numAttempts; - secondsBetweenAttempts = attemptsConfig.getTimeBetweenAttempts(); - } - - // try for a configured amount of attempts to retrieve the service address - for(int i = 0; i < 
numAttemptsValue; i++) { - - if(fixedService != null) { - return fixedService; - } else if(serviceDiscovery != null) { - - final DiscoverableService discoveredService = serviceDiscovery.getService(); - - // if we received an address - if(discoveredService != null) { - // if we were configured with a fixed port, then use the discovered host and fixed port; otherwise use the discovered address - if(fixedServicePort > 0) { - // create service using discovered service name and address with fixed service port - final InetSocketAddress addr = InetSocketAddress.createUnresolved(discoveredService.getServiceAddress().getHostName(), fixedServicePort); - final DiscoverableService result = new DiscoverableServiceImpl(discoveredService.getServiceName(), addr); - return result; - } else { - return discoveredService; - } - } - } - - // could not obtain service address, so sleep a bit - try { - logger.debug(String.format("Locating Cluster Service '%s' Attempt: %d of %d failed. Trying again in %d seconds.", - serviceName, (i + 1), numAttemptsValue, secondsBetweenAttempts)); - Thread.sleep(secondsBetweenAttempts * 1000); - } catch(final InterruptedException ie) { - break; - } - - } - - return null; - } - - public boolean isRunning() { - if(serviceDiscovery != null) { - return serviceDiscovery.isRunning(); - } else { - return running.get(); - } - } - - public void start() throws IOException { - - if(isRunning()) { - throw new IllegalStateException("Instance is already started."); - } - - if(serviceDiscovery != null) { - serviceDiscovery.start(); - } - running.set(true); - } - - public void stop() throws IOException { - - if(isRunning() == false) { - throw new IllegalStateException("Instance is already stopped."); - } - - if(serviceDiscovery != null) { - serviceDiscovery.stop(); - } - running.set(false); - } - - public synchronized void setAttemptsConfig(final AttemptsConfig config) { - if(config == null) { - throw new IllegalArgumentException("Attempts configuration may not be 
null."); - } - this.attemptsConfig.numAttempts = config.numAttempts; - this.attemptsConfig.timeBetweenAttempts = config.timeBetweenAttempts; - this.attemptsConfig.timeBetweenAttempsUnit = config.timeBetweenAttempsUnit; - } - - public synchronized AttemptsConfig getAttemptsConfig() { - final AttemptsConfig config = new AttemptsConfig(); - config.numAttempts = this.attemptsConfig.numAttempts; - config.timeBetweenAttempts = this.attemptsConfig.timeBetweenAttempts; - config.timeBetweenAttempsUnit = this.attemptsConfig.timeBetweenAttempsUnit; - return config; - } - - public static class AttemptsConfig { - - private int numAttempts = 1; - - private int timeBetweenAttempts = 1; - - private TimeUnit timeBetweenAttempsUnit = TimeUnit.SECONDS; - - public int getNumAttempts() { - return numAttempts; - } - - public void setNumAttempts(int numAttempts) { - if(numAttempts <= 0) { - throw new IllegalArgumentException("Number of attempts must be positive: " + numAttempts); - } - this.numAttempts = numAttempts; - } - - public TimeUnit getTimeBetweenAttemptsUnit() { - return timeBetweenAttempsUnit; - } - - public void setTimeBetweenAttempsUnit(TimeUnit timeBetweenAttempsUnit) { - if(timeBetweenAttempts <= 0) { - throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts); - } - this.timeBetweenAttempsUnit = timeBetweenAttempsUnit; - } - - public int getTimeBetweenAttempts() { - return timeBetweenAttempts; - } - - public void setTimeBetweenAttempts(int timeBetweenAttempts) { - if(timeBetweenAttempts <= 0) { - throw new IllegalArgumentException("Time between attempts must be positive: " + numAttempts); - } - this.timeBetweenAttempts = timeBetweenAttempts; - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java 
---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java deleted file mode 100644 index e9e7d5b..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.net.MulticastSocket; -import java.util.*; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.io.socket.multicast.DiscoverableService; -import org.apache.nifi.io.socket.multicast.MulticastConfiguration; -import org.apache.nifi.io.socket.multicast.MulticastServicesBroadcaster; -import org.apache.nifi.io.socket.multicast.MulticastUtils; -import org.apache.nifi.logging.NiFiLog; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Broadcasts services used by the clustering software using multicast communication. - * A configurable delay occurs after broadcasting the collection of services. - * - * The client caller is responsible for starting and stopping the broadcasting. - * The instance must be stopped before termination of the JVM to ensure proper - * resource clean-up. 
- * - * @author unattributed - */ -public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster { - - private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ClusterServicesBroadcaster.class)); - - private final Set<DiscoverableService> services = new CopyOnWriteArraySet<>(); - - private final InetSocketAddress multicastAddress; - - private final MulticastConfiguration multicastConfiguration; - - private final ProtocolContext<ProtocolMessage> protocolContext; - - private final int broadcastDelayMs; - - private Timer broadcaster; - - private MulticastSocket multicastSocket; - - public ClusterServicesBroadcaster(final InetSocketAddress multicastAddress, - final MulticastConfiguration multicastConfiguration, - final ProtocolContext<ProtocolMessage> protocolContext, final String broadcastDelay) { - - if(multicastAddress == null) { - throw new IllegalArgumentException("Multicast address may not be null."); - } else if(multicastAddress.getAddress().isMulticastAddress() == false) { - throw new IllegalArgumentException("Multicast group address is not a Class D IP address."); - } else if(protocolContext == null) { - throw new IllegalArgumentException("Protocol Context may not be null."); - } else if(multicastConfiguration == null) { - throw new IllegalArgumentException("Multicast configuration may not be null."); - } - - this.services.addAll(services); - this.multicastAddress = multicastAddress; - this.multicastConfiguration = multicastConfiguration; - this.protocolContext = protocolContext; - this.broadcastDelayMs = (int) FormatUtils.getTimeDuration(broadcastDelay, TimeUnit.MILLISECONDS); - } - - public void start() throws IOException { - - if(isRunning()) { - throw new IllegalStateException("Instance is already started."); - } - - // setup socket - multicastSocket = MulticastUtils.createMulticastSocket(multicastConfiguration); - - // setup broadcaster - broadcaster = new Timer("Cluster Services Broadcaster", /* is daemon */ true); - 
broadcaster.schedule(new TimerTask() { - @Override - public void run() { - for(final DiscoverableService service : services) { - try { - - final InetSocketAddress serviceAddress = service.getServiceAddress(); - logger.debug(String.format("Broadcasting Cluster Service '%s' at address %s:%d", - service.getServiceName(), serviceAddress.getHostName(), serviceAddress.getPort())); - - // create message - final ServiceBroadcastMessage msg = new ServiceBroadcastMessage(); - msg.setServiceName(service.getServiceName()); - msg.setAddress(serviceAddress.getHostName()); - msg.setPort(serviceAddress.getPort()); - - // marshal message to output stream - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - marshaller.marshal(msg, baos); - final byte[] packetBytes = baos.toByteArray(); - - // send message - final DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, multicastAddress); - multicastSocket.send(packet); - - } catch(final Exception ex) { - logger.warn(String.format("Cluster Services Broadcaster failed broadcasting service '%s' due to: %s", service.getServiceName(), ex), ex); - } - } - } - }, 0, broadcastDelayMs); - } - - public boolean isRunning() { - return (broadcaster != null); - } - - public void stop() { - - if(isRunning() == false) { - throw new IllegalStateException("Instance is already stopped."); - } - - broadcaster.cancel(); - broadcaster = null; - - // close socket - MulticastUtils.closeQuietly(multicastSocket); - - } - - @Override - public int getBroadcastDelayMs() { - return broadcastDelayMs; - } - - @Override - public Set<DiscoverableService> getServices() { - return Collections.unmodifiableSet(services); - } - - @Override - public InetSocketAddress getMulticastAddress() { - return multicastAddress; - } - - @Override - public boolean addService(final DiscoverableService service) { - return services.add(service); - } - - 
@Override - public boolean removeService(final String serviceName) { - for(final DiscoverableService service : services) { - if(service.getServiceName().equals(serviceName)) { - return services.remove(service); - } - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java deleted file mode 100644 index 680df65..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.ByteArrayOutputStream; -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -public class CopyingInputStream extends FilterInputStream { - private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - private final int maxBytesToCopy; - private final InputStream in; - - public CopyingInputStream(final InputStream in, final int maxBytesToCopy) { - super(in); - this.maxBytesToCopy = maxBytesToCopy; - this.in = in; - } - - @Override - public int read() throws IOException { - final int delegateRead = in.read(); - if ( delegateRead != -1 && getNumberOfBytesCopied() < maxBytesToCopy ) { - baos.write(delegateRead); - } - - return delegateRead; - } - - @Override - public int read(byte[] b) throws IOException { - final int delegateRead = in.read(b); - if ( delegateRead >= 0 ) { - baos.write(b, 0, Math.min(delegateRead, maxBytesToCopy - getNumberOfBytesCopied())); - } - - return delegateRead; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - final int delegateRead = in.read(b, off, len); - if ( delegateRead >= 0 ) { - baos.write(b, off, Math.min(delegateRead, maxBytesToCopy - getNumberOfBytesCopied())); - } - - return delegateRead; - } - - public byte[] getBytesRead() { - return baos.toByteArray(); - } - - public void writeBytes(final OutputStream out) throws IOException { - baos.writeTo(out); - } - - public int getNumberOfBytesCopied() { - return baos.size(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java deleted file mode 100644 index d3764b3..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.net.MulticastSocket; -import java.util.Collection; -import java.util.Collections; -import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; -import org.apache.nifi.io.socket.multicast.MulticastConfiguration; -import org.apache.nifi.io.socket.multicast.MulticastListener; -import org.apache.nifi.logging.NiFiLog; -import org.apache.nifi.reporting.Bulletin; -import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.ProtocolListener; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; -import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.events.BulletinFactory; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements a listener for protocol messages sent over multicast. If a message - * is of type MulticastProtocolMessage, then the underlying protocol message is - * passed to the handler. If the receiving handler produces a message response, - * then the message is wrapped with a MulticastProtocolMessage before being - * sent to the originator. - * - * The client caller is responsible for starting and stopping the listener. - * The instance must be stopped before termination of the JVM to ensure proper - * resource clean-up. 
- * - * @author unattributed - */ -public class MulticastProtocolListener extends MulticastListener implements ProtocolListener { - - private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(MulticastProtocolListener.class)); - - // immutable members - private final Collection<ProtocolHandler> handlers = new CopyOnWriteArrayList<>(); - private final String listenerId = UUID.randomUUID().toString(); - private final ProtocolContext<ProtocolMessage> protocolContext; - private volatile BulletinRepository bulletinRepository; - - public MulticastProtocolListener( - final int numThreads, - final InetSocketAddress multicastAddress, - final MulticastConfiguration configuration, - final ProtocolContext<ProtocolMessage> protocolContext) { - - super(numThreads, multicastAddress, configuration); - - if (protocolContext == null) { - throw new IllegalArgumentException("Protocol Context may not be null."); - } - this.protocolContext = protocolContext; - } - - @Override - public void setBulletinRepository(final BulletinRepository bulletinRepository) { - this.bulletinRepository = bulletinRepository; - } - - @Override - public void start() throws IOException { - - if(super.isRunning()) { - throw new IllegalStateException("Instance is already started."); - } - - super.start(); - - } - - @Override - public void stop() throws IOException { - - if(super.isRunning() == false) { - throw new IllegalStateException("Instance is already stopped."); - } - - // shutdown listener - super.stop(); - - } - - @Override - public Collection<ProtocolHandler> getHandlers() { - return Collections.unmodifiableCollection(handlers); - } - - @Override - public void addHandler(final ProtocolHandler handler) { - if(handler == null) { - throw new NullPointerException("Protocol handler may not be null."); - } - handlers.add(handler); - } - - @Override - public boolean removeHandler(final ProtocolHandler handler) { - return handlers.remove(handler); - } - - @Override - public void 
dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet) { - - try { - - // unmarshall message - final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); - final ProtocolMessage request = unmarshaller.unmarshal(new ByteArrayInputStream(packet.getData(), 0, packet.getLength())); - - // unwrap multicast message, if necessary - final ProtocolMessage unwrappedRequest; - if(request instanceof MulticastProtocolMessage) { - final MulticastProtocolMessage multicastRequest = (MulticastProtocolMessage) request; - // don't process a message we sent - if(listenerId.equals(multicastRequest.getId())) { - return; - } else { - unwrappedRequest = multicastRequest.getProtocolMessage(); - } - } else { - unwrappedRequest = request; - } - - // dispatch message to handler - ProtocolHandler desiredHandler = null; - for (final ProtocolHandler handler : getHandlers()) { - if (handler.canHandle(unwrappedRequest)) { - desiredHandler = handler; - break; - } - } - - // if no handler found, throw exception; otherwise handle request - if (desiredHandler == null) { - throw new ProtocolException("No handler assigned to handle message type: " + request.getType()); - } else { - final ProtocolMessage response = desiredHandler.handle(request); - if(response != null) { - try { - - // wrap with listener id - final MulticastProtocolMessage multicastResponse = new MulticastProtocolMessage(listenerId, response); - - // marshal message - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(multicastResponse, baos); - final byte[] responseBytes = baos.toByteArray(); - - final int maxPacketSizeBytes = getMaxPacketSizeBytes(); - if(responseBytes.length > maxPacketSizeBytes) { - logger.warn("Cluster protocol handler '" + desiredHandler.getClass() + - "' produced a multicast response with length greater than 
configured max packet size '" + maxPacketSizeBytes + "'"); - } - - // create and send packet - final DatagramPacket responseDatagram = new DatagramPacket(responseBytes, responseBytes.length, getMulticastAddress().getAddress(), getMulticastAddress().getPort()); - multicastSocket.send(responseDatagram); - - } catch (final IOException ioe) { - throw new ProtocolException("Failed marshalling protocol message in response to message type: " + request.getType() + " due to: " + ioe, ioe); - } - } - } - - } catch (final Throwable t) { - logger.warn("Failed processing protocol message due to " + t, t); - - if ( bulletinRepository != null ) { - final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", "Failed to process Protocol Message due to " + t.toString()); - bulletinRepository.addBulletin(bulletin); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java deleted file mode 100644 index dc86d24..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.net.Socket; - -import javax.net.ssl.SSLSocket; -import javax.security.cert.X509Certificate; - -import org.apache.nifi.cluster.protocol.NodeProtocolSender; -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; -import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; -import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; -import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; -import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; -import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; -import org.apache.nifi.io.socket.SocketConfiguration; -import org.apache.nifi.io.socket.SocketUtils; -import org.apache.nifi.io.socket.multicast.DiscoverableService; - -public class NodeProtocolSenderImpl implements NodeProtocolSender { - 
private final SocketConfiguration socketConfiguration; - private final ClusterServiceLocator clusterManagerProtocolServiceLocator; - private final ProtocolContext<ProtocolMessage> protocolContext; - - public NodeProtocolSenderImpl(final ClusterServiceLocator clusterManagerProtocolServiceLocator, - final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) { - if(clusterManagerProtocolServiceLocator == null) { - throw new IllegalArgumentException("Protocol Service Locator may not be null."); - } else if(socketConfiguration == null) { - throw new IllegalArgumentException("Socket configuration may not be null."); - } else if(protocolContext == null) { - throw new IllegalArgumentException("Protocol Context may not be null."); - } - - this.clusterManagerProtocolServiceLocator = clusterManagerProtocolServiceLocator; - this.socketConfiguration = socketConfiguration; - this.protocolContext = protocolContext; - } - - - @Override - public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException { - Socket socket = null; - try { - socket = createSocket(); - - String ncmDn = null; - if ( socket instanceof SSLSocket ) { - final SSLSocket sslSocket = (SSLSocket) socket; - try { - final X509Certificate[] certChains = sslSocket.getSession().getPeerCertificateChain(); - if ( certChains != null && certChains.length > 0 ) { - ncmDn = certChains[0].getSubjectDN().getName(); - } - } catch (final ProtocolException pe) { - throw pe; - } catch (final Exception e) { - throw new ProtocolException(e); - } - } - - try { - // marshal message to output stream - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - - final 
ProtocolMessage response; - try { - // unmarshall response and return - final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); - response = unmarshaller.unmarshal(socket.getInputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed unmarshalling '" + MessageType.CONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe); - } - - if(MessageType.CONNECTION_RESPONSE == response.getType()) { - final ConnectionResponseMessage connectionResponse = (ConnectionResponseMessage) response; - connectionResponse.setClusterManagerDN(ncmDn); - return connectionResponse; - } else { - throw new ProtocolException("Expected message type '" + MessageType.CONNECTION_RESPONSE + "' but found '" + response.getType() + "'"); - } - } finally { - SocketUtils.closeQuietly(socket); - } - } - - - @Override - public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException { - sendProtocolMessage(msg); - } - - @Override - public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException { - sendProtocolMessage(msg); - } - - @Override - public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException { - sendProtocolMessage(msg); - } - - @Override - public void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException { - sendProtocolMessage(msg); - } - - private Socket createSocket() { - // determine the cluster manager's address - final DiscoverableService service = clusterManagerProtocolServiceLocator.getService(); - if(service == null) { - throw new UnknownServiceAddressException("Cluster Manager's service is not known. 
Verify a cluster manager is running."); - } - - try { - // create a socket - return SocketUtils.createSocket(service.getServiceAddress(), socketConfiguration); - } catch(final IOException ioe) { - throw new ProtocolException("Failed to create socket due to: " + ioe, ioe); - } - } - - private void sendProtocolMessage(final ProtocolMessage msg) { - Socket socket = null; - try { - socket = createSocket(); - - try { - // marshal message to output stream - final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(msg, socket.getOutputStream()); - } catch(final IOException ioe) { - throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe); - } - } finally { - SocketUtils.closeQuietly(socket); - } - } - - public SocketConfiguration getSocketConfiguration() { - return socketConfiguration; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java deleted file mode 100644 index 4b359f4..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.util.Collection; - -import org.apache.nifi.cluster.protocol.NodeProtocolSender; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.ProtocolListener; -import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; -import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage; -import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage; -import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage; -import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; -import org.apache.nifi.reporting.BulletinRepository; - -public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolListener { - - private final NodeProtocolSender sender; - private final ProtocolListener listener; - - public NodeProtocolSenderListener(final NodeProtocolSender sender, final ProtocolListener listener) { - if(sender == null) { - throw new IllegalArgumentException("NodeProtocolSender may not 
be null."); - } else if(listener == null) { - throw new IllegalArgumentException("ProtocolListener may not be null."); - } - this.sender = sender; - this.listener = listener; - } - - @Override - public void stop() throws IOException { - if(!isRunning()) { - throw new IllegalStateException("Instance is already stopped."); - } - listener.stop(); - } - - @Override - public void start() throws IOException { - if(isRunning()) { - throw new IllegalStateException("Instance is already started."); - } - listener.start(); - } - - @Override - public boolean isRunning() { - return listener.isRunning(); - } - - @Override - public boolean removeHandler(final ProtocolHandler handler) { - return listener.removeHandler(handler); - } - - @Override - public Collection<ProtocolHandler> getHandlers() { - return listener.getHandlers(); - } - - @Override - public void addHandler(final ProtocolHandler handler) { - listener.addHandler(handler); - } - - @Override - public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException { - sender.heartbeat(msg); - } - - @Override - public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException { - return sender.requestConnection(msg); - } - - @Override - public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException { - sender.notifyControllerStartupFailure(msg); - } - - @Override - public void notifyReconnectionFailure(final ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException { - sender.notifyReconnectionFailure(msg); - } - - @Override - public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException { - sender.sendBulletins(msg); - } - - @Override - public void setBulletinRepository(final BulletinRepository bulletinRepository) { - 
listener.setBulletinRepository(bulletinRepository); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java deleted file mode 100644 index ca30d9b..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol.impl; - -import java.io.IOException; -import java.io.InputStream; -import java.net.Socket; -import java.util.Collection; -import java.util.Collections; -import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; - -import javax.net.ssl.SSLSocket; -import javax.security.cert.X509Certificate; - -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.cluster.protocol.ProtocolHandler; -import org.apache.nifi.cluster.protocol.ProtocolListener; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; -import org.apache.nifi.cluster.protocol.message.ProtocolMessage; -import org.apache.nifi.events.BulletinFactory; -import org.apache.nifi.io.socket.ServerSocketConfiguration; -import org.apache.nifi.io.socket.SocketListener; -import org.apache.nifi.logging.NiFiLog; -import org.apache.nifi.reporting.Bulletin; -import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.util.StopWatch; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements a listener for protocol messages sent over unicast socket. 
- * - * @author unattributed - */ -public class SocketProtocolListener extends SocketListener implements ProtocolListener { - - private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketProtocolListener.class)); - private final ProtocolContext<ProtocolMessage> protocolContext; - private final Collection<ProtocolHandler> handlers = new CopyOnWriteArrayList<>(); - private volatile BulletinRepository bulletinRepository; - - public SocketProtocolListener( - final int numThreads, - final int port, - final ServerSocketConfiguration configuration, - final ProtocolContext<ProtocolMessage> protocolContext) { - - super(numThreads, port, configuration); - - if(protocolContext == null) { - throw new IllegalArgumentException("Protocol Context may not be null."); - } - - this.protocolContext = protocolContext; - } - - @Override - public void setBulletinRepository(final BulletinRepository bulletinRepository) { - this.bulletinRepository = bulletinRepository; - } - - @Override - public void start() throws IOException { - - if(super.isRunning()) { - throw new IllegalStateException("Instance is already started."); - } - - super.start(); - } - - @Override - public void stop() throws IOException { - - if(super.isRunning() == false) { - throw new IOException("Instance is already stopped."); - } - - super.stop(); - - } - - @Override - public Collection<ProtocolHandler> getHandlers() { - return Collections.unmodifiableCollection(handlers); - } - - @Override - public void addHandler(final ProtocolHandler handler) { - if(handler == null) { - throw new NullPointerException("Protocol handler may not be null."); - } - handlers.add(handler); - } - - @Override - public boolean removeHandler(final ProtocolHandler handler) { - return handlers.remove(handler); - } - - @Override - public void dispatchRequest(final Socket socket) { - byte[] receivedMessage = null; - String hostname = null; - final int maxMsgBuffer = 1024 * 1024; // don't buffer more than 1 MB of the message - try 
{ - final StopWatch stopWatch = new StopWatch(true); - hostname = socket.getInetAddress().getHostName(); - final String requestId = UUID.randomUUID().toString(); - logger.info("Received request {} from {}", requestId, hostname); - - String requestorDn = null; - if ( socket instanceof SSLSocket ) { - final SSLSocket sslSocket = (SSLSocket) socket; - try { - final X509Certificate[] certChains = sslSocket.getSession().getPeerCertificateChain(); - if ( certChains != null && certChains.length > 0 ) { - requestorDn = certChains[0].getSubjectDN().getName(); - } - } catch (final ProtocolException pe) { - throw pe; - } catch (final Exception e) { - throw new ProtocolException(e); - } - } - - // unmarshall message - final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); - final InputStream inStream = socket.getInputStream(); - final CopyingInputStream copyingInputStream = new CopyingInputStream(inStream, maxMsgBuffer); // don't copy more than 1 MB - logger.debug("Request {} has a message length of {}", requestId, copyingInputStream.getNumberOfBytesCopied()); - - final ProtocolMessage request; - try { - request = unmarshaller.unmarshal(copyingInputStream); - } finally { - receivedMessage = copyingInputStream.getBytesRead(); - } - - request.setRequestorDN(requestorDn); - - // dispatch message to handler - ProtocolHandler desiredHandler = null; - for (final ProtocolHandler handler : getHandlers()) { - if (handler.canHandle(request)) { - desiredHandler = handler; - break; - } - } - - // if no handler found, throw exception; otherwise handle request - if (desiredHandler == null) { - throw new ProtocolException("No handler assigned to handle message type: " + request.getType()); - } else { - final ProtocolMessage response = desiredHandler.handle(request); - if(response != null) { - try { - logger.debug("Sending response for request {}", requestId); - - // marshal message to output stream - final 
ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller(); - marshaller.marshal(response, socket.getOutputStream()); - } catch (final IOException ioe) { - throw new ProtocolException("Failed marshalling protocol message in response to message type: " + request.getType() + " due to " + ioe, ioe); - } - } - } - - stopWatch.stop(); - logger.info("Finished processing request {} (type={}, length={} bytes) in {} millis", requestId, request.getType(), receivedMessage.length, stopWatch.getDuration(TimeUnit.MILLISECONDS)); - } catch (final IOException e) { - logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e); - - if ( bulletinRepository != null ) { - final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", String.format("Failed to process protocol message from %s due to: %s", hostname, e.toString())); - bulletinRepository.addBulletin(bulletin); - } - } catch (final ProtocolException e) { - logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e); - if ( bulletinRepository != null ) { - final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", String.format("Failed to process protocol message from %s due to: %s", hostname, e.toString())); - bulletinRepository.addBulletin(bulletin); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java deleted file mode 
100644 index bc68630..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.jaxb; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; - -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.bind.Unmarshaller; - -import org.apache.nifi.cluster.protocol.ProtocolContext; -import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller; -import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller; - -/** - * Implements a context for communicating internally amongst the cluster using - * JAXB. - * - * @param <T> The type of protocol message. 
- * - * @author unattributed - */ -public class JaxbProtocolContext<T> implements ProtocolContext { - - private static final int BUF_SIZE = (int) Math.pow(2, 10); // 1k - - /* - * A sentinel is used to detect corrupted messages. Relying on the integrity - * of the message size can cause memory issues if the value is corrupted - * and equal to a number larger than the memory size. - */ - private static final byte MESSAGE_PROTOCOL_START_SENTINEL = 0x5A; - - private final JAXBContext jaxbCtx; - - public JaxbProtocolContext(final JAXBContext jaxbCtx) { - this.jaxbCtx = jaxbCtx; - } - - @Override - public ProtocolMessageMarshaller<T> createMarshaller() { - return new ProtocolMessageMarshaller<T>() { - - @Override - public void marshal(final T msg, final OutputStream os) throws IOException { - - try { - - // marshal message to output stream - final Marshaller marshaller = jaxbCtx.createMarshaller(); - final ByteArrayOutputStream msgBytes = new ByteArrayOutputStream(); - marshaller.marshal(msg, msgBytes); - - final DataOutputStream dos = new DataOutputStream(os); - - // write message protocol sentinel - dos.write(MESSAGE_PROTOCOL_START_SENTINEL); - - // write message size in bytes - dos.writeInt(msgBytes.size()); - - // write message - dos.write(msgBytes.toByteArray()); - - dos.flush(); - - } catch (final JAXBException je) { - throw new IOException("Failed marshalling protocol message due to: " + je, je); - } - - } - }; - } - - @Override - public ProtocolMessageUnmarshaller<T> createUnmarshaller() { - return new ProtocolMessageUnmarshaller<T>() { - - @Override - public T unmarshal(final InputStream is) throws IOException { - - try { - - final DataInputStream dis = new DataInputStream(is); - - // check for the presence of the message protocol sentinel - final byte sentinel = (byte) dis.read(); - if ( sentinel == -1 ) { - throw new EOFException(); - } - - if(MESSAGE_PROTOCOL_START_SENTINEL != sentinel) { - throw new IOException("Failed reading protocol message due to 
malformed header"); - } - - // read the message size - final int msgBytesSize = dis.readInt(); - - // read the message - final ByteBuffer buffer = ByteBuffer.allocate(msgBytesSize); - int totalBytesRead = 0; - do { - final int bytesToRead; - if ((msgBytesSize - totalBytesRead) >= BUF_SIZE) { - bytesToRead = BUF_SIZE; - } else { - bytesToRead = msgBytesSize - totalBytesRead; - } - totalBytesRead += dis.read(buffer.array(), totalBytesRead, bytesToRead); - } while (totalBytesRead < msgBytesSize); - - // unmarshall message and return - final Unmarshaller unmarshaller = jaxbCtx.createUnmarshaller(); - final byte[] msg = new byte[totalBytesRead]; - buffer.get(msg); - return (T) unmarshaller.unmarshal(new ByteArrayInputStream(msg)); - - } catch (final JAXBException je) { - throw new IOException("Failed unmarshalling protocol message due to: " + je, je); - } - - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java deleted file mode 100644 index d9de24f..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.protocol.jaxb.message; - -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; -import org.apache.nifi.cluster.protocol.NodeIdentifier; - -/** - * @author unattributed - */ -public class AdaptedConnectionRequest { - - private NodeIdentifier nodeIdentifier; - - public AdaptedConnectionRequest() {} - - @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) - public NodeIdentifier getNodeIdentifier() { - return nodeIdentifier; - } - - public void setNodeIdentifier(final NodeIdentifier nodeIdentifier) { - this.nodeIdentifier = nodeIdentifier; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java deleted file mode 100644 index 
c7c783b..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.cluster.protocol.jaxb.message; - -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; - -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.apache.nifi.cluster.protocol.StandardDataFlow; - -/** - * @author unattributed - */ -public class AdaptedConnectionResponse { - - private StandardDataFlow dataFlow; - private NodeIdentifier nodeIdentifier; - private boolean blockedByFirewall; - private boolean primary; - private int tryLaterSeconds; - private Integer managerRemoteInputPort; - private Boolean managerRemoteCommsSecure; - private String instanceId; - - public AdaptedConnectionResponse() {} - - @XmlJavaTypeAdapter(DataFlowAdapter.class) - public StandardDataFlow getDataFlow() { - return dataFlow; - } - - public void setDataFlow(StandardDataFlow dataFlow) { - this.dataFlow = dataFlow; - } - - @XmlJavaTypeAdapter(NodeIdentifierAdapter.class) - public NodeIdentifier getNodeIdentifier() { - return nodeIdentifier; - } - - public void setNodeIdentifier(NodeIdentifier nodeIdentifier) { - this.nodeIdentifier = nodeIdentifier; - } - - public int getTryLaterSeconds() { - return tryLaterSeconds; - } - - public void setTryLaterSeconds(int tryLaterSeconds) { - this.tryLaterSeconds = tryLaterSeconds; - } - - public boolean isBlockedByFirewall() { - return blockedByFirewall; - } - - public void setBlockedByFirewall(boolean blockedByFirewall) { - this.blockedByFirewall = blockedByFirewall; - } - - public boolean isPrimary() { - return primary; - } - - public void setPrimary(boolean primary) { - this.primary = primary; - } - - public boolean shouldTryLater() { - return tryLaterSeconds > 0; - } - - public void setManagerRemoteInputPort(Integer managerRemoteInputPort) { - this.managerRemoteInputPort = managerRemoteInputPort; - } - - public Integer getManagerRemoteInputPort() { - return managerRemoteInputPort; - } - - public void setManagerRemoteCommsSecure(Boolean secure) { - this.managerRemoteCommsSecure = secure; - } - - 
public Boolean isManagerRemoteCommsSecure() { - return managerRemoteCommsSecure; - } - - public void setInstanceId(String instanceId) { - this.instanceId = instanceId; - } - - public String getInstanceId() { - return instanceId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java deleted file mode 100644 index 89d903b..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Mutable bean with three properties describing a counter: a group name, a
 * name, and a {@code long} value. Setters store their argument unchanged.
 *
 * @author unattributed
 */
public class AdaptedCounter {

    private String groupName;

    private String name;

    private long value;

    /** Creates a counter with {@code null} names and a value of zero. */
    public AdaptedCounter() {
    }

    /** @return the stored group name, or {@code null} if none was set */
    public String getGroupName() {
        return this.groupName;
    }

    /** @param counterGroupName the group name to store */
    public void setGroupName(String counterGroupName) {
        groupName = counterGroupName;
    }

    /** @return the stored name, or {@code null} if none was set */
    public String getName() {
        return this.name;
    }

    /** @param counterName the name to store */
    public void setName(String counterName) {
        name = counterName;
    }

    /** @return the stored value; zero until set */
    public long getValue() {
        return this.value;
    }

    /** @param value the value to store */
    public void setValue(long value) {
        this.value = value;
    }
}
/**
 * Mutable bean holding three byte arrays (flow, templates, snippets) and an
 * auto-start flag. Arrays are stored and returned by reference; no copies are
 * made.
 *
 * @author unattributed
 */
public class AdaptedDataFlow {

    private byte[] flow;
    private byte[] templates;
    private byte[] snippets;

    private boolean autoStartProcessors;

    /** Creates an instance with {@code null} arrays and the flag set to {@code false}. */
    public AdaptedDataFlow() {
    }

    /** @return the stored flow bytes (same array instance), or {@code null} */
    public byte[] getFlow() {
        return this.flow;
    }

    /** @param flow the flow bytes to store; the array is not copied */
    public void setFlow(byte[] flow) {
        this.flow = flow;
    }

    /** @return the stored template bytes (same array instance), or {@code null} */
    public byte[] getTemplates() {
        return this.templates;
    }

    /** @param templates the template bytes to store; the array is not copied */
    public void setTemplates(byte[] templates) {
        this.templates = templates;
    }

    /** @return the stored snippet bytes (same array instance), or {@code null} */
    public byte[] getSnippets() {
        return this.snippets;
    }

    /** @param snippets the snippet bytes to store; the array is not copied */
    public void setSnippets(byte[] snippets) {
        this.snippets = snippets;
    }

    /** @return the stored auto-start flag */
    public boolean isAutoStartProcessors() {
        return this.autoStartProcessors;
    }

    /** @param runningAllProcessors the auto-start flag to store */
    public void setAutoStartProcessors(boolean runningAllProcessors) {
        autoStartProcessors = runningAllProcessors;
    }
}