http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
deleted file mode 100644
index 8c23e28..0000000
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
+++ /dev/null
@@ -1,835 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.client.socket;
-
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-import javax.net.ssl.SSLContext;
-import javax.security.cert.CertificateExpiredException;
-import javax.security.cert.CertificateNotYetValidException;
-
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.RemoteDestination;
-import org.apache.nifi.remote.RemoteResourceInitiator;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.cluster.ClusterNodeInformation;
-import org.apache.nifi.remote.cluster.NodeInformation;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.PortNotRunningException;
-import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-import org.apache.nifi.remote.exception.UnknownPortException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import 
org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
-import org.apache.nifi.remote.util.PeerStatusCache;
-import org.apache.nifi.remote.util.RemoteNiFiUtils;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.web.api.dto.ControllerDTO;
-import org.apache.nifi.web.api.dto.PortDTO;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EndpointConnectionStatePool {
-    public static final long PEER_REFRESH_PERIOD = 60000L;
-    public static final String CATEGORY = "Site-to-Site";
-    public static final long REMOTE_REFRESH_MILLIS = 
TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
-
-    private static final long PEER_CACHE_MILLIS = 
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
-
-       private static final Logger logger = 
LoggerFactory.getLogger(EndpointConnectionStatePool.class);
-       
-       private final BlockingQueue<EndpointConnectionState> 
connectionStateQueue = new LinkedBlockingQueue<>();
-    private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new 
ConcurrentHashMap<>();
-    private final URI clusterUrl;
-    private final String apiUri;
-    
-    private final AtomicLong peerIndex = new AtomicLong(0L);
-    
-    private final ReentrantLock peerRefreshLock = new ReentrantLock();
-    private volatile List<PeerStatus> peerStatuses;
-    private volatile long peerRefreshTime = 0L;
-    private volatile PeerStatusCache peerStatusCache;
-    private final Set<CommunicationsSession> activeCommsChannels = new 
HashSet<>();
-
-    private final File peersFile;
-    private final EventReporter eventReporter;
-    private final SSLContext sslContext;
-    private final ScheduledExecutorService taskExecutor;
-    
-    private final ReadWriteLock listeningPortRWLock = new 
ReentrantReadWriteLock();
-    private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
-    private final Lock remoteInfoWriteLock = listeningPortRWLock.writeLock();
-    private Integer siteToSitePort;
-    private Boolean siteToSiteSecure;
-    private long remoteRefreshTime;
-    private final Map<String, String> inputPortMap = new HashMap<>();  // map 
input port name to identifier
-    private final Map<String, String> outputPortMap = new HashMap<>(); // map 
output port name to identifier
-    
-    private volatile int commsTimeout;
-
-    public EndpointConnectionStatePool(final String clusterUrl, final int 
commsTimeoutMillis, final EventReporter eventReporter, final File 
persistenceFile) {
-       this(clusterUrl, commsTimeoutMillis, null, eventReporter, 
persistenceFile);
-    }
-    
-    public EndpointConnectionStatePool(final String clusterUrl, final int 
commsTimeoutMillis, final SSLContext sslContext, final EventReporter 
eventReporter, final File persistenceFile) {
-       try {
-               this.clusterUrl = new URI(clusterUrl);
-       } catch (final URISyntaxException e) {
-               throw new IllegalArgumentException("Invalid Cluster URL: " + 
clusterUrl);
-       }
-       
-       // Trim the trailing /
-        String uriPath = this.clusterUrl.getPath();
-        if (uriPath.endsWith("/")) {
-            uriPath = uriPath.substring(0, uriPath.length() - 1);
-        }
-        apiUri = this.clusterUrl.getScheme() + "://" + 
this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";
-        
-       this.sslContext = sslContext;
-       this.peersFile = persistenceFile;
-       this.eventReporter = eventReporter;
-       this.commsTimeout = commsTimeoutMillis;
-       
-       Set<PeerStatus> recoveredStatuses;
-       if ( persistenceFile != null && persistenceFile.exists() ) {
-               try {
-                       recoveredStatuses = 
recoverPersistedPeerStatuses(peersFile);    
-                       this.peerStatusCache = new 
PeerStatusCache(recoveredStatuses, peersFile.lastModified());
-               } catch (final IOException ioe) {
-                       logger.warn("Failed to recover peer statuses from {} 
due to {}; will continue without loading information from file", 
persistenceFile, ioe);
-               }
-       } else {
-               peerStatusCache = null;
-       }
-
-       // Initialize a scheduled executor and run some maintenance tasks in 
the background to kill off old, unused
-       // connections and keep our list of peers up-to-date.
-       taskExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
-               private final ThreadFactory defaultFactory = 
Executors.defaultThreadFactory();
-               
-                       @Override
-                       public Thread newThread(final Runnable r) {
-                               final Thread thread = 
defaultFactory.newThread(r);
-                               thread.setName("NiFi Site-to-Site Connection 
Pool Maintenance");
-                               return thread;
-                       }
-       });
-
-       taskExecutor.scheduleWithFixedDelay(new Runnable() {
-                       @Override
-                       public void run() {
-                               refreshPeers();
-                       }
-       }, 0, 5, TimeUnit.SECONDS);
-
-       taskExecutor.scheduleWithFixedDelay(new Runnable() {
-                       @Override
-                       public void run() {
-                               cleanupExpiredSockets();
-                       }
-       }, 5, 5, TimeUnit.SECONDS);
-    }
-    
-    
-    public EndpointConnectionState getEndpointConnectionState(final 
RemoteDestination remoteDestination, final TransferDirection direction) throws 
IOException, HandshakeException, PortNotRunningException, UnknownPortException, 
ProtocolException {
-        return getEndpointConnectionState(remoteDestination, direction, null);
-    }
-    
-    
-    
-    public EndpointConnectionState getEndpointConnectionState(final 
RemoteDestination remoteDestination, final TransferDirection direction, final 
SiteToSiteClientConfig config) throws IOException, HandshakeException, 
PortNotRunningException, UnknownPortException, ProtocolException {
-       //
-        // Attempt to get a connection state that already exists for this URL.
-        //
-        FlowFileCodec codec = null;
-        CommunicationsSession commsSession = null;
-        SocketClientProtocol protocol = null;
-        EndpointConnectionState connectionState;
-        Peer peer = null;
-        
-        final List<EndpointConnectionState> addBack = new ArrayList<>();
-        try {
-            do {
-                final PeerStatus peerStatus = getNextPeerStatus(direction);
-                if ( peerStatus == null ) {
-                       return null;
-                }
-    
-                connectionState = connectionStateQueue.poll();
-                logger.debug("{} Connection State for {} = {}", this, 
clusterUrl, connectionState);
-                
-                if ( connectionState == null && !addBack.isEmpty() ) {
-                    // all available connections have been penalized.
-                    return null;
-                }
-                
-                if ( connectionState != null && 
connectionState.getPeer().isPenalized() ) {
-                    // we have a connection, but it's penalized. We want to 
add it back to the queue
-                    // when we've found one to use.
-                    addBack.add(connectionState);
-                    continue;
-                }
-                
-                // if we can't get an existing ConnectionState, create one
-                if ( connectionState == null ) {
-                    protocol = new SocketClientProtocol();
-                    protocol.setDestination(remoteDestination);
-        
-                    try {
-                        commsSession = 
establishSiteToSiteConnection(peerStatus);
-                        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
-                        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
-                        try {
-                            
RemoteResourceInitiator.initiateResourceNegotiation(protocol, dis, dos);
-                        } catch (final HandshakeException e) {
-                            try {
-                                commsSession.close();
-                            } catch (final IOException ioe) {
-                               throw e;
-                            }
-                        }
-                    } catch (final IOException e) {
-                    }
-                    
-                    
-                    final String peerUrl = "nifi://" + 
peerStatus.getHostname() + ":" + peerStatus.getPort();
-                    peer = new Peer(commsSession, peerUrl, 
clusterUrl.toString());
-    
-                    // set properties based on config
-                    if ( config != null ) {
-                        protocol.setTimeout((int) 
config.getTimeout(TimeUnit.MILLISECONDS));
-                        
protocol.setPreferredBatchCount(config.getPreferredBatchCount());
-                        
protocol.setPreferredBatchSize(config.getPreferredBatchSize());
-                        
protocol.setPreferredBatchDuration(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
-                    }
-                    
-                    // perform handshake
-                    try {
-                        protocol.handshake(peer);
-                        
-                        // handle error cases
-                        if ( protocol.isDestinationFull() ) {
-                            logger.warn("{} {} indicates that port's 
destination is full; penalizing peer", this, peer);
-                            penalize(peer, 
remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
-                            connectionStateQueue.offer(connectionState);
-                            continue;
-                        } else if ( protocol.isPortInvalid() ) {
-                               penalize(peer, 
remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
-                               cleanup(protocol, peer);
-                               throw new 
PortNotRunningException(peer.toString() + " indicates that port " + 
remoteDestination.getIdentifier() + " is not running");
-                        } else if ( protocol.isPortUnknown() ) {
-                               penalize(peer, 
remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
-                               cleanup(protocol, peer);
-                               throw new UnknownPortException(peer.toString() 
+ " indicates that port " + remoteDestination.getIdentifier() + " is not 
known");
-                        }
-                        
-                        // negotiate the FlowFileCodec to use
-                        codec = protocol.negotiateCodec(peer);
-                    } catch (final PortNotRunningException | 
UnknownPortException e) {
-                       throw e;
-                    } catch (final Exception e) {
-                        penalize(peer, 
remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
-                        cleanup(protocol, peer);
-                        
-                        final String message = String.format("%s failed to 
communicate with %s due to %s", this, peer == null ? clusterUrl : peer, 
e.toString());
-                        logger.error(message);
-                        if ( logger.isDebugEnabled() ) {
-                            logger.error("", e);
-                        }
-                        throw e;
-                    }
-                    
-                    connectionState = new EndpointConnectionState(peer, 
protocol, codec);
-                } else {
-                    final long lastTimeUsed = 
connectionState.getLastTimeUsed();
-                    final long millisSinceLastUse = System.currentTimeMillis() 
- lastTimeUsed;
-                    
-                    if ( commsTimeout > 0L && millisSinceLastUse >= 
commsTimeout ) {
-                        cleanup(connectionState.getSocketClientProtocol(), 
connectionState.getPeer());
-                        connectionState = null;
-                    } else {
-                        codec = connectionState.getCodec();
-                        peer = connectionState.getPeer();
-                        commsSession = peer.getCommunicationsSession();
-                        protocol = connectionState.getSocketClientProtocol();
-                    }
-                }
-            } while ( connectionState == null || codec == null || commsSession 
== null || protocol == null );
-        } finally {
-            if ( !addBack.isEmpty() ) {
-                connectionStateQueue.addAll(addBack);
-            }
-        }
-        
-        return connectionState;
-    }
-    
-    
-    public boolean offer(final EndpointConnectionState 
endpointConnectionState) {
-       final Peer peer = endpointConnectionState.getPeer();
-       if ( peer == null ) {
-               return false;
-       }
-       
-       final String url = peer.getUrl();
-       if ( url == null ) {
-               return false;
-       }
-       
-       return connectionStateQueue.offer(endpointConnectionState);
-    }
-    
-    /**
-     * Updates internal state map to penalize a PeerStatus that points to the 
specified peer
-     * @param peer
-     */
-    public void penalize(final Peer peer, final long penalizationMillis) {
-        String host;
-        int port;
-        try {
-            final URI uri = new URI(peer.getUrl());
-            host = uri.getHost();
-            port = uri.getPort();
-        } catch (final URISyntaxException e) {
-            host = peer.getHost();
-            port = -1;
-        }
-        
-        final PeerStatus status = new PeerStatus(host, port, true, 1);
-        Long expiration = peerTimeoutExpirations.get(status);
-        if ( expiration == null ) {
-            expiration = Long.valueOf(0L);
-        }
-        
-        final long newExpiration = Math.max(expiration, 
System.currentTimeMillis() + penalizationMillis);
-        peerTimeoutExpirations.put(status, Long.valueOf(newExpiration));
-    }
-    
-    private void cleanup(final SocketClientProtocol protocol, final Peer peer) 
{
-        if ( protocol != null && peer != null ) {
-            try {
-                protocol.shutdown(peer);
-            } catch (final TransmissionDisabledException e) {
-                // User disabled transmission.... do nothing.
-                logger.debug(this + " Transmission Disabled by User");
-            } catch (IOException e1) {
-            }
-        }
-        
-        if ( peer != null ) {
-            try {
-                peer.close();
-            } catch (final TransmissionDisabledException e) {
-                // User disabled transmission.... do nothing.
-                logger.debug(this + " Transmission Disabled by User");
-            } catch (IOException e1) {
-            }
-        }
-    }
-    
-    private PeerStatus getNextPeerStatus(final TransferDirection direction) {
-        List<PeerStatus> peerList = peerStatuses;
-        if ( (peerList == null || peerList.isEmpty() || 
System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD) && 
peerRefreshLock.tryLock() ) {
-            try {
-                try {
-                    peerList = createPeerStatusList(direction);
-                } catch (final Exception e) {
-                    final String message = String.format("%s Failed to update 
list of peers due to %s", this, e.toString());
-                    logger.warn(message);
-                    if ( logger.isDebugEnabled() ) {
-                        logger.warn("", e);
-                    }
-                    
-                    if ( eventReporter != null ) {
-                       eventReporter.reportEvent(Severity.WARNING, CATEGORY, 
message);
-                    }
-                }
-                
-                this.peerStatuses = peerList;
-                peerRefreshTime = System.currentTimeMillis();
-            } finally {
-                peerRefreshLock.unlock();
-            }
-        }
-
-        if ( peerList == null || peerList.isEmpty() ) {
-            return null;
-        }
-
-        PeerStatus peerStatus;
-        for (int i=0; i < peerList.size(); i++) {
-            final long idx = peerIndex.getAndIncrement();
-            final int listIndex = (int) (idx % peerList.size());
-            peerStatus = peerList.get(listIndex);
-            
-            if ( isPenalized(peerStatus) ) {
-                logger.debug("{} {} is penalized; will not communicate with 
this peer", this, peerStatus);
-            } else {
-                return peerStatus;
-            }
-        }
-        
-        logger.debug("{} All peers appear to be penalized; returning null", 
this);
-        return null;
-    }
-    
-    private boolean isPenalized(final PeerStatus peerStatus) {
-        final Long expirationEnd = peerTimeoutExpirations.get(peerStatus);
-        return (expirationEnd == null ? false : expirationEnd > 
System.currentTimeMillis() );
-    }
-    
-    private List<PeerStatus> createPeerStatusList(final TransferDirection 
direction) throws IOException, HandshakeException, UnknownPortException, 
PortNotRunningException {
-        final Set<PeerStatus> statuses = getPeerStatuses();
-        if ( statuses == null ) {
-            return new ArrayList<>();
-        }
-        
-        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
-        final List<NodeInformation> nodeInfos = new ArrayList<>();
-        for ( final PeerStatus peerStatus : statuses ) {
-            final NodeInformation nodeInfo = new 
NodeInformation(peerStatus.getHostname(), peerStatus.getPort(), 0, 
peerStatus.isSecure(), peerStatus.getFlowFileCount());
-            nodeInfos.add(nodeInfo);
-        }
-        clusterNodeInfo.setNodeInformation(nodeInfos);
-        return formulateDestinationList(clusterNodeInfo, direction);
-    }
-    
-    
-    private Set<PeerStatus> getPeerStatuses() {
-        final PeerStatusCache cache = this.peerStatusCache;
-        if (cache == null || cache.getStatuses() == null || 
cache.getStatuses().isEmpty()) {
-            return null;
-        }
-
-        if (cache.getTimestamp() + PEER_CACHE_MILLIS < 
System.currentTimeMillis()) {
-            final Set<PeerStatus> equalizedSet = new 
HashSet<>(cache.getStatuses().size());
-            for (final PeerStatus status : cache.getStatuses()) {
-                final PeerStatus equalizedStatus = new 
PeerStatus(status.getHostname(), status.getPort(), status.isSecure(), 1);
-                equalizedSet.add(equalizedStatus);
-            }
-
-            return equalizedSet;
-        }
-
-        return cache.getStatuses();
-    }
-
-    private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, 
HandshakeException, UnknownPortException, PortNotRunningException {
-       final String hostname = clusterUrl.getHost();
-        final int port = getSiteToSitePort();
-       
-       final CommunicationsSession commsSession = 
establishSiteToSiteConnection(hostname, port);
-        final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + 
port, clusterUrl.toString());
-        final SocketClientProtocol clientProtocol = new SocketClientProtocol();
-        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
-        RemoteResourceInitiator.initiateResourceNegotiation(clientProtocol, 
dis, dos);
-
-        clientProtocol.setTimeout(commsTimeout);
-        clientProtocol.handshake(peer, null);
-        final Set<PeerStatus> peerStatuses = 
clientProtocol.getPeerStatuses(peer);
-        persistPeerStatuses(peerStatuses);
-
-        try {
-            clientProtocol.shutdown(peer);
-        } catch (final IOException e) {
-            final String message = String.format("%s Failed to shutdown 
protocol when updating list of peers due to %s", this, e.toString());
-            logger.warn(message);
-            if (logger.isDebugEnabled()) {
-                logger.warn("", e);
-            }
-        }
-
-        try {
-            peer.close();
-        } catch (final IOException e) {
-            final String message = String.format("%s Failed to close resources 
when updating list of peers due to %s", this, e.toString());
-            logger.warn(message);
-            if (logger.isDebugEnabled()) {
-                logger.warn("", e);
-            }
-        }
-
-        return peerStatuses;
-    }
-
-
-    private void persistPeerStatuses(final Set<PeerStatus> statuses) {
-       if ( peersFile == null ) {
-               return;
-       }
-       
-        try (final OutputStream fos = new FileOutputStream(peersFile);
-                final OutputStream out = new BufferedOutputStream(fos)) {
-
-            for (final PeerStatus status : statuses) {
-                final String line = status.getHostname() + ":" + 
status.getPort() + ":" + status.isSecure() + "\n";
-                out.write(line.getBytes(StandardCharsets.UTF_8));
-            }
-
-        } catch (final IOException e) {
-            logger.error("Failed to persist list of Peers due to {}; if 
restarted and peer's NCM is down, may be unable to transfer data until 
communications with NCM are restored", e.toString(), e);
-        }
-    }
-
-    private Set<PeerStatus> recoverPersistedPeerStatuses(final File file) 
throws IOException {
-        if (!file.exists()) {
-            return null;
-        }
-
-        final Set<PeerStatus> statuses = new HashSet<>();
-        try (final InputStream fis = new FileInputStream(file);
-                final BufferedReader reader = new BufferedReader(new 
InputStreamReader(fis))) {
-
-            String line;
-            while ((line = reader.readLine()) != null) {
-                final String[] splits = line.split(Pattern.quote(":"));
-                if (splits.length != 3) {
-                    continue;
-                }
-
-                final String hostname = splits[0];
-                final int port = Integer.parseInt(splits[1]);
-                final boolean secure = Boolean.parseBoolean(splits[2]);
-
-                statuses.add(new PeerStatus(hostname, port, secure, 1));
-            }
-        }
-
-        return statuses;
-    }
-    
-    
-    private CommunicationsSession establishSiteToSiteConnection(final 
PeerStatus peerStatus) throws IOException {
-       return establishSiteToSiteConnection(peerStatus.getHostname(), 
peerStatus.getPort());
-    }
-    
-    private CommunicationsSession establishSiteToSiteConnection(final String 
hostname, final int port) throws IOException {
-       if ( siteToSiteSecure == null ) {
-               throw new IOException("Remote NiFi instance " + clusterUrl + " 
is not currently configured to accept site-to-site connections");
-       }
-       
-        final String destinationUri = "nifi://" + hostname + ":" + port;
-
-        CommunicationsSession commsSession = null;
-        try {
-               if ( siteToSiteSecure ) {
-                   if ( sslContext == null ) {
-                       throw new IOException("Unable to communicate with " + 
hostname + ":" + port + " because it requires Secure Site-to-Site 
communications, but this instance is not configured for secure communications");
-                   }
-                   
-                   final SSLSocketChannel socketChannel = new 
SSLSocketChannel(sslContext, hostname, port, true);
-                   socketChannel.connect();
-           
-                   commsSession = new 
SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
-                       
-                       try {
-                           commsSession.setUserDn(socketChannel.getDn());
-                       } catch (final CertificateNotYetValidException | 
CertificateExpiredException ex) {
-                           throw new IOException(ex);
-                       }
-               } else {
-                   final SocketChannel socketChannel = SocketChannel.open(new 
InetSocketAddress(hostname, port));
-                   commsSession = new 
SocketChannelCommunicationsSession(socketChannel, destinationUri);
-               }
-       
-               
commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
-               commsSession.setUri(destinationUri);
-        } catch (final IOException ioe) {
-            if ( commsSession != null ) {
-                commsSession.close();
-            }
-            
-            throw ioe;
-        }
-        
-        return commsSession;
-    }
-    
-    
-    static List<PeerStatus> formulateDestinationList(final 
ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) {
-        final Collection<NodeInformation> nodeInfoSet = 
clusterNodeInfo.getNodeInformation();
-        final int numDestinations = Math.max(128, nodeInfoSet.size());
-        final Map<NodeInformation, Integer> entryCountMap = new HashMap<>();
-
-        long totalFlowFileCount = 0L;
-        for (final NodeInformation nodeInfo : nodeInfoSet) {
-            totalFlowFileCount += nodeInfo.getTotalFlowFiles();
-        }
-
-        int totalEntries = 0;
-        for (final NodeInformation nodeInfo : nodeInfoSet) {
-            final int flowFileCount = nodeInfo.getTotalFlowFiles();
-            // don't allow any node to get more than 80% of the data
-            final double percentageOfFlowFiles = Math.min(0.8D, ((double) 
flowFileCount / (double) totalFlowFileCount));
-            final double relativeWeighting = (direction == 
TransferDirection.RECEIVE) ? (1 - percentageOfFlowFiles) : 
percentageOfFlowFiles;
-            final int entries = Math.max(1, (int) (numDestinations * 
relativeWeighting));
-            
-            entryCountMap.put(nodeInfo, Math.max(1, entries));
-            totalEntries += entries;
-        }
-        
-        final List<PeerStatus> destinations = new ArrayList<>(totalEntries);
-        for (int i=0; i < totalEntries; i++) {
-            destinations.add(null);
-        }
-        for ( final Map.Entry<NodeInformation, Integer> entry : 
entryCountMap.entrySet() ) {
-            final NodeInformation nodeInfo = entry.getKey();
-            final int numEntries = entry.getValue();
-            
-            int skipIndex = numEntries;
-            for (int i=0; i < numEntries; i++) {
-                int n = (skipIndex * i);
-                while (true) {
-                    final int index = n % destinations.size();
-                    PeerStatus status = destinations.get(index);
-                    if ( status == null ) {
-                        status = new PeerStatus(nodeInfo.getHostname(), 
nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure(), 
nodeInfo.getTotalFlowFiles());
-                        destinations.set(index, status);
-                        break;
-                    } else {
-                        n++;
-                    }
-                }
-            }
-        }
-
-        final StringBuilder distributionDescription = new StringBuilder();
-        distributionDescription.append("New Weighted Distribution of Nodes:");
-        for ( final Map.Entry<NodeInformation, Integer> entry : 
entryCountMap.entrySet() ) {
-            final double percentage = entry.getValue() * 100D / (double) 
destinations.size();
-            
distributionDescription.append("\n").append(entry.getKey()).append(" will 
receive ").append(percentage).append("% of FlowFiles");
-        }
-        logger.info(distributionDescription.toString());
-
-        // Jumble the list of destinations.
-        return destinations;
-    }
-    
-    
-    private void cleanupExpiredSockets() {
-        final List<EndpointConnectionState> states = new ArrayList<>();
-        
-        EndpointConnectionState state;
-        while ((state = connectionStateQueue.poll()) != null) {
-            // If the socket has not been used in 10 seconds, shut it down.
-            final long lastUsed = state.getLastTimeUsed();
-            if ( lastUsed < System.currentTimeMillis() - 10000L ) {
-                try {
-                    state.getSocketClientProtocol().shutdown(state.getPeer());
-                } catch (final Exception e) {
-                    logger.debug("Failed to shut down {} using {} due to {}", 
-                        new Object[] {state.getSocketClientProtocol(), 
state.getPeer(), e} );
-                }
-                
-                cleanup(state.getSocketClientProtocol(), state.getPeer());
-            } else {
-                states.add(state);
-            }
-        }
-        
-        connectionStateQueue.addAll(states);
-    }
-    
-    public void shutdown() {
-       taskExecutor.shutdown();
-       peerTimeoutExpirations.clear();
-            
-        for ( final CommunicationsSession commsSession : activeCommsChannels ) 
{
-            commsSession.interrupt();
-        }
-        
-        EndpointConnectionState state;
-        while ( (state = connectionStateQueue.poll()) != null)  {
-            cleanup(state.getSocketClientProtocol(), state.getPeer());
-        }
-    }
-    
-    public void terminate(final EndpointConnectionState state) {
-        cleanup(state.getSocketClientProtocol(), state.getPeer());
-    }
-    
-    private void refreshPeers() {
-        final PeerStatusCache existingCache = peerStatusCache;
-        if (existingCache != null && (existingCache.getTimestamp() + 
PEER_CACHE_MILLIS > System.currentTimeMillis())) {
-            return;
-        }
-
-        try {
-            final Set<PeerStatus> statuses = fetchRemotePeerStatuses();
-            peerStatusCache = new PeerStatusCache(statuses);
-            logger.info("{} Successfully refreshed Peer Status; remote 
instance consists of {} peers", this, statuses.size());
-        } catch (Exception e) {
-            logger.warn("{} Unable to refresh Remote Group's peers due to {}", 
this, e);
-            if (logger.isDebugEnabled()) {
-                logger.warn("", e);
-            }
-        }
-    }
-    
-    
-    public String getInputPortIdentifier(final String portName) throws 
IOException {
-        return getPortIdentifier(portName, inputPortMap);
-    }
-    
-    public String getOutputPortIdentifier(final String portName) throws 
IOException {
-       return getPortIdentifier(portName, outputPortMap);
-    }
-    
-    
-    private String getPortIdentifier(final String portName, final Map<String, 
String> portMap) throws IOException {
-       String identifier;
-       remoteInfoReadLock.lock();
-        try {
-               identifier = portMap.get(portName);
-        } finally {
-               remoteInfoReadLock.unlock();
-        }
-        
-        if ( identifier != null ) {
-               return identifier;
-        }
-        
-        refreshRemoteInfo();
-
-       remoteInfoReadLock.lock();
-        try {
-               return portMap.get(portName);
-        } finally {
-               remoteInfoReadLock.unlock();
-        }
-    }
-    
-    
-    private ControllerDTO refreshRemoteInfo() throws IOException {
-       final boolean webInterfaceSecure = 
clusterUrl.toString().startsWith("https");
-        final RemoteNiFiUtils utils = new RemoteNiFiUtils(webInterfaceSecure ? 
sslContext : null);
-               final ControllerDTO controller = 
utils.getController(URI.create(apiUri + "/controller"), commsTimeout);
-        
-        remoteInfoWriteLock.lock();
-        try {
-            this.siteToSitePort = controller.getRemoteSiteListeningPort();
-            this.siteToSiteSecure = controller.isSiteToSiteSecure();
-            
-            inputPortMap.clear();
-            for (final PortDTO inputPort : controller.getInputPorts()) {
-               inputPortMap.put(inputPort.getName(), inputPort.getId());
-            }
-            
-            outputPortMap.clear();
-            for ( final PortDTO outputPort : controller.getOutputPorts()) {
-               outputPortMap.put(outputPort.getName(), outputPort.getId());
-            }
-            
-            this.remoteRefreshTime = System.currentTimeMillis();
-        } finally {
-               remoteInfoWriteLock.unlock();
-        }
-        
-        return controller;
-    }
-    
-    /**
-     * @return the port that the remote instance is listening on for
-     * site-to-site communication, or <code>null</code> if the remote instance
-     * is not configured to allow site-to-site communications.
-     *
-     * @throws IOException if unable to communicate with the remote instance
-     */
-    private Integer getSiteToSitePort() throws IOException {
-        Integer listeningPort;
-        remoteInfoReadLock.lock();
-        try {
-            listeningPort = this.siteToSitePort;
-            if (listeningPort != null && this.remoteRefreshTime > 
System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
-                return listeningPort;
-            }
-        } finally {
-               remoteInfoReadLock.unlock();
-        }
-
-        final ControllerDTO controller = refreshRemoteInfo();
-        listeningPort = controller.getRemoteSiteListeningPort();
-
-        return listeningPort;
-    }
- 
-    /**
-     * Returns {@code true} if the remote instance is configured for secure 
site-to-site communications,
-     * {@code false} otherwise.
-     * 
-     * @return
-     * @throws IOException
-     */
-    public boolean isSecure() throws IOException {
-        remoteInfoReadLock.lock();
-        try {
-            final Boolean secure = this.siteToSiteSecure;
-            if (secure != null && this.remoteRefreshTime > 
System.currentTimeMillis() - REMOTE_REFRESH_MILLIS) {
-                return secure;
-            }
-        } finally {
-               remoteInfoReadLock.unlock();
-        }
-
-        final ControllerDTO controller = refreshRemoteInfo();
-        return controller.isSiteToSiteSecure();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 0494d04..6fa934b 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -24,23 +24,23 @@ import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.PortNotRunningException;
-import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.exception.UnknownPortException;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.util.ObjectHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SocketClient implements SiteToSiteClient {
+    private static final Logger logger = 
LoggerFactory.getLogger(SocketClient.class);
+    
     private final SiteToSiteClientConfig config;
-       private final EndpointConnectionStatePool pool;
+       private final EndpointConnectionPool pool;
        private final boolean compress;
        private final String portName;
        private final long penalizationNanos;
        private volatile String portIdentifier;
        
        public SocketClient(final SiteToSiteClientConfig config) {
-               pool = new EndpointConnectionStatePool(config.getUrl(), (int) 
config.getTimeout(TimeUnit.MILLISECONDS), 
+               pool = new EndpointConnectionPool(config.getUrl(), (int) 
config.getTimeout(TimeUnit.MILLISECONDS), 
                                config.getSslContext(), 
config.getEventReporter(), config.getPeerPersistenceFile());
                
                this.config = config;
@@ -66,44 +66,55 @@ public class SocketClient implements SiteToSiteClient {
                        return id;
                }
                
+               final String portId;
                if ( direction == TransferDirection.SEND ) {
-                       return pool.getInputPortIdentifier(this.portName);
+                       portId = pool.getInputPortIdentifier(this.portName);
                } else {
-                       return pool.getOutputPortIdentifier(this.portName);
+                       portId = pool.getOutputPortIdentifier(this.portName);
                }
+               
+               if (portId == null) {
+                   logger.debug("Unable to resolve port [{}] to an 
identifier", portName);
+               } else {
+                   logger.debug("Resolved port [{}] to identifier [{}]", 
portName, portId);
+               }
+               
+               return portId;
        }
        
        
+       private RemoteDestination createRemoteDestination(final String portId) {
+           return new RemoteDestination() {
+            @Override
+            public String getIdentifier() {
+                return portId;
+            }
+
+            @Override
+            public long getYieldPeriod(final TimeUnit timeUnit) {
+                return timeUnit.convert(penalizationNanos, 
TimeUnit.NANOSECONDS);
+            }
+
+            @Override
+            public boolean isUseCompression() {
+                return compress;
+            }
+        };
+       }
+       
        @Override
        public Transaction createTransaction(final TransferDirection direction) 
throws IOException {
-               final String portId = getPortIdentifier(TransferDirection.SEND);
+               final String portId = getPortIdentifier(direction);
                
                if ( portId == null ) {
-                       throw new IOException("Could not find Port with name " 
+ portName + " for remote NiFi instance");
+                       throw new IOException("Could not find Port with name '" 
+ portName + "' for remote NiFi instance");
                }
                
-               final RemoteDestination remoteDestination = new 
RemoteDestination() {
-                       @Override
-                       public String getIdentifier() {
-                               return portId;
-                       }
-
-                       @Override
-                       public long getYieldPeriod(final TimeUnit timeUnit) {
-                               return timeUnit.convert(penalizationNanos, 
TimeUnit.NANOSECONDS);
-                       }
-
-                       @Override
-                       public boolean isUseCompression() {
-                               return compress;
-                       }
-               };
+               final RemoteDestination remoteDestination = 
createRemoteDestination(portId);
                
-               final EndpointConnectionState connectionState;
-               try {
-                       connectionState = 
pool.getEndpointConnectionState(remoteDestination, direction);
-               } catch (final ProtocolException | HandshakeException | 
PortNotRunningException | UnknownPortException e) {
-                       throw new IOException(e);
+               final EndpointConnection connectionState = 
pool.getEndpointConnection(remoteDestination, direction, getConfig());
+               if ( connectionState == null ) {
+                   return null;
                }
                
                final Transaction transaction = 
connectionState.getSocketClientProtocol().startTransaction(
@@ -111,7 +122,7 @@ public class SocketClient implements SiteToSiteClient {
                
                // Wrap the transaction in a new one that will return the 
EndpointConnectionState back to the pool whenever
                // the transaction is either completed or canceled.
-               final ObjectHolder<EndpointConnectionState> connectionStateRef 
= new ObjectHolder<>(connectionState);
+               final ObjectHolder<EndpointConnection> connectionStateRef = new 
ObjectHolder<>(connectionState);
                return new Transaction() {
                        @Override
                        public void confirm() throws IOException {
@@ -119,11 +130,16 @@ public class SocketClient implements SiteToSiteClient {
                        }
 
                        @Override
+                       public void complete() throws IOException {
+                           complete(false);
+                       }
+                       
+                       @Override
                        public void complete(final boolean requestBackoff) 
throws IOException {
                                try {
                                        transaction.complete(requestBackoff);
                                } finally {
-                                   final EndpointConnectionState state = 
connectionStateRef.get();
+                                   final EndpointConnection state = 
connectionStateRef.get();
                                    if ( state != null ) {
                                        pool.offer(connectionState);
                                        connectionStateRef.set(null);
@@ -136,7 +152,7 @@ public class SocketClient implements SiteToSiteClient {
                                try {
                                        transaction.cancel(explanation);
                                } finally {
-                    final EndpointConnectionState state = 
connectionStateRef.get();
+                    final EndpointConnection state = connectionStateRef.get();
                     if ( state != null ) {
                         pool.terminate(connectionState);
                         connectionStateRef.set(null);
@@ -149,7 +165,7 @@ public class SocketClient implements SiteToSiteClient {
                            try {
                                transaction.error();
                            } finally {
-                    final EndpointConnectionState state = 
connectionStateRef.get();
+                    final EndpointConnection state = connectionStateRef.get();
                     if ( state != null ) {
                         pool.terminate(connectionState);
                         connectionStateRef.set(null);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
index b61fc65..d4d55e1 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
@@ -16,8 +16,15 @@
  */
 package org.apache.nifi.remote.exception;
 
-public class HandshakeException extends Exception {
+import java.io.IOException;
 
+
+/**
+ * A HandshakeException occurs when the client and the remote NiFi instance do 
not agree
+ * on some condition during the handshake; for example, when the NiFi instance 
does not recognize
+ * one of the parameters that the client passes during the handshaking phase.
+ */
+public class HandshakeException extends IOException {
     private static final long serialVersionUID = 178192341908726L;
 
     public HandshakeException(final String message) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
index af0f467..8b97832 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
@@ -16,8 +16,12 @@
  */
 package org.apache.nifi.remote.exception;
 
-public class PortNotRunningException extends Exception {
-
+/**
+ * PortNotRunningException occurs when the remote NiFi instance reports
+ * that the Port that the client is attempting to communicate with is not
+ * currently running and therefore communications with that Port are not 
allowed.
+ */
+public class PortNotRunningException extends ProtocolException {
     private static final long serialVersionUID = -2790940982005516375L;
 
     public PortNotRunningException(final String message) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
index e12348a..45a4e15 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
@@ -18,6 +18,10 @@ package org.apache.nifi.remote.exception;
 
 import java.io.IOException;
 
+/**
+ * A ProtocolException occurs when unexpected data is received, for example
+ * an invalid Response Code.
+ */
 public class ProtocolException extends IOException {
 
     private static final long serialVersionUID = 5763900324505818495L;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
index e6a0fe7..592a1b3 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
@@ -16,8 +16,11 @@
  */
 package org.apache.nifi.remote.exception;
 
-public class UnknownPortException extends Exception {
-
+/**
+ * An UnknownPortException indicates that the remote NiFi instance has 
reported that
+ * the endpoint that the client attempted to communicate with does not exist.
+ */
+public class UnknownPortException extends ProtocolException {
     private static final long serialVersionUID = -2790940982005516375L;
 
     public UnknownPortException(final String message) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
index 9e451fd..7dffddd 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
@@ -63,4 +63,9 @@ public class SocketChannelInput implements 
CommunicationsInput {
     public void interrupt() {
         interruptableIn.interrupt();
     }
+
+    @Override
+    public void consume() throws IOException {
+        socketIn.consume();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
index 60ef33f..01fb9f2 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
@@ -47,4 +47,9 @@ public class SSLSocketChannelInput implements 
CommunicationsInput {
     public long getBytesRead() {
         return countingIn.getBytesRead();
     }
+
+    @Override
+    public void consume() throws IOException {
+        in.consume();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
index befbdaa..36a0e8d 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -40,9 +40,9 @@ public interface ClientProtocol extends 
VersionedRemoteResource {
 
     FlowFileCodec negotiateCodec(Peer peer) throws IOException, 
ProtocolException;
 
-    void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession 
session, FlowFileCodec codec) throws IOException, ProtocolException;
+    int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession 
session, FlowFileCodec codec) throws IOException, ProtocolException;
 
-    void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession 
session, FlowFileCodec codec) throws IOException, ProtocolException;
+    int transferFlowFiles(Peer peer, ProcessContext context, ProcessSession 
session, FlowFileCodec codec) throws IOException, ProtocolException;
 
     void shutdown(Peer peer) throws IOException, ProtocolException;
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
index d2e2946..5e56902 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
@@ -21,6 +21,12 @@ import java.io.InputStream;
 
 public interface CommunicationsInput {
 
+    /**
+     * Reads all data currently on the socket and discards it.
+     * @throws IOException if an I/O error occurs while reading from the socket
+     */
+    void consume() throws IOException;
+    
     InputStream getInputStream() throws IOException;
 
     long getBytesRead();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 5f194f8..e321663 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -150,6 +150,7 @@ public class SocketClientProtocol implements ClientProtocol 
{
             }
         }
         
+        logger.debug("Handshaking with properties {}", properties);
         dos.writeInt(properties.size());
         for ( final Map.Entry<HandshakeProperty, String> entry : 
properties.entrySet() ) {
             dos.writeUTF(entry.getKey().name());
@@ -269,13 +270,13 @@ public class SocketClientProtocol implements 
ClientProtocol {
             throw new IllegalStateException("Cannot start transaction; 
handshake resolution was " + handshakeResponse);
         }
         
-        return new SocketClientTransaction(versionNegotiator.getVersion(), 
peer, codec, 
+        return new SocketClientTransaction(versionNegotiator.getVersion(), 
destination.getIdentifier(), peer, codec, 
                        direction, useCompression, (int) 
destination.getYieldPeriod(TimeUnit.MILLISECONDS));
     }
 
 
     @Override
-    public void receiveFlowFiles(final Peer peer, final ProcessContext 
context, final ProcessSession session, final FlowFileCodec codec) throws 
IOException, ProtocolException {
+    public int receiveFlowFiles(final Peer peer, final ProcessContext context, 
final ProcessSession session, final FlowFileCodec codec) throws IOException, 
ProtocolException {
        final String userDn = peer.getCommunicationsSession().getUserDn();
        final Transaction transaction = startTransaction(peer, codec, 
TransferDirection.RECEIVE);
        
@@ -288,7 +289,7 @@ public class SocketClientProtocol implements ClientProtocol 
{
                final DataPacket dataPacket = transaction.receive();
                if ( dataPacket == null ) {
                    if ( flowFilesReceived.isEmpty() ) {
-                       
peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
+                       peer.penalize(destination.getIdentifier(), 
destination.getYieldPeriod(TimeUnit.MILLISECONDS));
                    }
                        break;
                }
@@ -322,25 +323,25 @@ public class SocketClientProtocol implements 
ClientProtocol {
                transaction.complete(applyBackpressure);
                logger.debug("{} Sending 
TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
 
-               if ( flowFilesReceived.isEmpty() ) {
-                   return;
+               if ( !flowFilesReceived.isEmpty() ) {
+               stopWatch.stop();
+               final String flowFileDescription = flowFilesReceived.size() < 
20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+               final String uploadDataRate = 
stopWatch.calculateDataRate(bytesReceived);
+               final long uploadMillis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
+               final String dataSize = 
FormatUtils.formatDataSize(bytesReceived);
+               logger.info("{} Successfully receveied {} ({}) from {} in {} 
milliseconds at a rate of {}", new Object[] { 
+                               this, flowFileDescription, dataSize, peer, 
uploadMillis, uploadDataRate });
                }
                
-               stopWatch.stop();
-               final String flowFileDescription = flowFilesReceived.size() < 
20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
-               final String uploadDataRate = 
stopWatch.calculateDataRate(bytesReceived);
-               final long uploadMillis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
-               final String dataSize = 
FormatUtils.formatDataSize(bytesReceived);
-               logger.info("{} Successfully receveied {} ({}) from {} in {} 
milliseconds at a rate of {}", new Object[] { 
-                               this, flowFileDescription, dataSize, peer, 
uploadMillis, uploadDataRate });
+               return flowFilesReceived.size();
     }
 
     
     @Override
-    public void transferFlowFiles(final Peer peer, final ProcessContext 
context, final ProcessSession session, final FlowFileCodec codec) throws 
IOException, ProtocolException {
+    public int transferFlowFiles(final Peer peer, final ProcessContext 
context, final ProcessSession session, final FlowFileCodec codec) throws 
IOException, ProtocolException {
                FlowFile flowFile = session.get();
                if (flowFile == null) {
-                       return;
+                       return 0;
                }
 
                try {
@@ -401,6 +402,8 @@ public class SocketClientProtocol implements ClientProtocol 
{
                final String flowFileDescription = (flowFilesSent.size() < 20) 
? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
                logger.info("{} Successfully sent {} ({}) to {} in {} 
milliseconds at a rate of {}", new Object[] {
                    this, flowFileDescription, dataSize, peer, uploadMillis, 
uploadDataRate});
+               
+               return flowFilesSent.size();
                } catch (final Exception e) {
                        session.rollback();
                        throw e;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index edb360e..cf8f9b2 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -19,6 +19,7 @@ package org.apache.nifi.remote.protocol.socket;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
@@ -29,6 +30,8 @@ import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.RequestType;
 import org.slf4j.Logger;
@@ -47,14 +50,16 @@ public class SocketClientTransaction implements Transaction 
{
        private final boolean compress;
        private final Peer peer;
        private final int penaltyMillis;
+       private final String destinationId;
        
        private boolean dataAvailable = false;
        private int transfers = 0;
        private TransactionState state;
        
-       SocketClientTransaction(final int protocolVersion, final Peer peer, 
final FlowFileCodec codec, 
+       SocketClientTransaction(final int protocolVersion, final String 
destinationId, final Peer peer, final FlowFileCodec codec, 
                        final TransferDirection direction, final boolean 
useCompression, final int penaltyMillis) throws IOException {
                this.protocolVersion = protocolVersion;
+               this.destinationId = destinationId;
                this.peer = peer;
                this.codec = codec;
                this.direction = direction;
@@ -140,7 +145,8 @@ public class SocketClientTransaction implements Transaction 
{
                }
                
             logger.debug("{} Receiving data from {}", this, peer);
-            final DataPacket packet = codec.decode(new CheckedInputStream(dis, 
crc));
+            final InputStream dataIn = compress ? new 
CompressionInputStream(dis) : dis;
+            final DataPacket packet = codec.decode(new 
CheckedInputStream(dataIn, crc));
             
             if ( packet == null ) {
                 this.dataAvailable = false;
@@ -174,7 +180,8 @@ public class SocketClientTransaction implements Transaction 
{
     
             logger.debug("{} Sending data to {}", this, peer);
     
-               final OutputStream out = new CheckedOutputStream(dos, crc);
+            final OutputStream dataOut = compress ? new 
CompressionOutputStream(dos) : dos;
+               final OutputStream out = new CheckedOutputStream(dataOut, crc);
             codec.encode(dataPacket, out);
             
             // need to close the CompressionOutputStream in order to force it 
write out any remaining bytes.
@@ -208,6 +215,10 @@ public class SocketClientTransaction implements 
Transaction {
                }
        }
        
+       /**
+        * Completes this transaction without requesting a backoff; shorthand
+        * for {@code complete(false)}.
+        *
+        * @throws IOException if the delegated {@code complete(boolean)} call throws it
+        */
+       @Override
+       public void complete() throws IOException {
+           this.complete(false);
+       }
        
        @Override
        public void complete(boolean requestBackoff) throws IOException {
@@ -246,7 +257,7 @@ public class SocketClientTransaction implements Transaction 
{
                 
                 logger.debug("{} Received {} from {}", this, 
transactionResponse, peer);
                 if ( transactionResponse.getCode() == 
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
-                    peer.penalize(penaltyMillis);
+                    peer.penalize(destinationId, penaltyMillis);
                 } else if ( transactionResponse.getCode() != 
ResponseCode.TRANSACTION_FINISHED ) {
                     throw new ProtocolException("After sending data, expected 
TRANSACTION_FINISHED response but got " + transactionResponse);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
index d8899ea..275e40c 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
@@ -39,7 +39,7 @@ public class TestEndpointConnectionStatePool {
         collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 
4096));
 
         clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = 
EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.SEND);
+        final List<PeerStatus> destinations = 
EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.SEND);
         for ( final PeerStatus peerStatus : destinations ) {
             System.out.println(peerStatus.getHostname() + ":" + 
peerStatus.getPort());
         }
@@ -53,7 +53,7 @@ public class TestEndpointConnectionStatePool {
         collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 
50000));
 
         clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = 
EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.SEND);
+        final List<PeerStatus> destinations = 
EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.SEND);
         for ( final PeerStatus peerStatus : destinations ) {
             System.out.println(peerStatus.getHostname() + ":" + 
peerStatus.getPort());
         }
@@ -73,7 +73,7 @@ public class TestEndpointConnectionStatePool {
         collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 
4096));
 
         clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = 
EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.SEND);
+        final List<PeerStatus> destinations = 
EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.SEND);
         for ( final PeerStatus peerStatus : destinations ) {
             System.out.println(peerStatus.getHostname() + ":" + 
peerStatus.getPort());
         }
@@ -87,7 +87,7 @@ public class TestEndpointConnectionStatePool {
         collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 
50000));
 
         clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = 
EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.SEND);
+        final List<PeerStatus> destinations = 
EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.SEND);
         for ( final PeerStatus peerStatus : destinations ) {
             System.out.println(peerStatus.getHostname() + ":" + 
peerStatus.getPort());
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
new file mode 100644
index 0000000..a744905
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Manual tests (ignored by default) that exercise {@link SiteToSiteClient}
+ * against an instance expected at {@code http://localhost:8080/nifi}.
+ */
+public class TestSiteToSiteClient {
+
+    /**
+     * Receives one data packet from the port named "out", reads its full
+     * content, then confirms and completes the transaction.
+     *
+     * @throws IOException if a client or transaction call fails with an I/O error
+     */
+    @Test
+    @Ignore("For local testing only; not really a unit test but a manual test")
+    public void testReceive() throws IOException {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
+
+        final SiteToSiteClient client = new SiteToSiteClient.Builder()
+            .url("http://localhost:8080/nifi")
+            .portName("out")
+            .requestBatchCount(1)
+            .build();
+
+        try {
+            final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+            Assert.assertNotNull(transaction);
+
+            final DataPacket packet = transaction.receive();
+            Assert.assertNotNull(packet);
+
+            // Pull the packet's entire payload into memory; only its length is printed.
+            final InputStream in = packet.getData();
+            final long size = packet.getSize();
+            final byte[] buff = new byte[(int) size];
+
+            StreamUtils.fillBuffer(in, buff);
+            System.out.println(buff.length);
+
+            // A batch count of 1 was requested, so no second packet is expected.
+            Assert.assertNull(transaction.receive());
+
+            transaction.confirm();
+            transaction.complete(false);
+        } finally {
+            client.close();
+        }
+    }
+
+
+    /**
+     * Sends one small data packet (the bytes of "Hello" plus a single
+     * attribute) to the port named "in", then confirms and completes the
+     * transaction.
+     *
+     * @throws IOException if a client or transaction call fails with an I/O error
+     */
+    @Test
+    @Ignore("For local testing only; not really a unit test but a manual test")
+    public void testSend() throws IOException {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
+
+        final SiteToSiteClient client = new SiteToSiteClient.Builder()
+            .url("http://localhost:8080/nifi")
+            .portName("in")
+            .build();
+
+        try {
+            final Transaction transaction = client.createTransaction(TransferDirection.SEND);
+            Assert.assertNotNull(transaction);
+
+            final Map<String, String> attrs = new HashMap<>();
+            attrs.put("site-to-site", "yes, please!");
+            final byte[] bytes = "Hello".getBytes();
+            final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+            final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length);
+            transaction.send(packet);
+
+            transaction.confirm();
+            transaction.complete(false);
+        } finally {
+            client.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
 
b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
index 32a3f26..f68c874 100644
--- 
a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
+++ 
b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
@@ -43,6 +43,16 @@ public class SocketChannelInputStream extends InputStream {
     public void setTimeout(final int timeoutMillis) {
         this.timeoutMillis = timeoutMillis;
     }
+    
+    /**
+     * Reads and discards bytes from {@code channel} until a read reports no
+     * more bytes (a return value {@code <= 0}).
+     *
+     * @throws IOException if reading from the channel fails
+     */
+    public void consume() throws IOException {
+        final ByteBuffer buffer = ByteBuffer.allocate(4096);
+        int bytesRead;
+        do {
+            // clear() rather than flip(): the bytes are thrown away, so reset the
+            // buffer to full capacity before every read instead of shrinking its
+            // limit to the size of the previous read.
+            buffer.clear();
+            bytesRead = channel.read(buffer);
+        } while ( bytesRead > 0 );
+    }
 
     @Override
     public int read() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
 
b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
index 5810488..7c74b20 100644
--- 
a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
+++ 
b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
@@ -258,6 +258,16 @@ public class SSLSocketChannel implements Closeable {
         }
     }
 
+    /**
+     * Reads and discards bytes from {@code channel} until a read reports no
+     * more bytes (a return value {@code <= 0}).
+     *
+     * @throws IOException if reading from the channel fails
+     */
+    public void consume() throws IOException {
+        final ByteBuffer buffer = ByteBuffer.allocate(4096);
+        int readCount;
+        do {
+            // clear() rather than flip(): the bytes are thrown away, so reset the
+            // buffer to full capacity before every read instead of shrinking its
+            // limit to the size of the previous read.
+            buffer.clear();
+            readCount = channel.read(buffer);
+        } while (readCount > 0);
+    }
+    
     private int readData(final ByteBuffer dest) throws IOException {
         final long startTime = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
 
b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
index 154bd08..6fb79d4 100644
--- 
a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
+++ 
b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
@@ -27,6 +27,10 @@ public class SSLSocketChannelInputStream extends InputStream 
{
         this.channel = channel;
     }
 
+    /**
+     * Delegates to {@code consume()} on the wrapped {@code channel}.
+     *
+     * @throws IOException if the delegated call throws it
+     */
+    public void consume() throws IOException {
+        this.channel.consume();
+    }
+    
     @Override
     public int read() throws IOException {
         return channel.read();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index ac41cba..c842195 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -24,9 +24,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.RemoteGroupPort;
-import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
 
 public interface RemoteProcessGroup {
 
@@ -81,8 +79,6 @@ public interface RemoteProcessGroup {
 
     String getYieldDuration();
     
-    EndpointConnectionStatePool getConnectionPool();
-
     /**
      * Sets the timeout using the TimePeriod format (e.g., "30 secs", "1 min")
      *

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 7cb2874..54f0807 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -48,6 +48,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.net.ssl.SSLContext;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.admin.service.UserService;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
@@ -128,14 +129,12 @@ import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
 import org.apache.nifi.groups.StandardProcessGroup;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.logging.LogRepository;
 import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.logging.ProcessorLogObserver;
 import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.nar.NarClassLoader;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.nar.NarThreadContextClassLoader;
 import org.apache.nifi.processor.Processor;
@@ -165,6 +164,7 @@ import org.apache.nifi.reporting.EventAccess;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
@@ -184,7 +184,6 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
 import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/081471c4/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index db0aeb7..79ef7a8 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -56,7 +56,6 @@ import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
-import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
 import org.apache.nifi.remote.util.RemoteNiFiUtils;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.Severity;
@@ -130,7 +129,6 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
 
     private volatile String authorizationIssue;
 
-    private final EndpointConnectionStatePool endpointConnectionPool;
     private final ScheduledExecutorService backgroundThreadExecutor;
 
     public StandardRemoteProcessGroup(final String id, final String targetUri, 
final ProcessGroup processGroup,
@@ -172,13 +170,9 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
             }
         };
 
-        endpointConnectionPool = new 
EndpointConnectionStatePool(getTargetUri().toString(), 
getCommunicationsTimeout(TimeUnit.MILLISECONDS), 
-                       sslContext, eventReporter, getPeerPersistenceFile());
-        
         final Runnable checkAuthorizations = new InitializationTask();
-
         backgroundThreadExecutor = new FlowEngine(1, "Remote Process Group " + 
id + ": " + targetUri);
-        backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 
0L, 30L, TimeUnit.SECONDS);
+        backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 
5L, 30L, TimeUnit.SECONDS);
     }
 
     @Override
@@ -200,7 +194,6 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
     @Override
     public void shutdown() {
         backgroundThreadExecutor.shutdown();
-        endpointConnectionPool.shutdown();
     }
     
     @Override
@@ -1222,11 +1215,6 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
     }
     
     @Override
-    public EndpointConnectionStatePool getConnectionPool() {
-        return endpointConnectionPool;
-    }
-
-    @Override
     public void verifyCanDelete() {
         verifyCanDelete(false);
     }

Reply via email to