http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
----------------------------------------------------------------------
diff --git 
a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
new file mode 100644
index 0000000..6dab77b
--- /dev/null
+++ 
b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.util.Set;
+
+import org.apache.nifi.remote.PeerStatus;
+
+public class PeerStatusCache {
+       private final Set<PeerStatus> statuses;
+    private final long timestamp;
+
+    public PeerStatusCache(final Set<PeerStatus> statuses) {
+        this(statuses, System.currentTimeMillis());
+    }
+
+    public PeerStatusCache(final Set<PeerStatus> statuses, final long 
timestamp) {
+        this.statuses = statuses;
+        this.timestamp = timestamp;
+    }
+
+    public Set<PeerStatus> getStatuses() {
+        return statuses;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
 
b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
index 0fcac8c..00a7687 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
@@ -23,7 +23,6 @@ import 
org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
 import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
 import 
org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
 import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
-import org.apache.nifi.cluster.NodeInformant;
 import org.apache.nifi.cluster.event.Event;
 import org.apache.nifi.cluster.node.Node;
 import org.apache.nifi.cluster.node.Node.Status;
@@ -33,6 +32,7 @@ import org.apache.nifi.cluster.protocol.Heartbeat;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.diagnostics.SystemDiagnostics;
+import org.apache.nifi.remote.cluster.NodeInformant;
 import org.apache.nifi.reporting.BulletinRepository;
 
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 511bb7d..ea523b0 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -65,9 +65,7 @@ import javax.xml.validation.Validator;
 
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.cluster.BulletinsPayload;
-import org.apache.nifi.cluster.ClusterNodeInformation;
 import org.apache.nifi.cluster.HeartbeatPayload;
-import org.apache.nifi.cluster.NodeInformation;
 import org.apache.nifi.cluster.context.ClusterContext;
 import org.apache.nifi.cluster.context.ClusterContextImpl;
 import org.apache.nifi.cluster.event.Event;
@@ -155,6 +153,8 @@ import 
org.apache.nifi.processor.StandardValidationContextFactory;
 import org.apache.nifi.remote.RemoteResourceManager;
 import org.apache.nifi.remote.RemoteSiteListener;
 import org.apache.nifi.remote.SocketRemoteSiteListener;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinRepository;
@@ -191,7 +191,6 @@ import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
 import org.apache.nifi.web.util.WebUtils;
-
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/pom.xml 
b/nifi/nar-bundles/framework-bundle/framework/core-api/pom.xml
index f8d8e13..643121e 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/pom.xml
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/pom.xml
@@ -34,6 +34,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>site-to-site-client</artifactId>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-runtime</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java
deleted file mode 100644
index 0092f7a..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-public class AdaptedNodeInformation {
-
-    private String hostname;
-    private Integer siteToSitePort;
-    private int apiPort;
-    private boolean isSiteToSiteSecure;
-    private int totalFlowFiles;
-
-    public String getHostname() {
-        return hostname;
-    }
-
-    public void setHostname(String hostname) {
-        this.hostname = hostname;
-    }
-
-    public Integer getSiteToSitePort() {
-        return siteToSitePort;
-    }
-
-    public void setSiteToSitePort(Integer siteToSitePort) {
-        this.siteToSitePort = siteToSitePort;
-    }
-
-    public int getApiPort() {
-        return apiPort;
-    }
-
-    public void setApiPort(int apiPort) {
-        this.apiPort = apiPort;
-    }
-
-    public boolean isSiteToSiteSecure() {
-        return isSiteToSiteSecure;
-    }
-
-    public void setSiteToSiteSecure(boolean isSiteToSiteSecure) {
-        this.isSiteToSiteSecure = isSiteToSiteSecure;
-    }
-
-    public int getTotalFlowFiles() {
-        return totalFlowFiles;
-    }
-
-    public void setTotalFlowFiles(int totalFlowFiles) {
-        this.totalFlowFiles = totalFlowFiles;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java
deleted file mode 100644
index 5751c32..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-@XmlRootElement
-public class ClusterNodeInformation {
-
-    private Collection<NodeInformation> nodeInfo;
-
-    private static final JAXBContext JAXB_CONTEXT;
-
-    static {
-        try {
-            JAXB_CONTEXT = 
JAXBContext.newInstance(ClusterNodeInformation.class);
-        } catch (JAXBException e) {
-            throw new RuntimeException("Unable to create JAXBContext.", e);
-        }
-    }
-
-    public ClusterNodeInformation() {
-        this.nodeInfo = null;
-    }
-
-    public void setNodeInformation(final Collection<NodeInformation> nodeInfo) 
{
-        this.nodeInfo = nodeInfo;
-    }
-
-    @XmlJavaTypeAdapter(NodeInformationAdapter.class)
-    public Collection<NodeInformation> getNodeInformation() {
-        return nodeInfo;
-    }
-
-    public void marshal(final OutputStream os) throws JAXBException {
-        final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
-        marshaller.marshal(this, os);
-    }
-
-    public static ClusterNodeInformation unmarshal(final InputStream is) 
throws JAXBException {
-        final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
-        return (ClusterNodeInformation) unmarshaller.unmarshal(is);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java
deleted file mode 100644
index 987ff65..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-public interface NodeInformant {
-
-    ClusterNodeInformation getNodeInformation();
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java
deleted file mode 100644
index 848eb7e..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-public class NodeInformation {
-
-    private final String hostname;
-    private final Integer siteToSitePort;
-    private final int apiPort;
-    private final boolean isSiteToSiteSecure;
-    private final int totalFlowFiles;
-
-    public NodeInformation(final String hostname, final Integer 
siteToSitePort, final int apiPort,
-            final boolean isSiteToSiteSecure, final int totalFlowFiles) {
-        this.hostname = hostname;
-        this.siteToSitePort = siteToSitePort;
-        this.apiPort = apiPort;
-        this.isSiteToSiteSecure = isSiteToSiteSecure;
-        this.totalFlowFiles = totalFlowFiles;
-    }
-
-    public String getHostname() {
-        return hostname;
-    }
-
-    public int getAPIPort() {
-        return apiPort;
-    }
-
-    public Integer getSiteToSitePort() {
-        return siteToSitePort;
-    }
-
-    public boolean isSiteToSiteSecure() {
-        return isSiteToSiteSecure;
-    }
-
-    public int getTotalFlowFiles() {
-        return totalFlowFiles;
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (obj == this) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (!(obj instanceof NodeInformation)) {
-            return false;
-        }
-
-        final NodeInformation other = (NodeInformation) obj;
-        if (!hostname.equals(other.hostname)) {
-            return false;
-        }
-        if (siteToSitePort == null && other.siteToSitePort != null) {
-            return false;
-        }
-        if (siteToSitePort != null && other.siteToSitePort == null) {
-            return false;
-        } else if (siteToSitePort != null && siteToSitePort.intValue() != 
other.siteToSitePort.intValue()) {
-            return false;
-        }
-        if (apiPort != other.apiPort) {
-            return false;
-        }
-        if (isSiteToSiteSecure != other.isSiteToSiteSecure) {
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return 83832 + hostname.hashCode() + (siteToSitePort == null ? 8 : 
siteToSitePort.hashCode()) + apiPort + (isSiteToSiteSecure ? 3829 : 0);
-    }
-
-    @Override
-    public String toString() {
-        return "Node[" + hostname + ":" + apiPort + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java
deleted file mode 100644
index 630631f..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-import javax.xml.bind.annotation.adapters.XmlAdapter;
-
-public class NodeInformationAdapter extends XmlAdapter<AdaptedNodeInformation, 
NodeInformation> {
-
-    @Override
-    public NodeInformation unmarshal(final AdaptedNodeInformation adapted) 
throws Exception {
-        return new NodeInformation(adapted.getHostname(), 
adapted.getSiteToSitePort(), adapted.getApiPort(), 
adapted.isSiteToSiteSecure(), adapted.getTotalFlowFiles());
-    }
-
-    @Override
-    public AdaptedNodeInformation marshal(final NodeInformation 
nodeInformation) throws Exception {
-        final AdaptedNodeInformation adapted = new AdaptedNodeInformation();
-        adapted.setHostname(nodeInformation.getHostname());
-        adapted.setSiteToSitePort(nodeInformation.getSiteToSitePort());
-        adapted.setApiPort(nodeInformation.getAPIPort());
-        adapted.setSiteToSiteSecure(nodeInformation.isSiteToSiteSecure());
-        adapted.setTotalFlowFiles(nodeInformation.getTotalFlowFiles());
-        return adapted;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index e0cca64..2e35422 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -27,7 +27,6 @@ import 
org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.RemoteGroupPort;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
 
 public interface RemoteProcessGroup {
 
@@ -211,11 +210,6 @@ public interface RemoteProcessGroup {
      */
     void removeNonExistentPort(final RemoteGroupPort port);
 
-    /**
-     *
-     * @return @throws IOException
-     */
-    CommunicationsSession establishSiteToSiteConnection() throws IOException;
 
     /**
      * Called whenever RemoteProcessGroup is removed from the flow, so that any

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java
deleted file mode 100644
index 2422fe1..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-
-public class Peer {
-
-    private final CommunicationsSession commsSession;
-    private final String url;
-    private final String host;
-    private long penalizationExpiration = 0L;
-    private boolean closed = false;
-
-    public Peer(final CommunicationsSession commsSession, final String url) {
-        this.commsSession = commsSession;
-        this.url = url;
-
-        try {
-            this.host = new URI(url).getHost();
-        } catch (final Exception e) {
-            throw new IllegalArgumentException("Invalid URL: " + url);
-        }
-    }
-
-    public String getUrl() {
-        return url;
-    }
-
-    public CommunicationsSession getCommunicationsSession() {
-        return commsSession;
-    }
-
-    public void close() throws IOException {
-        this.closed = true;
-
-        // TODO: Consume the InputStream so that it doesn't linger on the 
Peer's outgoing socket buffer
-        commsSession.close();
-    }
-
-    public void penalize(final long millis) {
-        penalizationExpiration = Math.max(penalizationExpiration, 
System.currentTimeMillis() + millis);
-    }
-
-    public boolean isPenalized() {
-        return penalizationExpiration > System.currentTimeMillis();
-    }
-
-    public boolean isClosed() {
-        return closed;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    @Override
-    public int hashCode() {
-        return 8320 + url.hashCode();
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (obj == null) {
-            return false;
-        }
-        if (obj == this) {
-            return true;
-        }
-        if (!(obj instanceof Peer)) {
-            return false;
-        }
-
-        final Peer other = (Peer) obj;
-        return this.url.equals(other.url);
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder();
-        sb.append("Peer[url=").append(url);
-        if (closed) {
-            sb.append(",CLOSED");
-        } else if (isPenalized()) {
-            sb.append(",PENALIZED");
-        }
-        sb.append("]");
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java
deleted file mode 100644
index d1cb076..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-public class PeerStatus {
-
-    private final String hostname;
-    private final int port;
-    private final boolean secure;
-    private final int numFlowFiles;
-
-    public PeerStatus(final String hostname, final int port, final boolean 
secure, final int numFlowFiles) {
-        this.hostname = hostname;
-        this.port = port;
-        this.secure = secure;
-        this.numFlowFiles = numFlowFiles;
-    }
-
-    public String getHostname() {
-        return hostname;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    public boolean isSecure() {
-        return secure;
-    }
-
-    public int getFlowFileCount() {
-        return numFlowFiles;
-    }
-
-    @Override
-    public String toString() {
-        return "PeerStatus[hostname=" + hostname + ",port=" + port + 
",secure=" + secure + ",flowFileCount=" + numFlowFiles + "]";
-    }
-
-    @Override
-    public int hashCode() {
-        return 9824372 + hostname.hashCode() + port;
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (obj == null) {
-            return false;
-        }
-
-        if (!(obj instanceof PeerStatus)) {
-            return false;
-        }
-
-        final PeerStatus other = (PeerStatus) obj;
-        return port == other.port && hostname.equals(other.hostname);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
deleted file mode 100644
index 8f2603a..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-public interface PortAuthorizationResult {
-
-    boolean isAuthorized();
-
-    String getExplanation();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java
deleted file mode 100644
index 12a3d33..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-/**
- *
- */
-public enum RemoteAuthorizationState {
-
-    UNKNOWN,
-    UNAUTHORIZED,
-    AUTHORIZED;
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
index d4ad374..f08277c 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
@@ -16,20 +16,28 @@
  */
 package org.apache.nifi.remote;
 
+import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.AbstractPort;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 
-public interface RemoteGroupPort extends Port {
+public abstract class RemoteGroupPort extends AbstractPort implements Port, 
RemoteDestination {
 
-    RemoteProcessGroup getRemoteProcessGroup();
+       public RemoteGroupPort(String id, String name, ProcessGroup 
processGroup, ConnectableType type, ProcessScheduler scheduler) {
+               super(id, name, processGroup, type, scheduler);
+       }
 
-    TransferDirection getTransferDirection();
+       public abstract RemoteProcessGroup getRemoteProcessGroup();
 
-    boolean isUseCompression();
+    public abstract TransferDirection getTransferDirection();
 
-    void setUseCompression(boolean useCompression);
+    public abstract boolean isUseCompression();
 
-    boolean getTargetExists();
+    public abstract void setUseCompression(boolean useCompression);
 
-    boolean isTargetRunning();
+    public abstract boolean getTargetExists();
+
+    public abstract boolean isTargetRunning();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
deleted file mode 100644
index 56432d5..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-public enum TransferDirection {
-
-    SEND,
-    RECEIVE;
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
deleted file mode 100644
index bfccd98..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-public interface VersionedRemoteResource {
-
-    VersionNegotiator getVersionNegotiator();
-
-    String getResourceName();
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
deleted file mode 100644
index b4206b3..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.codec;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.remote.VersionedRemoteResource;
-import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-
-/**
- * <p>
- * Provides a mechanism for encoding and decoding FlowFiles as streams so that
- * they can be transferred remotely.
- * </p>
- */
-public interface FlowFileCodec extends VersionedRemoteResource {
-
-    /**
-     * Returns a List of all versions that this codec is able to support, in 
the
-     * order that they are preferred by the codec
-     *
-     * @return
-     */
-    public List<Integer> getSupportedVersions();
-
-    /**
-     * Encodes a FlowFile and its content as a single stream of data and writes
-     * that stream to the output. If checksum is not null, it will be 
calculated
-     * as the stream is read
-     *
-     * @param flowFile the FlowFile to encode
-     * @param session a session that can be used to transactionally create and
-     * transfer flow files
-     * @param outStream the stream to write the data to
-     *
-     * @return the updated FlowFile
-     *
-     * @throws IOException
-     */
-    FlowFile encode(FlowFile flowFile, ProcessSession session, OutputStream 
outStream) throws IOException, TransmissionDisabledException;
-
-    /**
-     * Decodes the contents of the InputStream, interpreting the data to
-     * determine the next FlowFile's attributes and content, as well as their
-     * destinations. If not null, checksum will be used to calculate the
-     * checksum as the data is read.
-     *
-     * @param stream an InputStream containing FlowFiles' contents, attributes,
-     * and destinations
-     * @param session
-     *
-     * @return the FlowFile that was created, or <code>null</code> if the 
stream
-     * was out of data
-     *
-     * @throws IOException
-     * @throws ProtocolException if the input is malformed
-     */
-    FlowFile decode(InputStream stream, ProcessSession session) throws 
IOException, ProtocolException, TransmissionDisabledException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
deleted file mode 100644
index f6c2f4f..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class BadRequestException extends Exception {
-
-    private static final long serialVersionUID = -8034602852256106560L;
-
-    public BadRequestException(final String message) {
-        super(message);
-    }
-
-    public BadRequestException(final Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
deleted file mode 100644
index b61fc65..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class HandshakeException extends Exception {
-
-    private static final long serialVersionUID = 178192341908726L;
-
-    public HandshakeException(final String message) {
-        super(message);
-    }
-
-    public HandshakeException(final Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
deleted file mode 100644
index 24ff3a5..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class NotAuthorizedException extends Exception {
-
-    private static final long serialVersionUID = 2952623568114035498L;
-
-    public NotAuthorizedException(final String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
deleted file mode 100644
index af0f467..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class PortNotRunningException extends Exception {
-
-    private static final long serialVersionUID = -2790940982005516375L;
-
-    public PortNotRunningException(final String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
deleted file mode 100644
index 0f50b98..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class ProtocolException extends Exception {
-
-    private static final long serialVersionUID = 5763900324505818495L;
-
-    public ProtocolException(final String message, final Throwable cause) {
-        super(message, cause);
-    }
-
-    public ProtocolException(final String message) {
-        super(message);
-    }
-
-    public ProtocolException(final Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
deleted file mode 100644
index dd675b3..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-/**
- * Used to indicate that by the time the request was serviced, it had already
- * expired
- */
-public class RequestExpiredException extends Exception {
-
-    private static final long serialVersionUID = -7037025330562827852L;
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
deleted file mode 100644
index e6a0fe7..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class UnknownPortException extends Exception {
-
-    private static final long serialVersionUID = -2790940982005516375L;
-
-    public UnknownPortException(final String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
deleted file mode 100644
index 32274eb..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.VersionedRemoteResource;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.PortNotRunningException;
-import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.exception.UnknownPortException;
-
-public interface ClientProtocol extends VersionedRemoteResource {
-
-    void handshake(Peer peer) throws IOException, HandshakeException, 
UnknownPortException, PortNotRunningException;
-
-    Set<PeerStatus> getPeerStatuses(Peer currentPeer) throws IOException, 
ProtocolException;
-
-    FlowFileCodec negotiateCodec(Peer peer) throws IOException, 
ProtocolException;
-
-    void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession 
session, FlowFileCodec codec) throws IOException, ProtocolException;
-
-    void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession 
session, FlowFileCodec codec) throws IOException, ProtocolException;
-
-    void shutdown(Peer peer) throws IOException, ProtocolException;
-
-    boolean isReadyForFileTransfer();
-
-    /**
-     * returns <code>true</code> if remote instance indicates that the port is
-     * invalid
-     *
-     * @return
-     * @throws IllegalStateException if a handshake has not successfully
-     * completed
-     */
-    boolean isPortInvalid() throws IllegalStateException;
-
-    /**
-     * returns <code>true</code> if remote instance indicates that the port is
-     * unknown
-     *
-     * @return
-     * @throws IllegalStateException if a handshake has not successfully
-     * completed
-     */
-    boolean isPortUnknown();
-
-    /**
-     * returns <code>true</code> if remote instance indicates that the port's
-     * destination is full
-     *
-     * @return
-     * @throws IllegalStateException if a handshake has not successfully
-     * completed
-     */
-    boolean isDestinationFull();
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
deleted file mode 100644
index d2e2946..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public interface CommunicationsInput {
-
-    InputStream getInputStream() throws IOException;
-
-    long getBytesRead();
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
deleted file mode 100644
index 95cab29..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-public interface CommunicationsOutput {
-
-    OutputStream getOutputStream() throws IOException;
-
-    long getBytesWritten();
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
deleted file mode 100644
index d009cec..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public interface CommunicationsSession extends Closeable {
-
-    public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 
'F', (byte) 'i'};
-
-    CommunicationsInput getInput();
-
-    CommunicationsOutput getOutput();
-
-    void setTimeout(int millis) throws IOException;
-
-    int getTimeout() throws IOException;
-
-    void setUri(String uri);
-
-    String getUri();
-
-    String getUserDn();
-
-    void setUserDn(String dn);
-
-    boolean isDataAvailable();
-
-    long getBytesWritten();
-
-    long getBytesRead();
-
-    /**
-     * Asynchronously interrupts this FlowFileCodec. Implementations must 
ensure
-     * that they stop sending and receiving data as soon as possible after this
-     * method has been called, even if doing so results in sending only partial
-     * data to the peer. This will usually result in the peer throwing a
-     * SocketTimeoutException.
-     */
-    void interrupt();
-
-    /**
-     * Returns <code>true</code> if the connection is closed, 
<code>false</code>
-     * otherwise.
-     *
-     * @return
-     */
-    boolean isClosed();
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
deleted file mode 100644
index 41334fe..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public enum RequestType {
-
-    NEGOTIATE_FLOWFILE_CODEC,
-    REQUEST_PEER_LIST,
-    SEND_FLOWFILES,
-    RECEIVE_FLOWFILES,
-    SHUTDOWN;
-
-    public void writeRequestType(final DataOutputStream dos) throws 
IOException {
-        dos.writeUTF(name());
-    }
-
-    public static RequestType readRequestType(final DataInputStream dis) 
throws IOException {
-        final String requestTypeVal = dis.readUTF();
-        try {
-            return RequestType.valueOf(requestTypeVal);
-        } catch (final Exception e) {
-            throw new IOException("Could not determine RequestType: received 
invalid value " + requestTypeVal);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
index 0d18f2e..0118534 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
@@ -18,13 +18,13 @@ package org.apache.nifi.remote.protocol;
 
 import java.io.IOException;
 
-import org.apache.nifi.cluster.NodeInformant;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.cluster.NodeInformant;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.BadRequestException;
 import org.apache.nifi.remote.exception.HandshakeException;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
 
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 2c1b085..bfa3d25 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -18,21 +18,10 @@ package org.apache.nifi.remote;
 
 import static java.util.Objects.requireNonNull;
 
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -49,11 +38,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
 
 import javax.net.ssl.SSLContext;
-import javax.security.cert.CertificateExpiredException;
-import javax.security.cert.CertificateNotYetValidException;
 import javax.ws.rs.core.Response;
 
 import org.apache.nifi.connectable.ConnectableType;
@@ -72,16 +58,7 @@ import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.remote.exception.BadRequestException;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.PortNotRunningException;
-import org.apache.nifi.remote.exception.UnknownPortException;
-import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import 
org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.FormatUtils;
@@ -156,9 +133,7 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
 
     private volatile String authorizationIssue;
 
-    private volatile PeerStatusCache peerStatusCache;
-    private static final long PEER_CACHE_MILLIS = 
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
-
+    private final EndpointConnectionStatePool endpointConnectionPool;
     private final ScheduledExecutorService backgroundThreadExecutor;
 
     public StandardRemoteProcessGroup(final String id, final String targetUri, 
final ProcessGroup processGroup,
@@ -218,45 +193,23 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
             }
         };
 
-        try {
-            final File peersFile = getPeerPersistenceFile();
-            this.peerStatusCache = new 
PeerStatusCache(recoverPersistedPeerStatuses(peersFile), 
peersFile.lastModified());
-        } catch (final IOException e) {
-            logger.error("{} Failed to recover persisted Peer Statuses due to 
{}", this, e);
-        }
+        endpointConnectionPool = new EndpointConnectionStatePool(sslContext, 
eventReporter, getPeerPersistenceFile());
 
         final Runnable refreshPeers = new Runnable() {
             @Override
             public void run() {
-                final PeerStatusCache existingCache = peerStatusCache;
-                if (existingCache != null && (existingCache.getTimestamp() + 
PEER_CACHE_MILLIS > System.currentTimeMillis())) {
-                    return;
-                }
-
-                Set<RemoteGroupPort> ports = getInputPorts();
-                if (ports.isEmpty()) {
-                    ports = getOutputPorts();
-                }
-                
-                if (ports.isEmpty()){
-                    return;
-                }
-
-                // it doesn't really matter which port we use. Since we are 
just getting the Peer Status,
-                // if the server indicates that the port cannot receive data 
for whatever reason, we will
-                // simply ignore the error.
-                final RemoteGroupPort port = ports.iterator().next();
-
-                try {
-                    final Set<PeerStatus> statuses = 
fetchRemotePeerStatuses(port);
-                    peerStatusCache = new PeerStatusCache(statuses);
-                    logger.info("{} Successfully refreshed Peer Status; remote 
instance consists of {} peers", StandardRemoteProcessGroup.this, 
statuses.size());
-                } catch (Exception e) {
-                    logger.warn("{} Unable to refresh Remote Group's peers due 
to {}", StandardRemoteProcessGroup.this, e);
-                    if (logger.isDebugEnabled()) {
-                        logger.warn("", e);
-                    }
-                }
+               final boolean secure;
+               try {
+                       secure = isSecure();
+                               } catch (CommunicationsException e) {
+                                       logger.warn("{} Unable to determine if 
remote instance {} is configured for secure site-to-site due to {}; will not 
refresh list of peers", new Object[] {this, getTargetUri(), e.toString()});
+                                       if ( logger.isDebugEnabled() ) {
+                                               logger.warn("", e);
+                                       }
+                                       return;
+                               }
+               
+               endpointConnectionPool.refreshPeers(getTargetUri(), secure);
             }
         };
 
@@ -1255,52 +1208,6 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
         }
     }
 
-    @Override
-    public CommunicationsSession establishSiteToSiteConnection() throws 
IOException {
-        final URI uri = apiUri;
-        final String destinationUri = uri.toString();
-        CommunicationsSession commsSession = null;
-        try {
-            if (isSecure()) {
-                if (sslContext == null) {
-                    throw new IOException("Unable to communicate with " + 
getTargetUri() + " because it requires Secure Site-to-Site communications, but 
this instance is not configured for secure communications");
-                }
-
-                final Integer listeningPort = getListeningPort();
-                if (listeningPort == null) {
-                    throw new IOException("Remote instance is not configured 
to allow incoming Site-to-Site connections");
-                }
-
-                final SSLSocketChannel socketChannel = new 
SSLSocketChannel(sslContext, uri.getHost(), listeningPort, true);
-                socketChannel.connect();
-                commsSession = new 
SSLSocketChannelCommunicationsSession(socketChannel, destinationUri);
-
-                try {
-                    commsSession.setUserDn(socketChannel.getDn());
-                } catch (final CertificateNotYetValidException | 
CertificateExpiredException ex) {
-                    throw new IOException(ex);
-                }
-            } else {
-                final SocketChannel socketChannel = SocketChannel.open(new 
InetSocketAddress(uri.getHost(), getListeningPort()));
-
-                commsSession = new 
SocketChannelCommunicationsSession(socketChannel, destinationUri);
-            }
-
-            
commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES);
-
-            commsSession.setUri("nifi://" + uri.getHost() + ":" + 
uri.getPort());
-        } catch (final IOException e) {
-            if (commsSession != null) {
-                try {
-                    commsSession.close();
-                } catch (final IOException ignore) {
-                }
-            }
-
-            throw e;
-        }
-        return commsSession;
-    }
 
     @Override
     public EventReporter getEventReporter() {
@@ -1489,133 +1396,13 @@ public class StandardRemoteProcessGroup implements 
RemoteProcessGroup {
 
     @Override
     public Set<PeerStatus> getPeerStatuses() {
-        final PeerStatusCache cache = this.peerStatusCache;
-        if (cache == null || cache.getStatuses() == null || 
cache.getStatuses().isEmpty()) {
-            return null;
-        }
-
-        if (cache.getTimestamp() + PEER_CACHE_MILLIS < 
System.currentTimeMillis()) {
-            final Set<PeerStatus> equalizedSet = new 
HashSet<>(cache.getStatuses().size());
-            for (final PeerStatus status : cache.getStatuses()) {
-                final PeerStatus equalizedStatus = new 
PeerStatus(status.getHostname(), status.getPort(), status.isSecure(), 1);
-                equalizedSet.add(equalizedStatus);
-            }
-
-            return equalizedSet;
-        }
-
-        return cache.getStatuses();
+       return endpointConnectionPool.getPeerStatuses();
     }
 
-    private Set<PeerStatus> fetchRemotePeerStatuses(final RemoteGroupPort 
port) throws IOException, HandshakeException, UnknownPortException, 
PortNotRunningException, BadRequestException {
-        final CommunicationsSession commsSession = 
establishSiteToSiteConnection();
-        final Peer peer = new Peer(commsSession, "nifi://" + 
getTargetUri().getHost() + ":" + getListeningPort());
-        final SocketClientProtocol clientProtocol = new SocketClientProtocol();
-        clientProtocol.setPort(port);
-        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
-        try {
-            RemoteResourceFactory.initiateResourceNegotiation(clientProtocol, 
dis, dos);
-        } catch (final HandshakeException e) {
-            throw new BadRequestException(e.toString());
-        }
-
-        clientProtocol.handshake(peer);
-        final Set<PeerStatus> peerStatuses = 
clientProtocol.getPeerStatuses(peer);
-        persistPeerStatuses(peerStatuses);
-
-        try {
-            clientProtocol.shutdown(peer);
-        } catch (final IOException e) {
-            final String message = String.format("%s Failed to shutdown 
protocol when updating list of peers due to %s", this, e.toString());
-            logger.warn(message);
-            if (logger.isDebugEnabled()) {
-                logger.warn("", e);
-            }
-            getEventReporter().reportEvent(Severity.WARNING, "Site to Site", 
message);
-        }
-
-        try {
-            peer.close();
-        } catch (final IOException e) {
-            final String message = String.format("%s Failed to close resources 
when updating list of peers due to %s", this, e.toString());
-            logger.warn(message);
-            if (logger.isDebugEnabled()) {
-                logger.warn("", e);
-            }
-            getEventReporter().reportEvent(Severity.WARNING, "Site to Site", 
message);
-        }
-
-        return peerStatuses;
-    }
 
     private File getPeerPersistenceFile() {
         final File stateDir = 
NiFiProperties.getInstance().getPersistentStateDirectory();
         return new File(stateDir, getIdentifier() + ".peers");
     }
 
-    private void persistPeerStatuses(final Set<PeerStatus> statuses) {
-        final File peersFile = getPeerPersistenceFile();
-        try (final OutputStream fos = new FileOutputStream(peersFile);
-                final OutputStream out = new BufferedOutputStream(fos)) {
-
-            for (final PeerStatus status : statuses) {
-                final String line = status.getHostname() + ":" + 
status.getPort() + ":" + status.isSecure() + "\n";
-                out.write(line.getBytes(StandardCharsets.UTF_8));
-            }
-
-        } catch (final IOException e) {
-            logger.error("Failed to persist list of Peers due to {}; if 
restarted and peer's NCM is down, may be unable to transfer data until 
communications with NCM are restored", e.toString(), e);
-        }
-    }
-
-    private Set<PeerStatus> recoverPersistedPeerStatuses(final File file) 
throws IOException {
-        if (!file.exists()) {
-            return null;
-        }
-
-        final Set<PeerStatus> statuses = new HashSet<>();
-        try (final InputStream fis = new FileInputStream(file);
-                final BufferedReader reader = new BufferedReader(new 
InputStreamReader(fis))) {
-
-            String line;
-            while ((line = reader.readLine()) != null) {
-                final String[] splits = line.split(Pattern.quote(":"));
-                if (splits.length != 3) {
-                    continue;
-                }
-
-                final String hostname = splits[0];
-                final int port = Integer.parseInt(splits[1]);
-                final boolean secure = Boolean.parseBoolean(splits[2]);
-
-                statuses.add(new PeerStatus(hostname, port, secure, 1));
-            }
-        }
-
-        return statuses;
-    }
-
-    private static class PeerStatusCache {
-
-        private final Set<PeerStatus> statuses;
-        private final long timestamp;
-
-        public PeerStatusCache(final Set<PeerStatus> statuses) {
-            this(statuses, System.currentTimeMillis());
-        }
-
-        public PeerStatusCache(final Set<PeerStatus> statuses, final long 
timestamp) {
-            this.statuses = statuses;
-            this.timestamp = timestamp;
-        }
-
-        public Set<PeerStatus> getStatuses() {
-            return statuses;
-        }
-
-        public long getTimestamp() {
-            return timestamp;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/pom.xml 
b/nifi/nar-bundles/framework-bundle/framework/site-to-site/pom.xml
index 1c8d2c0..a7909a3 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/pom.xml
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/pom.xml
@@ -62,6 +62,10 @@
             <artifactId>nifi-utils</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>site-to-site-client</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
 
b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
deleted file mode 100644
index 4babb92..0000000
--- 
a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-
-public abstract class AbstractCommunicationsSession implements 
CommunicationsSession {
-    private String userDn;
-    
-    private volatile String uri;
-    
-    public AbstractCommunicationsSession(final String uri) {
-        this.uri = uri;
-    }
-    
-    @Override
-    public String toString() {
-        return uri;
-    }
-
-    @Override
-    public void setUri(final String uri) {
-        this.uri = uri;
-    }
-
-    @Override
-    public String getUri() {
-        return uri;
-    }
-
-    @Override
-    public String getUserDn() {
-        return userDn;
-    }
-    
-    @Override
-    public void setUserDn(final String dn) {
-        this.userDn = dn;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
 
b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
index 49d3c3c..922d4e7 100644
--- 
a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
+++ 
b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
@@ -26,48 +26,8 @@ import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.protocol.ClientProtocol;
 import org.apache.nifi.remote.protocol.ServerProtocol;
 
-public class RemoteResourceFactory {
+public class RemoteResourceFactory extends RemoteResourceInitiator {
 
-       public static final int RESOURCE_OK = 20;
-       public static final int DIFFERENT_RESOURCE_VERSION = 21;
-       public static final int ABORT = 255;
-
-       
-       public static VersionedRemoteResource initiateResourceNegotiation(final 
VersionedRemoteResource resource, final DataInputStream dis, final 
DataOutputStream dos) throws IOException, HandshakeException {
-        // Write the classname of the RemoteStreamCodec, followed by its 
version
-       dos.writeUTF(resource.getResourceName());
-       final VersionNegotiator negotiator = resource.getVersionNegotiator();
-       dos.writeInt(negotiator.getVersion());
-       dos.flush();
-        
-        // wait for response from server.
-        final int statusCode = dis.read();
-        switch (statusCode) {
-            case RESOURCE_OK:  // server accepted our proposal of codec 
name/version
-                return resource;
-            case DIFFERENT_RESOURCE_VERSION:   // server accepted our proposal 
of codec name but not the version
-                // Get server's preferred version
-               final int newVersion = dis.readInt();
-                
-                // Determine our new preferred version that is no greater than 
the server's preferred version.
-                final Integer newPreference = 
negotiator.getPreferredVersion(newVersion);
-                // If we could not agree with server on a version, fail now.
-                if ( newPreference == null ) {
-                    throw new HandshakeException("Could not agree on version 
for " + resource);
-                }
-                
-                negotiator.setVersion(newPreference);
-                
-                // Attempt negotiation of resource based on our new preferred 
version.
-                return initiateResourceNegotiation(resource, dis, dos);
-            case ABORT:
-               throw new HandshakeException("Remote destination aborted 
connection with message: " + dis.readUTF());
-            default:
-                return null;   // Unable to negotiate codec
-        }
-       }
-
-       
        @SuppressWarnings("unchecked")
     public static <T extends FlowFileCodec> T receiveCodecNegotiation(final 
DataInputStream dis, final DataOutputStream dos) throws IOException, 
HandshakeException {
         final String codecName = dis.readUTF();

Reply via email to