http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index ec169ad..cb2d76d 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -35,8 +35,8 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.ssl.SSLContext;
 
-import org.apache.nifi.cluster.NodeInformant;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.remote.cluster.NodeInformant;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
 import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index b0d88d4..53f998e 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -18,6 +18,7 @@ package org.apache.nifi.remote;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -44,18 +45,20 @@ import javax.net.ssl.SSLContext;
 import javax.security.cert.CertificateExpiredException;
 import javax.security.cert.CertificateNotYetValidException;
 
-import org.apache.nifi.cluster.ClusterNodeInformation;
-import org.apache.nifi.cluster.NodeInformation;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.controller.AbstractPort;
 import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.remote.client.socket.EndpointConnectionState;
+import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.BadRequestException;
 import org.apache.nifi.remote.exception.HandshakeException;
@@ -71,35 +74,28 @@ import 
org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
-
+import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.sun.jersey.api.client.ClientHandlerException;
 
-public class StandardRemoteGroupPort extends AbstractPort implements 
RemoteGroupPort {
+public class StandardRemoteGroupPort extends RemoteGroupPort {
     public static final String USER_AGENT = "NiFi-Site-to-Site";
     public static final String CONTENT_TYPE = "application/octet-stream";
     
     public static final int GZIP_COMPRESSION_LEVEL = 1;
-    public static final long PEER_REFRESH_PERIOD = 60000L;
     
     private static final String CATEGORY = "Site to Site";
     
     private static final Logger logger = 
LoggerFactory.getLogger(StandardRemoteGroupPort.class);
     private final RemoteProcessGroup remoteGroup;
-    private final SSLContext sslContext;
     private final AtomicBoolean useCompression = new AtomicBoolean(false);
     private final AtomicBoolean targetExists = new AtomicBoolean(true);
     private final AtomicBoolean targetRunning = new AtomicBoolean(true);
-    private final AtomicLong peerIndex = new AtomicLong(0L);
-    
-    private volatile List<PeerStatus> peerStatuses;
-    private volatile long peerRefreshTime = 0L;
-    private final ReentrantLock peerRefreshLock = new ReentrantLock();
+    private final TransferDirection transferDirection;
     
-    private final ConcurrentMap<String, 
BlockingQueue<EndpointConnectionState>> endpointConnectionMap = new 
ConcurrentHashMap<>();
-    private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new 
ConcurrentHashMap<>();
+    private final EndpointConnectionStatePool connectionStatePool;
     
     private final Set<CommunicationsSession> activeCommsChannels = new 
HashSet<>();
     private final Lock interruptLock = new ReentrantLock();
@@ -113,8 +109,17 @@ public class StandardRemoteGroupPort extends AbstractPort 
implements RemoteGroup
         super(id, name, processGroup, type, scheduler);
         
         this.remoteGroup = remoteGroup;
-        this.sslContext = sslContext;
+        this.transferDirection = direction;
         setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
+        
+        final File stateDir = 
NiFiProperties.getInstance().getPersistentStateDirectory();
+        final File persistenceFile = new File(stateDir, 
remoteGroup.getIdentifier() + ".peers");
+        
+        // TODO: This should really be constructed in the RemoteProcessGroup 
and made available to all ports in
+        // that remote process group. This prevents too many connections from 
being made and also protects the persistenceFile
+        // so that only a single thread will ever attempt to write to the file 
at once.
+        FIXME();
+        connectionStatePool = new EndpointConnectionStatePool(sslContext, 
remoteGroup.getEventReporter(), persistenceFile);
     }
     
     @Override
@@ -133,28 +138,15 @@ public class StandardRemoteGroupPort extends AbstractPort 
implements RemoteGroup
     
     @Override
     public void shutdown() {
-        super.shutdown();
-        
-        peerTimeoutExpirations.clear();
+       super.shutdown();
         interruptLock.lock();
         try {
             this.shutdown = true;
-            
-            for ( final CommunicationsSession commsSession : 
activeCommsChannels ) {
-                commsSession.interrupt();
-            }
-            
-            for ( final BlockingQueue<EndpointConnectionState> queue : 
endpointConnectionMap.values() ) {
-                EndpointConnectionState state;
-                while ( (state = queue.poll()) != null)  {
-                    cleanup(state.getSocketClientProtocol(), state.getPeer());
-                }
-            }
-            
-            endpointConnectionMap.clear();
         } finally {
             interruptLock.unlock();
         }
+
+       connectionStatePool.shutdown();
     }
     
     @Override
@@ -171,31 +163,7 @@ public class StandardRemoteGroupPort extends AbstractPort 
implements RemoteGroup
     
     
     void cleanupSockets() {
-        final List<EndpointConnectionState> states = new ArrayList<>();
-        
-        for ( final BlockingQueue<EndpointConnectionState> queue : 
endpointConnectionMap.values() ) {
-            states.clear();
-            
-            EndpointConnectionState state;
-            while ((state = queue.poll()) != null) {
-                // If the socket has not been used in 10 seconds, shut it down.
-                final long lastUsed = state.getLastTimeUsed();
-                if ( lastUsed < System.currentTimeMillis() - 10000L ) {
-                    try {
-                        
state.getSocketClientProtocol().shutdown(state.getPeer());
-                    } catch (final Exception e) {
-                        logger.debug("Failed to shut down {} using {} due to 
{}", 
-                            new Object[] {state.getSocketClientProtocol(), 
state.getPeer(), e} );
-                    }
-                    
-                    cleanup(state.getSocketClientProtocol(), state.getPeer());
-                } else {
-                    states.add(state);
-                }
-            }
-            
-            queue.addAll(states);
-        }
+        connectionStatePool.cleanupExpiredSockets();
     }
     
     
@@ -212,137 +180,46 @@ public class StandardRemoteGroupPort extends 
AbstractPort implements RemoteGroup
         }
         
         String url = getRemoteProcessGroup().getTargetUri().toString();
