http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
deleted file mode 100644
index 933e5fa..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.ProtocolListener;
-import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
-import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
-import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
-import org.apache.nifi.reporting.BulletinRepository;
-
-/**
- * A wrapper class for consolidating a protocol sender and listener for the 
cluster
- * manager.
- * 
- * @author unattributed
- */
-public class ClusterManagerProtocolSenderListener implements 
ClusterManagerProtocolSender, ProtocolListener {
-    
-    private final ClusterManagerProtocolSender sender;
-    
-    private final ProtocolListener listener;
-    
-    public ClusterManagerProtocolSenderListener(final 
ClusterManagerProtocolSender sender, final ProtocolListener listener) {
-        if(sender == null) {
-            throw new IllegalArgumentException("ClusterManagerProtocolSender 
may not be null.");
-        } else if(listener == null) {
-            throw new IllegalArgumentException("ProtocolListener may not be 
null.");
-        }
-        this.sender = sender;
-        this.listener = listener;
-    }
-
-    @Override
-    public void stop() throws IOException {
-        if(!isRunning()) {
-            throw new IllegalStateException("Instance is already stopped.");
-        }
-        listener.stop();
-    }
-
-    @Override
-    public void start() throws IOException {
-        if(isRunning()) {
-            throw new IllegalStateException("Instance is already started.");
-        }
-        listener.start();
-    }
-
-    @Override
-    public boolean isRunning() {
-        return listener.isRunning();
-    }
-
-    @Override
-    public boolean removeHandler(final ProtocolHandler handler) {
-        return listener.removeHandler(handler);
-    }
-
-    @Override
-    public Collection<ProtocolHandler> getHandlers() {
-        return listener.getHandlers();
-    }
-
-    @Override
-    public void addHandler(final ProtocolHandler handler) {
-        listener.addHandler(handler);
-    }
-    
-    @Override
-    public void setBulletinRepository(final BulletinRepository 
bulletinRepository) {
-        listener.setBulletinRepository(bulletinRepository);
-        sender.setBulletinRepository(bulletinRepository);
-    }
-    
-    @Override
-    public FlowResponseMessage requestFlow(final FlowRequestMessage msg) 
throws ProtocolException {
-        return sender.requestFlow(msg);
-    }
-
-    @Override
-    public ReconnectionResponseMessage requestReconnection(final 
ReconnectionRequestMessage msg) throws ProtocolException {
-        return sender.requestReconnection(msg);
-    }
-
-    @Override
-    public void disconnect(DisconnectMessage msg) throws ProtocolException {
-        sender.disconnect(msg);
-    }
-    
-    @Override
-    public void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws 
ProtocolException {
-        sender.assignPrimaryRole(msg);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
deleted file mode 100644
index 24e51e0..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Collections;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
-import org.apache.nifi.io.socket.multicast.MulticastServiceDiscovery;
-import org.apache.nifi.reporting.BulletinRepository;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.ProtocolListener;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
-import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Discovers a named cluster service by listening for "service broadcast" type
- * protocol messages over multicast.
- *
- * The client caller is responsible for starting and stopping the service
- * discovery. The instance must be stopped before termination of the JVM to
- * ensure proper resource clean-up.
- *
- * @author unattributed
- */
-public class ClusterServiceDiscovery implements MulticastServiceDiscovery, ProtocolListener {
-
-    private static final Logger logger = LoggerFactory.getLogger(ClusterServiceDiscovery.class);
-    private final String serviceName;
-    private final MulticastConfiguration multicastConfiguration;
-    private final MulticastProtocolListener listener;
-    private volatile BulletinRepository bulletinRepository;
-
-    /*
-     * Most recently discovered service; guarded by this
-     */
-    private DiscoverableService service;
-
-    /**
-     * Creates the discovery and registers the internal broadcast handler on a
-     * new multicast listener.
-     *
-     * @param serviceName name of the service whose broadcasts are of interest
-     * @param multicastAddress multicast group address to listen on
-     * @param multicastConfiguration multicast settings handed to the listener
-     * @param protocolContext protocol context handed to the listener
-     * @throws IllegalArgumentException if the name is blank, any other argument
-     * is null, or the address is not a multicast address
-     */
-    public ClusterServiceDiscovery(final String serviceName, final InetSocketAddress multicastAddress,
-            final MulticastConfiguration multicastConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
-
-        // the checks are kept in their original order so the same argument is reported first
-        if (StringUtils.isBlank(serviceName)) {
-            throw new IllegalArgumentException("Service name may not be null or empty.");
-        }
-        if (multicastAddress == null) {
-            throw new IllegalArgumentException("Multicast address may not be null.");
-        }
-        if (!multicastAddress.getAddress().isMulticastAddress()) {
-            throw new IllegalArgumentException("Multicast group must be a Class D address.");
-        }
-        if (protocolContext == null) {
-            throw new IllegalArgumentException("Protocol Context may not be null.");
-        }
-        if (multicastConfiguration == null) {
-            throw new IllegalArgumentException("Multicast configuration may not be null.");
-        }
-
-        this.serviceName = serviceName;
-        this.multicastConfiguration = multicastConfiguration;
-        this.listener = new MulticastProtocolListener(1, multicastAddress, multicastConfiguration, protocolContext);
-        this.listener.addHandler(new ClusterManagerServiceBroadcastHandler());
-    }
-
-    /** Stores the bulletin repository reference. */
-    @Override
-    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
-        this.bulletinRepository = bulletinRepository;
-    }
-
-    /**
-     * @return the most recently discovered service, or null if none has been
-     * recorded yet
-     */
-    @Override
-    public synchronized DiscoverableService getService() {
-        return service;
-    }
-
-    /**
-     * @return the multicast address reported by the underlying listener
-     */
-    @Override
-    public InetSocketAddress getMulticastAddress() {
-        return listener.getMulticastAddress();
-    }
-
-    /**
-     * @return a read-only view of the listener's handlers
-     */
-    @Override
-    public Collection<ProtocolHandler> getHandlers() {
-        final Collection<ProtocolHandler> handlers = listener.getHandlers();
-        return Collections.unmodifiableCollection(handlers);
-    }
-
-    /** Registers the handler with the underlying listener. */
-    @Override
-    public void addHandler(ProtocolHandler handler) {
-        listener.addHandler(handler);
-    }
-
-    /** Unregisters the handler from the underlying listener. */
-    @Override
-    public boolean removeHandler(ProtocolHandler handler) {
-        return listener.removeHandler(handler);
-    }
-
-    /**
-     * @return the running state reported by the underlying listener
-     */
-    @Override
-    public boolean isRunning() {
-        return listener.isRunning();
-    }
-
-    /**
-     * Starts the underlying listener.
-     *
-     * @throws IllegalStateException if already running
-     * @throws IOException if the listener fails to start
-     */
-    @Override
-    public void start() throws IOException {
-        if (isRunning()) {
-            throw new IllegalStateException("Instance is already running.");
-        }
-        listener.start();
-    }
-
-    /**
-     * Stops the underlying listener.
-     *
-     * @throws IllegalStateException if not running
-     * @throws IOException if the listener fails to stop
-     */
-    @Override
-    public void stop() throws IOException {
-        if (!isRunning()) {
-            throw new IllegalStateException("Instance is already stopped.");
-        }
-        listener.stop();
-    }
-
-    /**
-     * @return the name of the service being discovered
-     */
-    public String getServiceName() {
-        return serviceName;
-    }
-
-    /**
-     * @return the multicast configuration supplied at construction
-     */
-    public MulticastConfiguration getMulticastConfiguration() {
-        return multicastConfiguration;
-    }
-
-    /**
-     * Handles service broadcast messages and records the advertised address
-     * when it belongs to the configured service and differs from the one
-     * currently held.
-     */
-    private class ClusterManagerServiceBroadcastHandler implements ProtocolHandler {
-
-        @Override
-        public boolean canHandle(final ProtocolMessage msg) {
-            return MessageType.SERVICE_BROADCAST == msg.getType();
-        }
-
-        /**
-         * Updates the discovered service from a broadcast; always returns null.
-         *
-         * @throws ProtocolException if the message is not a service broadcast
-         */
-        @Override
-        public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
-            synchronized (ClusterServiceDiscovery.this) {
-                if (!canHandle(msg)) {
-                    throw new ProtocolException("Handler cannot handle message type: " + msg.getType());
-                }
-
-                final ServiceBroadcastMessage broadcast = (ServiceBroadcastMessage) msg;
-                if (!serviceName.equals(broadcast.getServiceName())) {
-                    // broadcast is for some other service
-                    return null;
-                }
-
-                final DiscoverableService previous = service;
-                final boolean addressChanged = previous == null
-                        || !broadcast.getAddress().equalsIgnoreCase(previous.getServiceAddress().getHostName())
-                        || broadcast.getPort() != previous.getServiceAddress().getPort();
-                if (!addressChanged) {
-                    return null;
-                }
-
-                service = new DiscoverableServiceImpl(serviceName, InetSocketAddress.createUnresolved(broadcast.getAddress(), broadcast.getPort()));
-                final InetSocketAddress previousAddress = (previous == null) ? null : previous.getServiceAddress();
-                logger.info(String.format("Updating cluster service address for '%s' from '%s' to '%s'", serviceName, prettyPrint(previousAddress), prettyPrint(service.getServiceAddress())));
-                return null;
-            }
-        }
-    }
-
-    /**
-     * Formats an address as host:port, using "0.0.0.0:0" for a null address.
-     */
-    private String prettyPrint(final InetSocketAddress address) {
-        if (address != null) {
-            return address.getHostName() + ":" + address.getPort();
-        }
-        return "0.0.0.0:0";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
deleted file mode 100644
index bebfde8..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-import org.apache.nifi.io.socket.multicast.ServiceDiscovery;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements the ServiceLocator interface for locating the socket address
- * of a cluster service.  Depending on configuration, the address may be 
located
- * using service discovery.  If using service discovery, then the service 
methods
- * must be used for starting and stopping discovery.
- * 
- * Service discovery may be used in conjunction with a fixed port.  In this 
case,
- * the service discovery will yield the service IP/host while the fixed port 
will
- * be used for the port.
- * 
- * Alternatively, the instance may be configured with exact service location, 
in
- * which case, no service discovery occurs and the caller will always receive 
the
- * configured service.
- * 
- * @author unattributed
- */
-public class ClusterServiceLocator implements ServiceDiscovery {
-    
-    private static final Logger logger = 
LoggerFactory.getLogger(ClusterServiceLocator.class);
-    
-    private final String serviceName;
-    
-    private final ClusterServiceDiscovery serviceDiscovery;
-    
-    private final DiscoverableService fixedService;
-
-    private final int fixedServicePort;
-    
-    private final AttemptsConfig attemptsConfig = new AttemptsConfig();
-    
-    private final AtomicBoolean running = new AtomicBoolean(false);
-    
-    public ClusterServiceLocator(final ClusterServiceDiscovery 
serviceDiscovery) {
-        if(serviceDiscovery == null) {
-            throw new IllegalArgumentException("ClusterServiceDiscovery may 
not be null.");
-        }
-        this.serviceDiscovery = serviceDiscovery;
-        this.fixedService = null;
-        this.fixedServicePort = 0;
-        this.serviceName = serviceDiscovery.getServiceName();
-    }
-    
-    public ClusterServiceLocator(final ClusterServiceDiscovery 
serviceDiscovery, final int fixedServicePort) {
-        if(serviceDiscovery == null) {
-            throw new IllegalArgumentException("ClusterServiceDiscovery may 
not be null.");
-        }
-        this.serviceDiscovery = serviceDiscovery;
-        this.fixedService = null;
-        this.fixedServicePort = fixedServicePort;
-        this.serviceName = serviceDiscovery.getServiceName();
-    }
-    
-    public ClusterServiceLocator(final DiscoverableService fixedService) {
-        if(fixedService == null) {
-            throw new IllegalArgumentException("Service may not be null.");
-        }
-        this.serviceDiscovery = null;
-        this.fixedService = fixedService;
-        this.fixedServicePort = 0;
-        this.serviceName = fixedService.getServiceName();
-    }
-    
-    @Override
-    public DiscoverableService getService() {
-        
-        final int numAttemptsValue;
-        final int secondsBetweenAttempts;
-        synchronized(this) {
-            numAttemptsValue = attemptsConfig.numAttempts;
-            secondsBetweenAttempts = attemptsConfig.getTimeBetweenAttempts();
-        }
-        
-        // try for a configured amount of attempts to retrieve the service 
address
-        for(int i = 0; i < numAttemptsValue; i++) {
-
-            if(fixedService != null) {
-                return fixedService;
-            } else if(serviceDiscovery != null) {
-                
-                final DiscoverableService discoveredService = 
serviceDiscovery.getService();
-                
-                // if we received an address
-                if(discoveredService != null) {
-                    // if we were configured with a fixed port, then use the 
discovered host and fixed port; otherwise use the discovered address
-                    if(fixedServicePort > 0) {
-                        // create service using discovered service name and 
address with fixed service port
-                        final InetSocketAddress addr = 
InetSocketAddress.createUnresolved(discoveredService.getServiceAddress().getHostName(),
 fixedServicePort);
-                        final DiscoverableService result = new 
DiscoverableServiceImpl(discoveredService.getServiceName(), addr);
-                        return result;
-                    } else {
-                        return discoveredService;
-                    }
-                }
-            }
-            
-            // could not obtain service address, so sleep a bit
-            try {
-                logger.debug(String.format("Locating Cluster Service '%s' 
Attempt: %d of %d failed.  Trying again in %d seconds.", 
-                    serviceName, (i + 1), numAttemptsValue, 
secondsBetweenAttempts));
-                Thread.sleep(secondsBetweenAttempts * 1000);
-            } catch(final InterruptedException ie) {
-                break;
-            }
-            
-        }
-
-        return null;
-    }
-
-    public boolean isRunning() {
-        if(serviceDiscovery != null) {
-            return serviceDiscovery.isRunning();
-        } else {
-            return running.get();
-        }
-    }
-
-    public void start() throws IOException {
-        
-        if(isRunning()) {
-            throw new IllegalStateException("Instance is already started.");
-        }
-        
-        if(serviceDiscovery != null) {
-            serviceDiscovery.start();
-        }
-        running.set(true);
-    }
-
-    public void stop() throws IOException {
-        
-        if(isRunning() == false) {
-            throw new IllegalStateException("Instance is already stopped.");
-        }
-        
-        if(serviceDiscovery != null) {
-            serviceDiscovery.stop();
-        }
-        running.set(false);
-    }
-    
-    public synchronized void setAttemptsConfig(final AttemptsConfig config) {
-        if(config == null) {
-            throw new IllegalArgumentException("Attempts configuration may not 
be null.");
-        }
-        this.attemptsConfig.numAttempts = config.numAttempts;
-        this.attemptsConfig.timeBetweenAttempts = config.timeBetweenAttempts;
-        this.attemptsConfig.timeBetweenAttempsUnit = 
config.timeBetweenAttempsUnit;
-    }
-
-    public synchronized AttemptsConfig getAttemptsConfig() {
-        final AttemptsConfig config = new AttemptsConfig();
-        config.numAttempts = this.attemptsConfig.numAttempts;
-        config.timeBetweenAttempts = this.attemptsConfig.timeBetweenAttempts;
-        config.timeBetweenAttempsUnit = 
this.attemptsConfig.timeBetweenAttempsUnit;
-        return config;
-    }
-    
-    public static class AttemptsConfig {
-        
-        private int numAttempts = 1;
-        
-        private int timeBetweenAttempts = 1;
-        
-        private TimeUnit timeBetweenAttempsUnit = TimeUnit.SECONDS;
-        
-        public int getNumAttempts() {
-            return numAttempts;
-        }
-
-        public void setNumAttempts(int numAttempts) {
-            if(numAttempts <= 0) {
-                throw new IllegalArgumentException("Number of attempts must be 
positive: " + numAttempts);
-            }
-            this.numAttempts = numAttempts;
-        }
-
-        public TimeUnit getTimeBetweenAttemptsUnit() {
-            return timeBetweenAttempsUnit;
-        }
-
-        public void setTimeBetweenAttempsUnit(TimeUnit timeBetweenAttempsUnit) 
{
-            if(timeBetweenAttempts <= 0) {
-                throw new IllegalArgumentException("Time between attempts must 
be positive: " + numAttempts);
-            } 
-            this.timeBetweenAttempsUnit = timeBetweenAttempsUnit;
-        }
-
-        public int getTimeBetweenAttempts() {
-            return timeBetweenAttempts;
-        }
-
-        public void setTimeBetweenAttempts(int timeBetweenAttempts) {
-            if(timeBetweenAttempts <= 0) {
-            throw new IllegalArgumentException("Time between attempts must be 
positive: " + numAttempts);
-        } 
-            this.timeBetweenAttempts = timeBetweenAttempts;
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
deleted file mode 100644
index e9e7d5b..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcaster.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
-import org.apache.nifi.io.socket.multicast.MulticastServicesBroadcaster;
-import org.apache.nifi.io.socket.multicast.MulticastUtils;
-import org.apache.nifi.logging.NiFiLog;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Broadcasts services used by the clustering software using multicast communication.
- * A configurable delay occurs after broadcasting the collection of services.
- *
- * The client caller is responsible for starting and stopping the broadcasting.
- * The instance must be stopped before termination of the JVM to ensure proper
- * resource clean-up.
- *
- * @author unattributed
- */
-public class ClusterServicesBroadcaster implements MulticastServicesBroadcaster {
-
-    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ClusterServicesBroadcaster.class));
-
-    private final Set<DiscoverableService> services = new CopyOnWriteArraySet<>();
-
-    private final InetSocketAddress multicastAddress;
-
-    private final MulticastConfiguration multicastConfiguration;
-
-    private final ProtocolContext<ProtocolMessage> protocolContext;
-
-    private final int broadcastDelayMs;
-
-    /** Non-null only while broadcasting is active; see {@link #isRunning()}. */
-    private Timer broadcaster;
-
-    private MulticastSocket multicastSocket;
-
-    /**
-     * Creates a broadcaster with no services registered.
-     *
-     * @param multicastAddress multicast group address to send to
-     * @param multicastConfiguration settings used to create the multicast socket
-     * @param protocolContext context used to create message marshallers
-     * @param broadcastDelay delay between broadcast rounds, parsed as a time duration
-     * @throws IllegalArgumentException if an argument other than the delay is
-     * null or the address is not a multicast address
-     */
-    public ClusterServicesBroadcaster(final InetSocketAddress multicastAddress,
-            final MulticastConfiguration multicastConfiguration,
-            final ProtocolContext<ProtocolMessage> protocolContext, final String broadcastDelay) {
-
-        if(multicastAddress == null) {
-            throw new IllegalArgumentException("Multicast address may not be null.");
-        } else if(multicastAddress.getAddress().isMulticastAddress() == false) {
-            throw new IllegalArgumentException("Multicast group address is not a Class D IP address.");
-        } else if(protocolContext == null) {
-            throw new IllegalArgumentException("Protocol Context may not be null.");
-        } else if(multicastConfiguration == null) {
-            throw new IllegalArgumentException("Multicast configuration may not be null.");
-        }
-
-        // services start out empty and are registered through addService
-        this.multicastAddress = multicastAddress;
-        this.multicastConfiguration = multicastConfiguration;
-        this.protocolContext = protocolContext;
-        this.broadcastDelayMs = (int) FormatUtils.getTimeDuration(broadcastDelay, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Opens the multicast socket and schedules the periodic broadcast. If
-     * scheduling fails, the timer and socket are released and the instance
-     * remains stopped.
-     *
-     * @throws IllegalStateException if already running
-     * @throws IOException if the multicast socket cannot be created
-     */
-    public void start() throws IOException {
-
-        if(isRunning()) {
-            throw new IllegalStateException("Instance is already started.");
-        }
-
-        // setup socket
-        multicastSocket = MulticastUtils.createMulticastSocket(multicastConfiguration);
-
-        // setup broadcaster; only publish it once scheduling has succeeded
-        final Timer timer = new Timer("Cluster Services Broadcaster", /* is daemon */ true);
-        try {
-            timer.schedule(new TimerTask() {
-                @Override
-                public void run() {
-                    broadcastServices();
-                }
-            }, 0, broadcastDelayMs);
-        } catch(final RuntimeException ex) {
-            // e.g. a non-positive period: do not leak the timer thread or the socket
-            timer.cancel();
-            MulticastUtils.closeQuietly(multicastSocket);
-            multicastSocket = null;
-            throw ex;
-        }
-        broadcaster = timer;
-    }
-
-    /**
-     * Sends one broadcast message per registered service. A failure for one
-     * service is logged and does not prevent the remaining services from
-     * being broadcast.
-     */
-    private void broadcastServices() {
-        for(final DiscoverableService service : services) {
-            try {
-
-                final InetSocketAddress serviceAddress = service.getServiceAddress();
-                logger.debug(String.format("Broadcasting Cluster Service '%s' at address %s:%d",
-                    service.getServiceName(), serviceAddress.getHostName(), serviceAddress.getPort()));
-
-                // create message
-                final ServiceBroadcastMessage msg = new ServiceBroadcastMessage();
-                msg.setServiceName(service.getServiceName());
-                msg.setAddress(serviceAddress.getHostName());
-                msg.setPort(serviceAddress.getPort());
-
-                // marshal message to output stream
-                final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
-                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                marshaller.marshal(msg, baos);
-                final byte[] packetBytes = baos.toByteArray();
-
-                // send message
-                final DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, multicastAddress);
-                multicastSocket.send(packet);
-
-            } catch(final Exception ex) {
-                logger.warn(String.format("Cluster Services Broadcaster failed broadcasting service '%s' due to: %s", service.getServiceName(), ex), ex);
-            }
-        }
-    }
-
-    /**
-     * @return true while a broadcast timer is held
-     */
-    public boolean isRunning() {
-        return (broadcaster != null);
-    }
-
-    /**
-     * Cancels the broadcast timer and closes the multicast socket.
-     *
-     * @throws IllegalStateException if not running
-     */
-    public void stop() {
-
-        if(isRunning() == false) {
-            throw new IllegalStateException("Instance is already stopped.");
-        }
-
-        broadcaster.cancel();
-        broadcaster = null;
-
-        // close socket and drop the stale reference
-        MulticastUtils.closeQuietly(multicastSocket);
-        multicastSocket = null;
-
-    }
-
-    @Override
-    public int getBroadcastDelayMs() {
-        return broadcastDelayMs;
-    }
-
-    /**
-     * @return a read-only view of the registered services
-     */
-    @Override
-    public Set<DiscoverableService> getServices() {
-        return Collections.unmodifiableSet(services);
-    }
-
-    @Override
-    public InetSocketAddress getMulticastAddress() {
-        return multicastAddress;
-    }
-
-    /**
-     * @return true if the service was not already registered
-     */
-    @Override
-    public boolean addService(final DiscoverableService service) {
-        return services.add(service);
-    }
-
-    /**
-     * Removes the first registered service whose name equals the given name.
-     *
-     * @return true if a service was removed
-     */
-    @Override
-    public boolean removeService(final String serviceName) {
-        for(final DiscoverableService service : services) {
-            if(service.getServiceName().equals(serviceName)) {
-                return services.remove(service);
-            }
-        }
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
deleted file mode 100644
index 680df65..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/CopyingInputStream.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class CopyingInputStream extends FilterInputStream {
-    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    private final int maxBytesToCopy;
-    private final InputStream in;
-
-    public CopyingInputStream(final InputStream in, final int maxBytesToCopy) {
-        super(in);
-        this.maxBytesToCopy = maxBytesToCopy;
-        this.in = in;
-    }
-    
-    @Override
-    public int read() throws IOException {
-        final int delegateRead = in.read();
-        if ( delegateRead != -1 && getNumberOfBytesCopied() < maxBytesToCopy ) {
-            baos.write(delegateRead);
-        }
-        
-        return delegateRead;
-    }
-    
-    @Override
-    public int read(byte[] b) throws IOException {
-        final int delegateRead = in.read(b);
-        if ( delegateRead >= 0 ) {
-            baos.write(b, 0, Math.min(delegateRead, maxBytesToCopy - getNumberOfBytesCopied()));
-        }
-        
-        return delegateRead;
-    }
-    
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        final int delegateRead = in.read(b, off, len);
-        if ( delegateRead >= 0 ) {
-            baos.write(b, off, Math.min(delegateRead, maxBytesToCopy - getNumberOfBytesCopied()));
-        }
-        
-        return delegateRead;
-    }
-    
-    public byte[] getBytesRead() {
-        return baos.toByteArray();
-    }
-    
-    public void writeBytes(final OutputStream out) throws IOException {
-        baos.writeTo(out);
-    }
-    
-    public int getNumberOfBytesCopied() {
-        return baos.size();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
deleted file mode 100644
index d3764b3..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListener.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
-import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
-import org.apache.nifi.io.socket.multicast.MulticastListener;
-import org.apache.nifi.logging.NiFiLog;
-import org.apache.nifi.reporting.Bulletin;
-import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.ProtocolListener;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
-import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.events.BulletinFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements a listener for protocol messages sent over multicast.  If a message
- * is of type MulticastProtocolMessage, then the underlying protocol message is
- * passed to the handler.  If the receiving handler produces a message response,
- * then the message is wrapped with a MulticastProtocolMessage before being 
- * sent to the originator.
- * 
- * The client caller is responsible for starting and stopping the listener.
- * The instance must be stopped before termination of the JVM to ensure proper
- * resource clean-up.
- * 
- * @author unattributed
- */
-public class MulticastProtocolListener extends MulticastListener implements ProtocolListener {
-    
-    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(MulticastProtocolListener.class));
-
-    // immutable members
-    private final Collection<ProtocolHandler> handlers = new CopyOnWriteArrayList<>();
-    private final String listenerId = UUID.randomUUID().toString();
-    private final ProtocolContext<ProtocolMessage> protocolContext;
-    private volatile BulletinRepository bulletinRepository;
-
-    public MulticastProtocolListener(
-            final int numThreads,
-            final InetSocketAddress multicastAddress,
-            final MulticastConfiguration configuration,
-            final ProtocolContext<ProtocolMessage> protocolContext) {
-
-        super(numThreads, multicastAddress, configuration);
-        
-        if (protocolContext == null) {
-            throw new IllegalArgumentException("Protocol Context may not be null.");
-        }
-        this.protocolContext = protocolContext;
-    }
-
-    @Override
-    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
-        this.bulletinRepository = bulletinRepository;
-    }
-
-    @Override
-    public void start() throws IOException {
-
-        if(super.isRunning()) {
-            throw new IllegalStateException("Instance is already started.");
-        }
-        
-        super.start();
-        
-    }
-
-    @Override
-    public void stop() throws IOException {
-
-        if(super.isRunning() == false) {
-            throw new IllegalStateException("Instance is already stopped.");
-        }
-        
-        // shutdown listener
-        super.stop();
-
-    }
-
-    @Override
-    public Collection<ProtocolHandler> getHandlers() {
-        return Collections.unmodifiableCollection(handlers);
-    }
-
-    @Override
-    public void addHandler(final ProtocolHandler handler) {
-        if(handler == null) {
-            throw new NullPointerException("Protocol handler may not be null.");
-        }
-        handlers.add(handler);
-    }
-    
-    @Override
-    public boolean removeHandler(final ProtocolHandler handler) {
-        return handlers.remove(handler);
-    }
-    
-    @Override
-    public void dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet) {
-
-        try {
-
-            // unmarshall message
-            final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
-            final ProtocolMessage request = unmarshaller.unmarshal(new ByteArrayInputStream(packet.getData(), 0, packet.getLength()));
-
-            // unwrap multicast message, if necessary
-            final ProtocolMessage unwrappedRequest;
-            if(request instanceof MulticastProtocolMessage) {
-                final MulticastProtocolMessage multicastRequest = (MulticastProtocolMessage) request;
-                // don't process a message we sent
-                if(listenerId.equals(multicastRequest.getId())) {
-                    return;
-                } else {
-                    unwrappedRequest = multicastRequest.getProtocolMessage();
-                }
-            } else {
-                unwrappedRequest = request;
-            }
-            
-            // dispatch message to handler
-            ProtocolHandler desiredHandler = null;
-            for (final ProtocolHandler handler : getHandlers()) {
-                if (handler.canHandle(unwrappedRequest)) {
-                    desiredHandler = handler;
-                    break;
-                }
-            }
-
-            // if no handler found, throw exception; otherwise handle request
-            if (desiredHandler == null) {
-                throw new ProtocolException("No handler assigned to handle message type: " + request.getType());
-            } else {
-                final ProtocolMessage response = desiredHandler.handle(request);
-                if(response != null) {
-                    try {
-                        
-                        // wrap with listener id
-                        final MulticastProtocolMessage multicastResponse = new MulticastProtocolMessage(listenerId, response);
-                        
-                        // marshal message
-                        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                        final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
-                        marshaller.marshal(multicastResponse, baos);
-                        final byte[] responseBytes = baos.toByteArray();
-                        
-                        final int maxPacketSizeBytes = getMaxPacketSizeBytes();
-                        if(responseBytes.length > maxPacketSizeBytes) {
-                            logger.warn("Cluster protocol handler '" + desiredHandler.getClass() + 
-                                "' produced a multicast response with length greater than configured max packet size '" + maxPacketSizeBytes + "'");
-                        }
-                        
-                        // create and send packet
-                        final DatagramPacket responseDatagram = new DatagramPacket(responseBytes, responseBytes.length, getMulticastAddress().getAddress(), getMulticastAddress().getPort()); 
-                        multicastSocket.send(responseDatagram);
-                        
-                    } catch (final IOException ioe) {
-                        throw new ProtocolException("Failed marshalling protocol message in response to message type: " + request.getType() + " due to: " + ioe, ioe);
-                    }
-                }
-            }
-
-        } catch (final Throwable t) {
-            logger.warn("Failed processing protocol message due to " + t, t);
-            
-            if ( bulletinRepository != null ) {
-                final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", "Failed to process Protocol Message due to " + t.toString());
-                bulletinRepository.addBulletin(bulletin);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
deleted file mode 100644
index dc86d24..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.net.Socket;
-
-import javax.net.ssl.SSLSocket;
-import javax.security.cert.X509Certificate;
-
-import org.apache.nifi.cluster.protocol.NodeProtocolSender;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
-import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
-import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
-import org.apache.nifi.io.socket.SocketConfiguration;
-import org.apache.nifi.io.socket.SocketUtils;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-
-public class NodeProtocolSenderImpl implements NodeProtocolSender {
-    private final SocketConfiguration socketConfiguration;
-    private final ClusterServiceLocator clusterManagerProtocolServiceLocator;
-    private final ProtocolContext<ProtocolMessage> protocolContext;
-    
-    public NodeProtocolSenderImpl(final ClusterServiceLocator clusterManagerProtocolServiceLocator, 
-            final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
-        if(clusterManagerProtocolServiceLocator == null) {
-            throw new IllegalArgumentException("Protocol Service Locator may not be null.");
-        } else if(socketConfiguration == null) {
-            throw new IllegalArgumentException("Socket configuration may not be null.");
-        } else if(protocolContext == null) {
-            throw new IllegalArgumentException("Protocol Context may not be null.");
-        }
-        
-        this.clusterManagerProtocolServiceLocator = clusterManagerProtocolServiceLocator;
-        this.socketConfiguration = socketConfiguration;
-        this.protocolContext = protocolContext;
-    }
-    
-    
-    @Override
-    public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        Socket socket = null;
-        try {
-            socket = createSocket();
-            
-            String ncmDn = null;
-            if ( socket instanceof SSLSocket ) {
-                final SSLSocket sslSocket = (SSLSocket) socket;
-                try {
-                    final X509Certificate[] certChains = sslSocket.getSession().getPeerCertificateChain();
-                    if ( certChains != null && certChains.length > 0 ) {
-                        ncmDn = certChains[0].getSubjectDN().getName();
-                    }
-                } catch (final ProtocolException pe) {
-                    throw pe;
-                } catch (final Exception e) {
-                    throw new ProtocolException(e);
-                }
-            }
-            
-            try {
-                // marshal message to output stream
-                final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
-                marshaller.marshal(msg, socket.getOutputStream());
-            } catch(final IOException ioe) {
-                throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
-            } 
-            
-            final ProtocolMessage response;
-            try {
-                // unmarshall response and return
-                final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
-                response = unmarshaller.unmarshal(socket.getInputStream());
-            } catch(final IOException ioe) {
-                throw new ProtocolException("Failed unmarshalling '" + MessageType.CONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe);
-            } 
-            
-            if(MessageType.CONNECTION_RESPONSE == response.getType()) {
-                final ConnectionResponseMessage connectionResponse = (ConnectionResponseMessage) response;
-                connectionResponse.setClusterManagerDN(ncmDn);
-                return connectionResponse;
-            } else {
-                throw new ProtocolException("Expected message type '" + MessageType.CONNECTION_RESPONSE + "' but found '" + response.getType() + "'");
-            }
-        } finally {
-            SocketUtils.closeQuietly(socket);
-        }
-    }
-    
-    
-    @Override
-    public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sendProtocolMessage(msg);
-    }
-
-    @Override
-    public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sendProtocolMessage(msg);
-    }
-
-    @Override
-    public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sendProtocolMessage(msg);
-    }
-
-    @Override
-    public void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sendProtocolMessage(msg);
-    }
-    
-    private Socket createSocket() {
-        // determine the cluster manager's address
-        final DiscoverableService service = clusterManagerProtocolServiceLocator.getService(); 
-        if(service == null) {
-            throw new UnknownServiceAddressException("Cluster Manager's service is not known.  Verify a cluster manager is running.");
-        }
-        
-        try {
-            // create a socket
-            return SocketUtils.createSocket(service.getServiceAddress(), socketConfiguration); 
-        } catch(final IOException ioe) {
-            throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
-        }
-    }
-    
-    private void sendProtocolMessage(final ProtocolMessage msg) {
-        Socket socket = null;
-        try {
-            socket = createSocket();
-
-            try {
-                // marshal message to output stream
-                final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
-                marshaller.marshal(msg, socket.getOutputStream());
-            } catch(final IOException ioe) {
-                throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
-            }
-        } finally {
-            SocketUtils.closeQuietly(socket);
-        }
-    }
-    
-    public SocketConfiguration getSocketConfiguration() {
-        return socketConfiguration;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
deleted file mode 100644
index 4b359f4..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderListener.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.nifi.cluster.protocol.NodeProtocolSender;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.ProtocolListener;
-import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
-import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
-import org.apache.nifi.reporting.BulletinRepository;
-
-public class NodeProtocolSenderListener implements NodeProtocolSender, ProtocolListener {
-    
-    private final NodeProtocolSender sender;
-    private final ProtocolListener listener;
-    
-    public NodeProtocolSenderListener(final NodeProtocolSender sender, final ProtocolListener listener) {
-        if(sender == null) {
-            throw new IllegalArgumentException("NodeProtocolSender may not be null.");
-        } else if(listener == null) {
-            throw new IllegalArgumentException("ProtocolListener may not be null.");
-        }
-        this.sender = sender;
-        this.listener = listener;
-    }
-
-    @Override
-    public void stop() throws IOException {
-        if(!isRunning()) {
-            throw new IllegalStateException("Instance is already stopped.");
-        }
-        listener.stop();
-    }
-
-    @Override
-    public void start() throws IOException {
-        if(isRunning()) {
-            throw new IllegalStateException("Instance is already started.");
-        }
-        listener.start();
-    }
-
-    @Override
-    public boolean isRunning() {
-        return listener.isRunning();
-    }
-
-    @Override
-    public boolean removeHandler(final ProtocolHandler handler) {
-        return listener.removeHandler(handler);
-    }
-
-    @Override
-    public Collection<ProtocolHandler> getHandlers() {
-        return listener.getHandlers();
-    }
-
-    @Override
-    public void addHandler(final ProtocolHandler handler) {
-        listener.addHandler(handler);
-    }
-
-    @Override
-    public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sender.heartbeat(msg);
-    }
-
-    @Override
-    public ConnectionResponseMessage requestConnection(final ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        return sender.requestConnection(msg);
-    }
-    
-    @Override
-    public void notifyControllerStartupFailure(final ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sender.notifyControllerStartupFailure(msg);
-    }
-    
-    @Override
-    public void notifyReconnectionFailure(final ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sender.notifyReconnectionFailure(msg);
-    }
-
-    @Override
-    public void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException {
-        sender.sendBulletins(msg);
-    }
-
-    @Override
-    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
-        listener.setBulletinRepository(bulletinRepository);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
deleted file mode 100644
index ca30d9b..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLSocket;
-import javax.security.cert.X509Certificate;
-
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.ProtocolListener;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.events.BulletinFactory;
-import org.apache.nifi.io.socket.ServerSocketConfiguration;
-import org.apache.nifi.io.socket.SocketListener;
-import org.apache.nifi.logging.NiFiLog;
-import org.apache.nifi.reporting.Bulletin;
-import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.util.StopWatch;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements a listener for protocol messages sent over unicast socket. 
- * 
- * @author unattributed
- */
-public class SocketProtocolListener extends SocketListener implements ProtocolListener {
-
-    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketProtocolListener.class));
-    private final ProtocolContext<ProtocolMessage> protocolContext;
-    private final Collection<ProtocolHandler> handlers = new CopyOnWriteArrayList<>();
-    private volatile BulletinRepository bulletinRepository;
-    
-    public SocketProtocolListener(
-            final int numThreads,
-            final int port,
-            final ServerSocketConfiguration configuration,
-            final ProtocolContext<ProtocolMessage> protocolContext) {
-
-        super(numThreads, port, configuration);
-        
-        if(protocolContext == null) {
-            throw new IllegalArgumentException("Protocol Context may not be null.");
-        }
-        
-        this.protocolContext = protocolContext;
-    }
-
-    @Override
-    public void setBulletinRepository(final BulletinRepository bulletinRepository) {
-        this.bulletinRepository = bulletinRepository;
-    }
-    
-    @Override
-    public void start() throws IOException {
-
-        if(super.isRunning()) {
-            throw new IllegalStateException("Instance is already started.");
-        }
-        
-        super.start();
-    }
-
-    @Override
-    public void stop() throws IOException {
-
-        if(super.isRunning() == false) {
-            throw new IOException("Instance is already stopped.");
-        }
-        
-        super.stop();
-
-    }
-
-    @Override
-    public Collection<ProtocolHandler> getHandlers() {
-        return Collections.unmodifiableCollection(handlers);
-    }
-
-    @Override
-    public void addHandler(final ProtocolHandler handler) {
-        if(handler == null) {
-            throw new NullPointerException("Protocol handler may not be null.");
-        }
-        handlers.add(handler);
-    }
-    
-    @Override
-    public boolean removeHandler(final ProtocolHandler handler) {
-        return handlers.remove(handler);
-    }
-
-    @Override
-    public void dispatchRequest(final Socket socket) {
-        byte[] receivedMessage = null;
-        String hostname = null;
-        final int maxMsgBuffer = 1024 * 1024;   // don't buffer more than 1 MB 
of the message
-        try {
-            final StopWatch stopWatch = new StopWatch(true);
-            hostname = socket.getInetAddress().getHostName();
-            final String requestId = UUID.randomUUID().toString();
-            logger.info("Received request {} from {}", requestId, hostname);
-            
-            String requestorDn = null;
-            if ( socket instanceof SSLSocket ) {
-                final SSLSocket sslSocket = (SSLSocket) socket;
-                try {
-                    final X509Certificate[] certChains = 
sslSocket.getSession().getPeerCertificateChain();
-                    if ( certChains != null && certChains.length > 0 ) {
-                        requestorDn = certChains[0].getSubjectDN().getName();
-                    }
-                } catch (final ProtocolException pe) {
-                    throw pe;
-                } catch (final Exception e) {
-                    throw new ProtocolException(e);
-                }
-            }
-            
-            // unmarshall message
-            final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = 
protocolContext.createUnmarshaller();
-            final InputStream inStream = socket.getInputStream();
-            final CopyingInputStream copyingInputStream = new 
CopyingInputStream(inStream, maxMsgBuffer); // don't copy more than 1 MB
-            logger.debug("Request {} has a message length of {}", requestId, 
copyingInputStream.getNumberOfBytesCopied());
-            
-            final ProtocolMessage request;
-            try {
-                request = unmarshaller.unmarshal(copyingInputStream);
-            } finally {
-                receivedMessage = copyingInputStream.getBytesRead();
-            }
-            
-            request.setRequestorDN(requestorDn);
-            
-            // dispatch message to handler
-            ProtocolHandler desiredHandler = null;
-            for (final ProtocolHandler handler : getHandlers()) {
-                if (handler.canHandle(request)) {
-                    desiredHandler = handler;
-                    break;
-                }
-            }
-
-            // if no handler found, throw exception; otherwise handle request
-            if (desiredHandler == null) {
-                throw new ProtocolException("No handler assigned to handle 
message type: " + request.getType());
-            } else {
-                final ProtocolMessage response = 
desiredHandler.handle(request);
-                if(response != null) {
-                    try {
-                        logger.debug("Sending response for request {}", 
requestId);
-                            
-                        // marshal message to output stream
-                        final ProtocolMessageMarshaller<ProtocolMessage> 
marshaller = protocolContext.createMarshaller();
-                        marshaller.marshal(response, socket.getOutputStream());
-                    } catch (final IOException ioe) {
-                        throw new ProtocolException("Failed marshalling 
protocol message in response to message type: " + request.getType() + " due to 
" + ioe, ioe);
-                    }
-                }
-            }
-            
-            stopWatch.stop();
-            logger.info("Finished processing request {} (type={}, length={} 
bytes) in {} millis", requestId, request.getType(), receivedMessage.length, 
stopWatch.getDuration(TimeUnit.MILLISECONDS));
-        } catch (final IOException e) {
-            logger.warn("Failed processing protocol message from " + hostname 
+ " due to " + e, e);
-            
-            if ( bulletinRepository != null ) {
-                final Bulletin bulletin = 
BulletinFactory.createBulletin("Clustering", "WARNING", String.format("Failed 
to process protocol message from %s due to: %s", hostname, e.toString()));
-                bulletinRepository.addBulletin(bulletin);
-            }
-        } catch (final ProtocolException e) {
-            logger.warn("Failed processing protocol message from " + hostname 
+ " due to " + e, e);
-            if ( bulletinRepository != null ) {
-                final Bulletin bulletin = 
BulletinFactory.createBulletin("Clustering", "WARNING", String.format("Failed 
to process protocol message from %s due to: %s", hostname, e.toString()));
-                bulletinRepository.addBulletin(bulletin);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
deleted file mode 100644
index bc68630..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
-
-/**
- * Implements a context for communicating internally amongst the cluster using
- * JAXB.
- * 
- * @param <T> The type of protocol message.
- *
- * @author unattributed
- */
-public class JaxbProtocolContext<T> implements ProtocolContext {
-
-    private static final int BUF_SIZE = (int) Math.pow(2, 10);  // 1k
-    
-    /*
-     * A sentinel is used to detect corrupted messages.  Relying on the 
integrity
-     * of the message size can cause memory issues if the value is corrupted 
-     * and equal to a number larger than the memory size.
-     */
-    private static final byte MESSAGE_PROTOCOL_START_SENTINEL = 0x5A;
-    
-    private final JAXBContext jaxbCtx;
-    
-    public JaxbProtocolContext(final JAXBContext jaxbCtx) {
-        this.jaxbCtx = jaxbCtx;
-    }
-    
-    @Override
-    public ProtocolMessageMarshaller<T> createMarshaller() {
-        return new ProtocolMessageMarshaller<T>() {
-
-            @Override
-            public void marshal(final T msg, final OutputStream os) throws 
IOException {
-
-                try {
-
-                    // marshal message to output stream
-                    final Marshaller marshaller = jaxbCtx.createMarshaller();
-                    final ByteArrayOutputStream msgBytes = new 
ByteArrayOutputStream();
-                    marshaller.marshal(msg, msgBytes);
-
-                    final DataOutputStream dos = new DataOutputStream(os);
-
-                    // write message protocol sentinel
-                    dos.write(MESSAGE_PROTOCOL_START_SENTINEL);
-                    
-                    // write message size in bytes
-                    dos.writeInt(msgBytes.size());
-
-                    // write message
-                    dos.write(msgBytes.toByteArray());
-
-                    dos.flush();
-
-                } catch (final JAXBException je) {
-                    throw new IOException("Failed marshalling protocol message 
due to: " + je, je);
-                }
-
-            }
-        };
-    }
-
-    @Override
-    public ProtocolMessageUnmarshaller<T> createUnmarshaller() {
-        return new ProtocolMessageUnmarshaller<T>() {
-
-            @Override
-            public T unmarshal(final InputStream is) throws IOException {
-
-                try {
-
-                    final DataInputStream dis = new DataInputStream(is);
-
-                    // check for the presence of the message protocol sentinel
-                    final byte sentinel = (byte) dis.read();
-                    if ( sentinel == -1 ) {
-                        throw new EOFException();
-                    }
-
-                    if(MESSAGE_PROTOCOL_START_SENTINEL != sentinel) {
-                        throw new IOException("Failed reading protocol message 
due to malformed header");
-                    }
-                    
-                    // read the message size
-                    final int msgBytesSize = dis.readInt();
-
-                    // read the message
-                    final ByteBuffer buffer = 
ByteBuffer.allocate(msgBytesSize);
-                    int totalBytesRead = 0;
-                    do {
-                        final int bytesToRead;
-                        if ((msgBytesSize - totalBytesRead) >= BUF_SIZE) {
-                            bytesToRead = BUF_SIZE;
-                        } else {
-                            bytesToRead = msgBytesSize - totalBytesRead;
-                        }
-                        totalBytesRead += dis.read(buffer.array(), 
totalBytesRead, bytesToRead);
-                    } while (totalBytesRead < msgBytesSize);
-
-                    // unmarshall message and return
-                    final Unmarshaller unmarshaller = 
jaxbCtx.createUnmarshaller();
-                    final byte[] msg = new byte[totalBytesRead];
-                    buffer.get(msg);
-                    return (T) unmarshaller.unmarshal(new 
ByteArrayInputStream(msg));
-
-                } catch (final JAXBException je) {
-                    throw new IOException("Failed unmarshalling protocol 
message due to: " + je, je);
-                }
-
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
deleted file mode 100644
index d9de24f..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-
-/**
- * @author unattributed
- */
-public class AdaptedConnectionRequest {
-    
-    private NodeIdentifier nodeIdentifier;
-    
-    public AdaptedConnectionRequest() {}
-
-    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
-    public NodeIdentifier getNodeIdentifier() {
-        return nodeIdentifier;
-    }
-
-    public void setNodeIdentifier(final NodeIdentifier nodeIdentifier) {
-        this.nodeIdentifier = nodeIdentifier;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
deleted file mode 100644
index c7c783b..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionResponse.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-
-/**
- * @author unattributed
- */
-public class AdaptedConnectionResponse {
-    
-    private StandardDataFlow dataFlow;
-    private NodeIdentifier nodeIdentifier;
-    private boolean blockedByFirewall;
-    private boolean primary;
-    private int tryLaterSeconds;
-    private Integer managerRemoteInputPort;
-    private Boolean managerRemoteCommsSecure;
-    private String instanceId;
-    
-    public AdaptedConnectionResponse() {}
-
-    @XmlJavaTypeAdapter(DataFlowAdapter.class)
-    public StandardDataFlow getDataFlow() {
-        return dataFlow;
-    }
-
-    public void setDataFlow(StandardDataFlow dataFlow) {
-        this.dataFlow = dataFlow;
-    }
-
-    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
-    public NodeIdentifier getNodeIdentifier() {
-        return nodeIdentifier;
-    }
-
-    public void setNodeIdentifier(NodeIdentifier nodeIdentifier) {
-        this.nodeIdentifier = nodeIdentifier;
-    }
-
-    public int getTryLaterSeconds() {
-        return tryLaterSeconds;
-    }
-
-    public void setTryLaterSeconds(int tryLaterSeconds) {
-        this.tryLaterSeconds = tryLaterSeconds;
-    }
-
-    public boolean isBlockedByFirewall() {
-        return blockedByFirewall;
-    }
-
-    public void setBlockedByFirewall(boolean blockedByFirewall) {
-        this.blockedByFirewall = blockedByFirewall;
-    }
-
-    public boolean isPrimary() {
-        return primary;
-    }
-
-    public void setPrimary(boolean primary) {
-        this.primary = primary;
-    }
-
-    public boolean shouldTryLater() {
-        return tryLaterSeconds > 0;
-    }
-    
-    public void setManagerRemoteInputPort(Integer managerRemoteInputPort) {
-        this.managerRemoteInputPort = managerRemoteInputPort;
-    }
-    
-    public Integer getManagerRemoteInputPort() {
-        return managerRemoteInputPort;
-    }
-    
-    public void setManagerRemoteCommsSecure(Boolean secure) {
-        this.managerRemoteCommsSecure = secure;
-    }
-    
-    public Boolean isManagerRemoteCommsSecure() {
-        return managerRemoteCommsSecure;
-    }
-    
-    public void setInstanceId(String instanceId) {
-        this.instanceId = instanceId;
-    }
-    
-    public String getInstanceId() {
-        return instanceId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
deleted file mode 100644
index 89d903b..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedCounter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-/**
- * @author unattributed
- */
-public class AdaptedCounter {
-    
-    private String groupName;
-    
-    private String name;
-    
-    private long value;
-
-    public AdaptedCounter() {}
-
-    public String getGroupName() {
-        return groupName;
-    }
-
-    public void setGroupName(String counterGroupName) {
-        this.groupName = counterGroupName;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String counterName) {
-        this.name = counterName;
-    }
-
-    public long getValue() {
-        return value;
-    }
-
-    public void setValue(long value) {
-        this.value = value;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/761e64a4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
deleted file mode 100644
index bb97619..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.jaxb.message;
-
-/**
- * @author unattributed
- */
-public class AdaptedDataFlow {
-    
-    private byte[] flow;
-    private byte[] templates;
-    private byte[] snippets;
-    
-    private boolean autoStartProcessors;
-    
-    public AdaptedDataFlow() {}
-
-    public byte[] getFlow() {
-        return flow;
-    }
-
-    public void setFlow(byte[] flow) {
-        this.flow = flow;
-    }
-
-    public byte[] getTemplates() {
-        return templates;
-    }
-
-    public void setTemplates(byte[] templates) {
-        this.templates = templates;
-    }
-
-    public byte[] getSnippets() {
-        return snippets;
-    }
-
-    public void setSnippets(byte[] snippets) {
-        this.snippets = snippets;
-    }
-
-    public boolean isAutoStartProcessors() {
-        return autoStartProcessors;
-    }
-
-    public void setAutoStartProcessors(boolean runningAllProcessors) {
-        this.autoStartProcessors = runningAllProcessors;
-    }
-
-}

Reply via email to