-        Peer peer = null;
-        final PeerStatus peerStatus = getNextPeerStatus();
-        if ( peerStatus == null ) {
-            logger.debug("{} Unable to determine the next peer to communicate 
with; all peers must be penalized, so yielding context", this);
+        
+        final EndpointConnectionState connectionState;
+        try {
+               connectionState = 
connectionStatePool.getEndpointConnectionState(url, this, transferDirection);
+        } catch (final PortNotRunningException e) {
             context.yield();
+            this.targetRunning.set(false);
+            final String message = String.format("%s failed to communicate 
with %s because the remote instance indicates that the port is not in a valid 
state", this, url);
+            logger.error(message);
+            remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
             return;
-        }
-        
-        url = "nifi://" + peerStatus.getHostname() + ":" + 
peerStatus.getPort();
-        
-        //
-        // Attempt to get a connection state that already exists for this URL.
-        //
-        BlockingQueue<EndpointConnectionState> connectionStateQueue = 
endpointConnectionMap.get(url);
-        if ( connectionStateQueue == null ) {
-            connectionStateQueue = new LinkedBlockingQueue<>();
-            BlockingQueue<EndpointConnectionState> existingQueue = 
endpointConnectionMap.putIfAbsent(url, connectionStateQueue);
-            if ( existingQueue != null ) {
-                connectionStateQueue = existingQueue;
+        } catch (final UnknownPortException e) {
+            context.yield();
+            this.targetExists.set(false);
+            final String message = String.format("%s failed to communicate 
with %s because the remote instance indicates that the port no longer exists", 
this, url);
+            logger.error(message);
+            remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
+            return;
+        } catch (final ProtocolException | HandshakeException | IOException e) 
{
+            final String message = String.format("%s failed to communicate 
with %s due to %s", this, url, e.toString());
+            logger.error(message);
+            if ( logger.isDebugEnabled() ) {
+                logger.error("", e);
             }
+            remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
+            session.rollback();
+            return;
         }
         
-        FlowFileCodec codec = null;
-        CommunicationsSession commsSession = null;
-        SocketClientProtocol protocol = null;
-        EndpointConnectionState connectionState;
+        if ( connectionState == null ) {
+            logger.debug("{} Unable to determine the next peer to communicate 
with; all peers must be penalized, so yielding context", this);
+            context.yield();
+            return;
+        }
         
-        do {
-            connectionState = connectionStateQueue.poll();
-            logger.debug("{} Connection State for {} = {}", this, url, 
connectionState);
-            
-            // if we can't get an existing ConnectionState, create one
-            if ( connectionState == null ) {
-                protocol = new SocketClientProtocol();
-                protocol.setPort(this);
-    
-                try {
-                    commsSession = establishSiteToSiteConnection(peerStatus);
-                    final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
-                    final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
-                    try {
-                        
RemoteResourceFactory.initiateResourceNegotiation(protocol, dis, dos);
-                    } catch (final HandshakeException e) {
-                        try {
-                            commsSession.close();
-                        } catch (final IOException ioe) {
-                            final String message = String.format("%s unable to 
close communications session %s due to %s; resources may not be appropriately 
cleaned up",
-                                this, commsSession, ioe.toString());
-                            logger.error(message);
-                            
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
-                        }
-                    }
-                } catch (final IOException e) {
-                    final String message = String.format("%s failed to 
communicate with %s due to %s", this, peer == null ? url : peer, e.toString());
-                    logger.error(message);
-                    if ( logger.isDebugEnabled() ) {
-                        logger.error("", e);
-                    }
-                    remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
-                    session.rollback();
-                    return;
-                }
-                
-                
-                peer = new Peer(commsSession, url);
-                
-                // perform handshake
-                try {
-                    protocol.handshake(peer);
-                    
-                    // handle error cases
-                    if ( protocol.isDestinationFull() ) {
-                        logger.warn("{} {} indicates that port's destination 
is full; penalizing peer", this, peer);
-                        penalize(peer);
-                        cleanup(protocol, peer);
-                        return;
-                    } else if ( protocol.isPortInvalid() ) {
-                        penalize(peer);
-                        context.yield();
-                        cleanup(protocol, peer);
-                        this.targetRunning.set(false);
-                        final String message = String.format("%s failed to 
communicate with %s because the remote instance indicates that the port is not 
in a valid state", this, peer);
-                        logger.error(message);
-                        
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
-                        return;
-                    } else if ( protocol.isPortUnknown() ) {
-                        penalize(peer);
-                        context.yield();
-                        cleanup(protocol, peer);
-                        this.targetExists.set(false);
-                        final String message = String.format("%s failed to 
communicate with %s because the remote instance indicates that the port no 
longer exists", this, peer);
-                        logger.error(message);
-                        
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
-                        return;
-                    }
-                    
-                    // negotiate the FlowFileCodec to use
-                    codec = protocol.negotiateCodec(peer);
-                } catch (final Exception e) {
-                    penalize(peer);
-                    cleanup(protocol, peer);
-                    
-                    final String message = String.format("%s failed to 
communicate with %s due to %s", this, peer == null ? url : peer, e.toString());
-                    logger.error(message);
-                    if ( logger.isDebugEnabled() ) {
-                        logger.error("", e);
-                    }
-                    remoteGroup.getEventReporter().reportEvent(Severity.ERROR, 
CATEGORY, message);
-                    session.rollback();
-                    return;                    
-                }
-                
-                connectionState = new EndpointConnectionState(peer, protocol, 
codec);
-            } else {
-                final long lastTimeUsed = connectionState.getLastTimeUsed();
-                final long millisSinceLastUse = System.currentTimeMillis() - 
lastTimeUsed;
-                final long timeoutMillis = 
remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS);
-                
-                if ( timeoutMillis > 0L && millisSinceLastUse >= timeoutMillis 
) {
-                    cleanup(connectionState.getSocketClientProtocol(), 
connectionState.getPeer());
-                    connectionState = null;
-                } else {
-                    codec = connectionState.getCodec();
-                    peer = connectionState.getPeer();
-                    commsSession = peer.getCommunicationsSession();
-                    protocol = connectionState.getSocketClientProtocol();
-                }
-            }
-        } while ( connectionState == null || codec == null || commsSession == 
null || protocol == null );
+        FlowFileCodec codec = connectionState.getCodec();
+        SocketClientProtocol protocol = 
connectionState.getSocketClientProtocol();
+        final Peer peer = connectionState.getPeer();
+        url = peer.getUrl();
         
-            
         try {
             interruptLock.lock();
             try {
@@ -361,11 +238,6 @@ public class StandardRemoteGroupPort extends AbstractPort 
implements RemoteGroup
                 receiveFlowFiles(peer, protocol, context, session, codec);
             }
 
-            if ( peer.isPenalized() ) {
-                logger.debug("{} {} was penalized", this, peer);
-                penalize(peer);
-            }
-            
             interruptLock.lock();
             try {
                 if ( shutdown ) {
@@ -380,12 +252,12 @@ public class StandardRemoteGroupPort extends AbstractPort 
implements RemoteGroup
             session.commit();
             
             connectionState.setLastTimeUsed();
-            connectionStateQueue.add(connectionState);
+            connectionStatePool.offer(connectionState);
         } catch (final TransmissionDisabledException e) {
             cleanup(protocol, peer);
             session.rollback();
         } catch (final Exception e) {
-            penalize(peer);
+            connectionStatePool.penalize(peer, 
getYieldPeriod(TimeUnit.MILLISECONDS));
 
             final String message = String.format("%s failed to communicate 
with %s (%s) due to %s", this, peer == null ? url : peer, protocol, 
e.toString());
             logger.error(message);
@@ -401,34 +273,6 @@ public class StandardRemoteGroupPort extends AbstractPort 
implements RemoteGroup
     }
 
     
-    /**
-     * Updates internal state map to penalize a PeerStatus that points to the 
specified peer
-     * @param peer
-     */
-    private void penalize(final Peer peer) {
-        String host;
-        int port;
-        try {
-            final URI uri = new URI(peer.getUrl());
-            host = uri.getHost();
-            port = uri.getPort();
-        } catch (final URISyntaxException e) {
-            host = peer.getHost();
-            port = -1;
-        }
-        
-        final PeerStatus status = new PeerStatus(host, port, true, 1);
-        Long expiration = peerTimeoutExpirations.get(status);
-        if ( expiration == null ) {
-            expiration = Long.valueOf(0L);
-        }
-        
-        final long penalizationMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
-        final long newExpiration = Math.max(expiration, 
System.currentTimeMillis() + penalizationMillis);
-        peerTimeoutExpirations.put(status, Long.valueOf(newExpiration));
-    }
-    
-    
     private void cleanup(final SocketClientProtocol protocol, final Peer peer) 
{
         if ( protocol != null && peer != null ) {
             try {
@@ -457,108 +301,6 @@ public class StandardRemoteGroupPort extends AbstractPort 
implements RemoteGroup
         return remoteGroup.getYieldDuration();
     }
     
-    public CommunicationsSession establishSiteToSiteConnection(final 
PeerStatus peerStatus) throws IOException {
-        final String destinationUri = "nifi://" + peerStatus.getHostname() + 
":" + peerStatus.getPort();
-
-        CommunicationsSession commsSession = null;
-        try {
-        if ( peerStatus.isSecure() ) {
-            if ( sslContext == null ) {
-                throw new IOException("Unable to communicate with " + 
peerStatus.getHostname() + ":" + peerStatus.getPort() + " because it requires 
Secure Site-to-Site communications, but this instance is not configured for 
secure communications");
-            }
-            
-            final SSLSocketChannel socketChannel = new 
SSLSocketChannel(sslContext, peerStatus.getHostname(), peerStatus.getPort(), 
true);
-                socketChannel.connect();
-    
-            commsSession = new 
SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
-                
-                try {
-                    commsSession.setUserDn(socketChannel.getDn());
-                } catch (final CertificateNotYetValidException | 
CertificateExpiredException ex) {
-                    throw new IOException(ex);
-                }
-        } else {
-            final SocketChannel socketChannel = SocketChannel.open(new 
InetSocketAddress(peerStatus.getHostname(), peerStatus.getPort()));
-            commsSession = new 
SocketChannelCommunicationsSession(socketChannel, destinationUri);
-        }
-
-        
commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
-
-        commsSession.setUri(destinationUri);
-        } catch (final IOException ioe) {
-            if ( commsSession != null ) {
-                commsSession.close();
-            }
-            
-            throw ioe;
-        }
-        
-        return commsSession;
-    }
-    
-    private PeerStatus getNextPeerStatus() {
-        List<PeerStatus> peerList = peerStatuses;
-        if ( (peerList == null || peerList.isEmpty() || 
System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD) && 
peerRefreshLock.tryLock() ) {
-            try {
-                try {
-                    peerList = createPeerStatusList();
-                } catch (final IOException | BadRequestException | 
HandshakeException | UnknownPortException | PortNotRunningException | 
ClientHandlerException e) {
-                    final String message = String.format("%s Failed to update 
list of peers due to %s", this, e.toString());
-                    logger.warn(message);
-                    if ( logger.isDebugEnabled() ) {
-                        logger.warn("", e);
-                    }
-                    
remoteGroup.getEventReporter().reportEvent(Severity.WARNING, CATEGORY, message);
-                }
-                
-                this.peerStatuses = peerList;
-                peerRefreshTime = System.currentTimeMillis();
-            } finally {
-                peerRefreshLock.unlock();
-            }
-        }
-
-        if ( peerList == null || peerList.isEmpty() ) {
-            return null;
-        }
-
-        PeerStatus peerStatus;
-        for (int i=0; i < peerList.size(); i++) {
-            final long idx = peerIndex.getAndIncrement();
-            final int listIndex = (int) (idx % peerList.size());
-            peerStatus = peerList.get(listIndex);
-            
-            if ( isPenalized(peerStatus) ) {
-                logger.debug("{} {} is penalized; will not communicate with 
this peer", this, peerStatus);
-            } else {
-                return peerStatus;
-            }
-        }
-        
-        logger.debug("{} All peers appear to be penalized; returning null", 
this);
-        return null;
-    }
-    
-    private boolean isPenalized(final PeerStatus peerStatus) {
-        final Long expirationEnd = peerTimeoutExpirations.get(peerStatus);
-        return (expirationEnd == null ? false : expirationEnd > 
System.currentTimeMillis() );
-    }
-    
-    private List<PeerStatus> createPeerStatusList() throws IOException, 
BadRequestException, HandshakeException, UnknownPortException, 
PortNotRunningException {
-        final Set<PeerStatus> statuses = remoteGroup.getPeerStatuses();
-        if ( statuses == null ) {
-            return new ArrayList<>();
-        }
-        
-        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
-        final List<NodeInformation> nodeInfos = new ArrayList<>();
-        for ( final PeerStatus peerStatus : statuses ) {
-            final NodeInformation nodeInfo = new 
NodeInformation(peerStatus.getHostname(), peerStatus.getPort(), 0, 
peerStatus.isSecure(), peerStatus.getFlowFileCount());
-            nodeInfos.add(nodeInfo);
-        }
-        clusterNodeInfo.setNodeInformation(nodeInfos);
-        return formulateDestinationList(clusterNodeInfo);
-    }
     
     private void transferFlowFiles(final Peer peer, final ClientProtocol 
protocol, final ProcessContext context, final ProcessSession session, final 
FlowFileCodec codec) throws IOException, ProtocolException {
         protocol.transferFlowFiles(peer, context, session, codec);
@@ -568,70 +310,6 @@ public class StandardRemoteGroupPort extends AbstractPort 
implements RemoteGroup
         protocol.receiveFlowFiles(peer, context, session, codec);
     }
 
-    private List<PeerStatus> formulateDestinationList(final 
ClusterNodeInformation clusterNodeInfo) throws IOException {
-        return formulateDestinationList(clusterNodeInfo, getConnectableType());
-    }
-    
-    static List<PeerStatus> formulateDestinationList(final 
ClusterNodeInformation clusterNodeInfo, final ConnectableType connectableType) {
-        final Collection<NodeInformation> nodeInfoSet = 
clusterNodeInfo.getNodeInformation();
-        final int numDestinations = Math.max(128, nodeInfoSet.size());
-        final Map<NodeInformation, Integer> entryCountMap = new HashMap<>();
-
-        long totalFlowFileCount = 0L;
-        for (final NodeInformation nodeInfo : nodeInfoSet) {
-            totalFlowFileCount += nodeInfo.getTotalFlowFiles();
-        }
-
-        int totalEntries = 0;
-        for (final NodeInformation nodeInfo : nodeInfoSet) {
-            final int flowFileCount = nodeInfo.getTotalFlowFiles();
-            // don't allow any node to get more than 80% of the data
-            final double percentageOfFlowFiles = Math.min(0.8D, ((double) 
flowFileCount / (double) totalFlowFileCount));
-            final double relativeWeighting = (connectableType == 
ConnectableType.REMOTE_INPUT_PORT) ? (1 - percentageOfFlowFiles) : 
percentageOfFlowFiles;
-            final int entries = Math.max(1, (int) (numDestinations * 
relativeWeighting));
-            
-            entryCountMap.put(nodeInfo, Math.max(1, entries));
-            totalEntries += entries;
-        }
-        
-        final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
-        for (int i=0; i < totalEntries; i++) {
-            destinations.add(null);
-        }
-        for ( final Map.Entry<NodeInformation, Integer> entry : 
entryCountMap.entrySet() ) {
-            final NodeInformation nodeInfo = entry.getKey();
-            final int numEntries = entry.getValue();
-            
-            int skipIndex = numEntries;
-            for (int i=0; i < numEntries; i++) {
-                int n = (skipIndex * i);
-                while (true) {
-                    final int index = n % destinations.size();
-                    PeerStatus status = destinations.get(index);
-                    if ( status == null ) {
-                        status = new PeerStatus(nodeInfo.getHostname(), 
nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure(), 
nodeInfo.getTotalFlowFiles());
-                        destinations.set(index, status);
-                        break;
-                    } else {
-                        n++;
-                    }
-                }
-            }
-        }
-
-        final StringBuilder distributionDescription = new StringBuilder();
-        distributionDescription.append("New Weighted Distribution of Nodes:");
-        for ( final Map.Entry<NodeInformation, Integer> entry : 
entryCountMap.entrySet() ) {
-            final double percentage = entry.getValue() * 100D / (double) 
destinations.size();
-            
distributionDescription.append("\n").append(entry.getKey()).append(" will 
receive ").append(percentage).append("% of FlowFiles");
-        }
-        logger.info(distributionDescription.toString());
-
-        // Jumble the list of destinations.
-        return destinations;
-    }
-    
-    
     @Override
     public boolean getTargetExists() {
         return targetExists.get();
@@ -717,40 +395,6 @@ public class StandardRemoteGroupPort extends AbstractPort 
implements RemoteGroup
     }
     
     
-    private static class EndpointConnectionState {
-        private final Peer peer;
-        private final SocketClientProtocol socketClientProtocol;
-        private final FlowFileCodec codec;
-        private volatile long lastUsed;
-        
-        private EndpointConnectionState(final Peer peer, final 
SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) {
-            this.peer = peer;
-            this.socketClientProtocol = socketClientProtocol;
-            this.codec = codec;
-        }
-        
-        public FlowFileCodec getCodec() {
-            return codec;
-        }
-        
-        public SocketClientProtocol getSocketClientProtocol() {
-            return socketClientProtocol;
-        }
-        
-        public Peer getPeer() {
-            return peer;
-        }
-        
-        public void setLastTimeUsed() {
-            lastUsed = System.currentTimeMillis();
-        }
-        
-        public long getLastTimeUsed() {
-            return lastUsed;
-        }
-    }
-
-    
     @Override
     public SchedulingStrategy getSchedulingStrategy() {
         return SchedulingStrategy.TIMER_DRIVEN;
@@ -761,4 +405,28 @@ public class StandardRemoteGroupPort extends AbstractPort 
implements RemoteGroup
     public boolean isSideEffectFree() {
         return false;
     }
+
+       @Override
+       public String getDescription() {
+               return toString();
+       }
+
+       @Override
+       public long getCommunicationsTimeout(final TimeUnit timeUnit) {
+               return 
getRemoteProcessGroup().getCommunicationsTimeout(timeUnit);
+       }
+
+       @Override
+       public URI getTargetUri() {
+               return remoteGroup.getTargetUri();
+       }
+       
+       @Override
+       public boolean isSecure() {
+               try {
+                       return remoteGroup.isSecure();
+               } catch (final CommunicationsException ce) {
+                       return false;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
deleted file mode 100644
index d18a4ee..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.codec;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.exception.ProtocolException;
-
-public class StandardFlowFileCodec implements FlowFileCodec {
-       public static final int MAX_NUM_ATTRIBUTES = 25000;
-
-    public static final String DEFAULT_FLOWFILE_PATH = "./";
-
-    private final VersionNegotiator versionNegotiator;
-
-    public StandardFlowFileCodec() {
-        versionNegotiator = new StandardVersionNegotiator(1);
-    }
-    
-    @Override
-    public FlowFile encode(final FlowFile flowFile, final ProcessSession session, final OutputStream encodedOut) throws IOException {
-        final DataOutputStream out = new DataOutputStream(encodedOut);
-        
-        final Map<String, String> attributes = flowFile.getAttributes();
-        out.writeInt(attributes.size());
-        for ( final Map.Entry<String, String> entry : attributes.entrySet() ) {
-            writeString(entry.getKey(), out);
-            writeString(entry.getValue(), out);
-        }
-        
-        out.writeLong(flowFile.getSize());
-        
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream in) throws IOException {
-                final byte[] buffer = new byte[8192];
-                int len;
-                while ( (len = in.read(buffer)) > 0 ) {
-                    encodedOut.write(buffer, 0, len);
-                }
-                
-                encodedOut.flush();
-            }
-        });
-        
-        return flowFile;
-    }
-
-    
-    @Override
-    public FlowFile decode(final InputStream stream, final ProcessSession session) throws IOException, ProtocolException {
-        final DataInputStream in = new DataInputStream(stream);
-        
-        final int numAttributes;
-        try {
-            numAttributes = in.readInt();
-        } catch (final EOFException e) {
-            // we're out of data.
-            return null;
-        }
-        
-        // This is here because if the stream is not properly formed, we could get up to Integer.MAX_VALUE attributes, which will
-        // generally result in an OutOfMemoryError.
-        if ( numAttributes > MAX_NUM_ATTRIBUTES ) {
-               throw new ProtocolException("FlowFile exceeds maximum number of attributes with a total of " + numAttributes);
-        }
-        
-        try {
-            final Map<String, String> attributes = new HashMap<>(numAttributes);
-            for (int i=0; i < numAttributes; i++) {
-                final String attrName = readString(in);
-                final String attrValue = readString(in);
-                attributes.put(attrName, attrValue);
-            }
-            
-            final long numBytes = in.readLong();
-            
-            FlowFile flowFile = session.create();
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    int len;
-                    long size = 0;
-                    final byte[] buffer = new byte[8192];
-                    
-                    while ( size < numBytes && (len = in.read(buffer, 0, (int) Math.min(buffer.length, numBytes - size))) > 0 ) {
-                        out.write(buffer, 0, len);
-                        size += len;
-                    }
-
-                    if ( size != numBytes ) {
-                        throw new EOFException("Expected " + numBytes + " bytes but received only " + size);
-                    }
-                }
-            });
-
-            return flowFile;
-        } catch (final EOFException e) {
-               session.rollback();
-               
-            // we throw the general IOException here because we did not expect to hit EOFException
-            throw e;
-        }
-    }
-
-    private void writeString(final String val, final DataOutputStream out) throws IOException {
-        final byte[] bytes = val.getBytes("UTF-8");
-        out.writeInt(bytes.length);
-        out.write(bytes);
-    }
-
-    
-    private String readString(final DataInputStream in) throws IOException {
-        final int numBytes = in.readInt();
-        final byte[] bytes = new byte[numBytes];
-        StreamUtils.fillBuffer(in, bytes, true);
-        return new String(bytes, "UTF-8");
-    }
-    
-    @Override
-    public List<Integer> getSupportedVersions() {
-        return versionNegotiator.getSupportedVersions();
-    }
-
-    @Override
-    public VersionNegotiator getVersionNegotiator() {
-        return versionNegotiator;
-    }
-
-    @Override
-    public String toString() {
-        return "Standard FlowFile Codec, Version " + versionNegotiator.getVersion();
-    }
-
-    @Override
-    public String getResourceName() {
-        return "StandardFlowFileCodec";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java
deleted file mode 100644
index 926809c..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-import org.apache.nifi.remote.codec.FlowFileCodec;
-
-public class UnsupportedCodecException extends RuntimeException {
-       private static final long serialVersionUID = 198234789237L;
-
-       public UnsupportedCodecException(final String codecName) {
-        super("Codec " + codecName + " is not supported");
-    }
-
-    public UnsupportedCodecException(final FlowFileCodec codec, final int version) {
-        super("Codec " + codec.getClass().getName() + " does not support Version " + version);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
deleted file mode 100644
index 0822b6a..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-
-import org.apache.nifi.remote.AbstractCommunicationsSession;
-
-public class SocketChannelCommunicationsSession extends AbstractCommunicationsSession {
-    private final SocketChannel channel;
-    private final SocketChannelInput request;
-    private final SocketChannelOutput response;
-    private int timeout = 30000;
-    
-    public SocketChannelCommunicationsSession(final SocketChannel socketChannel, final String uri) throws IOException {
-        super(uri);
-        request = new SocketChannelInput(socketChannel);
-        response = new SocketChannelOutput(socketChannel);
-        channel = socketChannel;
-        socketChannel.configureBlocking(false);
-    }
-    
-    @Override
-    public boolean isClosed() {
-        return !channel.isConnected();
-    }
-    
-    @Override
-    public SocketChannelInput getInput() {
-        return request;
-    }
-
-    @Override
-    public SocketChannelOutput getOutput() {
-        return response;
-    }
-
-    @Override
-    public void setTimeout(final int millis) throws IOException {
-        request.setTimeout(millis);
-        response.setTimeout(millis);
-        this.timeout = millis;
-    }
-
-    @Override
-    public int getTimeout() throws IOException {
-        return timeout;
-    }
-
-    @Override
-    public void close() throws IOException {
-        channel.close();
-    }
-    
-    @Override
-    public boolean isDataAvailable() {
-        return request.isDataAvailable();
-    }
-
-    @Override
-    public long getBytesWritten() {
-        return response.getBytesWritten();
-    }
-
-    @Override
-    public long getBytesRead() {
-        return request.getBytesRead();
-    }
-    
-    @Override
-    public void interrupt() {
-        request.interrupt();
-        response.interrupt();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
deleted file mode 100644
index 9e451fd..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.SocketChannel;
-
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.remote.io.InterruptableInputStream;
-import org.apache.nifi.remote.protocol.CommunicationsInput;
-
-public class SocketChannelInput implements CommunicationsInput {
-    private final SocketChannelInputStream socketIn;
-    private final ByteCountingInputStream countingIn;
-    private final InputStream bufferedIn;
-    private final InterruptableInputStream interruptableIn;
-    
-    public SocketChannelInput(final SocketChannel socketChannel) throws IOException {
-        this.socketIn = new SocketChannelInputStream(socketChannel);
-        countingIn = new ByteCountingInputStream(socketIn);
-        bufferedIn = new BufferedInputStream(countingIn);
-        interruptableIn = new InterruptableInputStream(bufferedIn);
-    }
-    
-    @Override
-    public InputStream getInputStream() throws IOException {
-        return interruptableIn;
-    }
-
-    public void setTimeout(final int millis) {
-        socketIn.setTimeout(millis);
-    }
-    
-    public boolean isDataAvailable() {
-        try {
-            return interruptableIn.available() > 0;
-        } catch (final Exception e) {
-            return false;
-        }
-    }
-    
-    @Override
-    public long getBytesRead() {
-        return countingIn.getBytesRead();
-    }
-    
-    public void interrupt() {
-        interruptableIn.interrupt();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
deleted file mode 100644
index 26c0164..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.channels.SocketChannel;
-
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
-import org.apache.nifi.remote.io.InterruptableOutputStream;
-import org.apache.nifi.remote.protocol.CommunicationsOutput;
-
-public class SocketChannelOutput implements CommunicationsOutput {
-    private final SocketChannelOutputStream socketOutStream;
-    private final ByteCountingOutputStream countingOut;
-    private final OutputStream bufferedOut;
-    private final InterruptableOutputStream interruptableOut;
-    
-    public SocketChannelOutput(final SocketChannel socketChannel) throws IOException {
-        socketOutStream = new SocketChannelOutputStream(socketChannel);
-        countingOut = new ByteCountingOutputStream(socketOutStream);
-        bufferedOut = new BufferedOutputStream(countingOut);
-        interruptableOut = new InterruptableOutputStream(bufferedOut);
-    }
-    
-    @Override
-    public OutputStream getOutputStream() throws IOException {
-        return interruptableOut;
-    }
-    
-    public void setTimeout(final int timeout) {
-        socketOutStream.setTimeout(timeout);
-    }
-    
-    @Override
-    public long getBytesWritten() {
-        return countingOut.getBytesWritten();
-    }
-    
-    public void interrupt() {
-        interruptableOut.interrupt();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
deleted file mode 100644
index dca1d84..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-
-import org.apache.nifi.remote.AbstractCommunicationsSession;
-
-public class SSLSocketChannelCommunicationsSession extends AbstractCommunicationsSession {
-    private final SSLSocketChannel channel;
-    private final SSLSocketChannelInput request;
-    private final SSLSocketChannelOutput response;
-    
-    public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel, final String uri) {
-        super(uri);
-        request = new SSLSocketChannelInput(channel);
-        response = new SSLSocketChannelOutput(channel);
-        this.channel = channel;
-    }
-    
-    @Override
-    public SSLSocketChannelInput getInput() {
-        return request;
-    }
-
-    @Override
-    public SSLSocketChannelOutput getOutput() {
-        return response;
-    }
-
-    @Override
-    public void setTimeout(final int millis) throws IOException {
-        channel.setTimeout(millis);
-    }
-
-    @Override
-    public int getTimeout() throws IOException {
-        return channel.getTimeout();
-    }
-
-    @Override
-    public void close() throws IOException {
-        channel.close();
-    }
-    
-    @Override
-    public boolean isClosed() {
-        return channel.isClosed();
-    }
-    
-    @Override
-    public boolean isDataAvailable() {
-        try {
-            return request.isDataAvailable();
-        } catch (final Exception e) {
-            return false;
-        }
-    }
-
-    @Override
-    public long getBytesWritten() {
-        return response.getBytesWritten();
-    }
-
-    @Override
-    public long getBytesRead() {
-        return request.getBytesRead();
-    }
-
-    @Override
-    public void interrupt() {
-        channel.interrupt();
-    }
-    
-    @Override
-    public String toString() {
-        return super.toString() + "[SSLSocketChannel=" + channel + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
deleted file mode 100644
index 60ef33f..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.remote.protocol.CommunicationsInput;
-
-public class SSLSocketChannelInput implements CommunicationsInput {
-    private final SSLSocketChannelInputStream in;
-    private final ByteCountingInputStream countingIn;
-    private final InputStream bufferedIn;
-    
-    public SSLSocketChannelInput(final SSLSocketChannel socketChannel) {
-        in = new SSLSocketChannelInputStream(socketChannel);
-        countingIn = new ByteCountingInputStream(in);
-        this.bufferedIn = new BufferedInputStream(countingIn);
-    }
-    
-    @Override
-    public InputStream getInputStream() throws IOException {
-        return bufferedIn;
-    }
-    
-    public boolean isDataAvailable() throws IOException {
-        return bufferedIn.available() > 0;
-    }
-    
-    @Override
-    public long getBytesRead() {
-        return countingIn.getBytesRead();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
deleted file mode 100644
index dc3d68f..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
-import org.apache.nifi.remote.protocol.CommunicationsOutput;
-
-public class SSLSocketChannelOutput implements CommunicationsOutput {
-    private final OutputStream out;
-    private final ByteCountingOutputStream countingOut;
-    
-    public SSLSocketChannelOutput(final SSLSocketChannel channel) {
-        countingOut = new ByteCountingOutputStream(new SSLSocketChannelOutputStream(channel));
-        out = new BufferedOutputStream(countingOut);
-    }
-
-    @Override
-    public OutputStream getOutputStream() throws IOException {
-        return out;
-    }
-    
-    @Override
-    public long getBytesWritten() {
-        return countingOut.getBytesWritten();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
index a526f4c..391d52b 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
@@ -21,9 +21,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Collection;
 
-import org.apache.nifi.cluster.ClusterNodeInformation;
-import org.apache.nifi.cluster.NodeInformant;
-import org.apache.nifi.cluster.NodeInformation;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -31,12 +28,14 @@ import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformant;
+import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.RequestType;
 import org.apache.nifi.remote.protocol.ServerProtocol;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
deleted file mode 100644
index c4519cd..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.socket;
-
-public enum HandshakeProperty {
-    GZIP,
-    PORT_IDENTIFIER,
-    REQUEST_EXPIRATION_MILLIS;
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
deleted file mode 100644
index eae1940..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.socket;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-
-import org.apache.nifi.remote.exception.ProtocolException;
-
-public class Response {
-    private final ResponseCode code;
-    private final String message;
-    
-    private Response(final ResponseCode code, final String explanation) {
-        this.code = code;
-        this.message = explanation;
-    }
-    
-    public ResponseCode getCode() {
-        return code;
-    }
-    
-    public String getMessage() {
-        return message;
-    }
-    
-    public static Response read(final DataInputStream in) throws IOException, ProtocolException {
-        final ResponseCode code = ResponseCode.readCode(in);
-        final String message = code.containsMessage() ? in.readUTF() : null;
-        return new Response(code, message);
-    }
-    
-    @Override
-    public String toString() {
-        return code + ": " + message;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
deleted file mode 100644
index 0e588cd..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.socket;
-
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.nifi.remote.exception.ProtocolException;
-
-
-public enum ResponseCode {
-    RESERVED(0, "Reserved for Future Use", false), // This will likely be used if we ever need to expand the length of
-                                            // ResponseCode, so that we can indicate a 0 followed by some other bytes
-    
-    // handshaking properties
-    PROPERTIES_OK(1, "Properties OK", false),
-    UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true),
-    ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true),
-    MISSING_PROPERTY(232, "Missing Property", true),
-    
-    // transaction indicators
-    CONTINUE_TRANSACTION(10, "Continue Transaction", false),
-    FINISH_TRANSACTION(11, "Finish Transaction", false),
-    CONFIRM_TRANSACTION(12, "Confirm Transaction", true),   // "Explanation" of this code is the checksum
-    TRANSACTION_FINISHED(13, "Transaction Finished", false),
-    TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false),
-    BAD_CHECKSUM(19, "Bad Checksum", false),
-
-    // data availability indicators
-    MORE_DATA(20, "More Data Exists", false),
-    NO_MORE_DATA(21, "No More Data Exists", false),
-    
-    // port state indicators
-    UNKNOWN_PORT(200, "Unknown Port", false),
-    PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true),
-    PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false),
-    
-    // authorization
-    UNAUTHORIZED(240, "User Not Authorized", true),
-    
-    // error indicators
-    ABORT(250, "Abort", true),
-    UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false),
-    END_OF_STREAM(255, "End of Stream", false);
-    
-    private static final ResponseCode[] codeArray = new ResponseCode[256];
-    
-    static {
-        for ( final ResponseCode responseCode : ResponseCode.values() ) {
-            codeArray[responseCode.getCode()] = responseCode;
-        }
-    }
-    
-    private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R';
-    private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C';
-    private final int code;
-    private final byte[] codeSequence;
-    private final String description;
-    private final boolean containsMessage;
-    
-    private ResponseCode(final int code, final String description, final boolean containsMessage) {
-        this.codeSequence = new byte[] {CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code};
-        this.code = code;
-        this.description = description;
-        this.containsMessage = containsMessage;
-    }
-    
-    public int getCode() {
-        return code;
-    }
-    
-    public byte[] getCodeSequence() {
-        return codeSequence;
-    }
-    
-    @Override
-    public String toString() {
-        return description;
-    }
-    
-    public boolean containsMessage() {
-        return containsMessage;
-    }
-    
-    public void writeResponse(final DataOutputStream out) throws IOException {
-        if ( containsMessage() ) {
-            throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation");
-        }
-        
-        out.write(getCodeSequence());
-        out.flush();
-    }
-    
-    public void writeResponse(final DataOutputStream out, final String explanation) throws IOException {
-        if ( !containsMessage() ) {
-            throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation");
-        }
-        
-        out.write(getCodeSequence());
-        out.writeUTF(explanation);
-        out.flush();
-    }
-    
-    static ResponseCode readCode(final InputStream in) throws IOException, ProtocolException {
-        final int byte1 = in.read();
-        if ( byte1 < 0 ) {
-            throw new EOFException();
-        } else if ( byte1 != CODE_SEQUENCE_VALUE_1 ) {
-            throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode");
-        }
-        
-        final int byte2 = in.read();
-        if ( byte2 < 0 ) {
-            throw new EOFException();
-        } else if ( byte2 != CODE_SEQUENCE_VALUE_2 ) {
-            throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode");
-        }
-
-        final int byte3 = in.read();
-        if ( byte3 < 0 ) {
-            throw new EOFException();
-        }
-        
-        final ResponseCode responseCode = codeArray[byte3];
-        if (responseCode == null) {
-            throw new ProtocolException("Received Response Code of " + byte3 + " but do not recognize this code");
-        }
-        return responseCode;
-    }
-    
-    public static ResponseCode fromSequence(final byte[] value) {
-        final int code = value[3] & 0xFF;
-        final ResponseCode responseCode = codeArray[code];
-        return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : responseCode;
-    }
-}
\ No newline at end of file

Reply via email to