http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
new file mode 100644
index 0000000..dc3d68f
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.remote.protocol.CommunicationsOutput;
+
+public class SSLSocketChannelOutput implements CommunicationsOutput {
+    private final OutputStream out;
+    private final ByteCountingOutputStream countingOut;
+    
+    public SSLSocketChannelOutput(final SSLSocketChannel channel) {
+        countingOut = new ByteCountingOutputStream(new 
SSLSocketChannelOutputStream(channel));
+        out = new BufferedOutputStream(countingOut);
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+        return out;
+    }
+    
+    @Override
+    public long getBytesWritten() {
+        return countingOut.getBytesWritten();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
new file mode 100644
index 0000000..befbdaa
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+
+public interface ClientProtocol extends VersionedRemoteResource {
+
+    void handshake(Peer peer) throws IOException, HandshakeException, 
UnknownPortException, PortNotRunningException;
+
+    Set<PeerStatus> getPeerStatuses(Peer currentPeer) throws IOException, 
ProtocolException;
+
+    FlowFileCodec negotiateCodec(Peer peer) throws IOException, 
ProtocolException;
+
+    void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession 
session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+    void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession 
session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+    void shutdown(Peer peer) throws IOException, ProtocolException;
+
+    boolean isReadyForFileTransfer();
+
+    
+    
+    
+    Transaction startTransaction(Peer peer, FlowFileCodec codec, 
TransferDirection direction) throws IOException;
+    
+    
+    /**
+     * returns <code>true</code> if remote instance indicates that the port is
+     * invalid
+     *
+     * @return
+     * @throws IllegalStateException if a handshake has not successfully
+     * completed
+     */
+    boolean isPortInvalid() throws IllegalStateException;
+
+    /**
+     * returns <code>true</code> if remote instance indicates that the port is
+     * unknown
+     *
+     * @return
+     * @throws IllegalStateException if a handshake has not successfully
+     * completed
+     */
+    boolean isPortUnknown();
+
+    /**
+     * returns <code>true</code> if remote instance indicates that the port's
+     * destination is full
+     *
+     * @return
+     * @throws IllegalStateException if a handshake has not successfully
+     * completed
+     */
+    boolean isDestinationFull();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
new file mode 100644
index 0000000..d2e2946
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface CommunicationsInput {
+
+    InputStream getInputStream() throws IOException;
+
+    long getBytesRead();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
new file mode 100644
index 0000000..95cab29
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface CommunicationsOutput {
+
+    OutputStream getOutputStream() throws IOException;
+
+    long getBytesWritten();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
new file mode 100644
index 0000000..d009cec
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface CommunicationsSession extends Closeable {
+
+    public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 
'F', (byte) 'i'};
+
+    CommunicationsInput getInput();
+
+    CommunicationsOutput getOutput();
+
+    void setTimeout(int millis) throws IOException;
+
+    int getTimeout() throws IOException;
+
+    void setUri(String uri);
+
+    String getUri();
+
+    String getUserDn();
+
+    void setUserDn(String dn);
+
+    boolean isDataAvailable();
+
+    long getBytesWritten();
+
+    long getBytesRead();
+
+    /**
+     * Asynchronously interrupts this FlowFileCodec. Implementations must 
ensure
+     * that they stop sending and receiving data as soon as possible after this
+     * method has been called, even if doing so results in sending only partial
+     * data to the peer. This will usually result in the peer throwing a
+     * SocketTimeoutException.
+     */
+    void interrupt();
+
+    /**
+     * Returns <code>true</code> if the connection is closed, 
<code>false</code>
+     * otherwise.
+     *
+     * @return
+     */
+    boolean isClosed();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
new file mode 100644
index 0000000..f4fa4d0
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.InputStream;
+import java.util.Map;
+
+public interface DataPacket {
+
+       Map<String, String> getAttributes();
+       
+       InputStream getData();
+       
+       long getSize();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
new file mode 100644
index 0000000..41334fe
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public enum RequestType {
+
+    NEGOTIATE_FLOWFILE_CODEC,
+    REQUEST_PEER_LIST,
+    SEND_FLOWFILES,
+    RECEIVE_FLOWFILES,
+    SHUTDOWN;
+
+    public void writeRequestType(final DataOutputStream dos) throws 
IOException {
+        dos.writeUTF(name());
+    }
+
+    public static RequestType readRequestType(final DataInputStream dis) 
throws IOException {
+        final String requestTypeVal = dis.readUTF();
+        try {
+            return RequestType.valueOf(requestTypeVal);
+        } catch (final Exception e) {
+            throw new IOException("Could not determine RequestType: received 
invalid value " + requestTypeVal);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
new file mode 100644
index 0000000..41dc276
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+
+/**
+ * Enumeration of Properties that can be used for the Site-to-Site Socket 
Protocol.
+ */
+public enum HandshakeProperty {
+    /**
+     * Boolean value indicating whether or not the contents of a FlowFile 
should be
+     * GZipped when transferred.
+     */
+    GZIP,
+    
+    /**
+     * The unique identifier of the port to communicate with
+     */
+    PORT_IDENTIFIER,
+    
+    /**
+     * Indicates the number of milliseconds after the request was made that 
the client
+     * will wait for a response. If no response has been received by the time 
this value
+     * expires, the server can move on without attempting to service the 
request because
+     * the client will have already disconnected.
+     */
+    REQUEST_EXPIRATION_MILLIS,
+    
+    /**
+     * The preferred number of FlowFiles that the server should send to the 
client
+     * when pulling data. This property was introduced in version 5 of the 
protocol.
+     */
+    BATCH_COUNT,
+    
+    /**
+     * The preferred number of bytes that the server should send to the client 
when
+     * pulling data. This property was introduced in version 5 of the protocol.
+     */
+    BATCH_SIZE,
+    
+    /**
+     * The preferred amount of time that the server should send data to the 
client
+     * when pulling data. This property was introduced in version 5 of the 
protocol.
+     * Value is in milliseconds.
+     */
+    BATCH_DURATION;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
new file mode 100644
index 0000000..eae1940
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.nifi.remote.exception.ProtocolException;
+
+public class Response {
+    private final ResponseCode code;
+    private final String message;
+    
+    private Response(final ResponseCode code, final String explanation) {
+        this.code = code;
+        this.message = explanation;
+    }
+    
+    public ResponseCode getCode() {
+        return code;
+    }
+    
+    public String getMessage() {
+        return message;
+    }
+    
+    public static Response read(final DataInputStream in) throws IOException, 
ProtocolException {
+        final ResponseCode code = ResponseCode.readCode(in);
+        final String message = code.containsMessage() ? in.readUTF() : null;
+        return new Response(code, message);
+    }
+    
+    @Override
+    public String toString() {
+        return code + ": " + message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
new file mode 100644
index 0000000..8860e73
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.remote.exception.ProtocolException;
+
+
+public enum ResponseCode {
+    RESERVED(0, "Reserved for Future Use", false), // This will likely be used 
if we ever need to expand the length of
+                                            // ResponseCode, so that we can 
indicate a 0 followed by some other bytes
+    
+    // handshaking properties
+    PROPERTIES_OK(1, "Properties OK", false),
+    UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true),
+    ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true),
+    MISSING_PROPERTY(232, "Missing Property", true),
+    
+    // transaction indicators
+    CONTINUE_TRANSACTION(10, "Continue Transaction", false),
+    FINISH_TRANSACTION(11, "Finish Transaction", false),
+    CONFIRM_TRANSACTION(12, "Confirm Transaction", true),   // "Explanation" 
of this code is the checksum
+    TRANSACTION_FINISHED(13, "Transaction Finished", false),
+    TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But 
Destination is Full", false),
+    CANCEL_TRANSACTION(15, "Cancel Transaction", true),
+    BAD_CHECKSUM(19, "Bad Checksum", false),
+
+    // data availability indicators
+    MORE_DATA(20, "More Data Exists", false),
+    NO_MORE_DATA(21, "No More Data Exists", false),
+    
+    // port state indicators
+    UNKNOWN_PORT(200, "Unknown Port", false),
+    PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true),
+    PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false),
+    
+    // authorization
+    UNAUTHORIZED(240, "User Not Authorized", true),
+    
+    // error indicators
+    ABORT(250, "Abort", true),
+    UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false),
+    END_OF_STREAM(255, "End of Stream", false);
+    
+    private static final ResponseCode[] codeArray = new ResponseCode[256];
+    
+    static {
+        for ( final ResponseCode responseCode : ResponseCode.values() ) {
+            codeArray[responseCode.getCode()] = responseCode;
+        }
+    }
+    
+    private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R';
+    private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C';
+    private final int code;
+    private final byte[] codeSequence;
+    private final String description;
+    private final boolean containsMessage;
+    
+    private ResponseCode(final int code, final String description, final 
boolean containsMessage) {
+        this.codeSequence = new byte[] {CODE_SEQUENCE_VALUE_1, 
CODE_SEQUENCE_VALUE_2, (byte) code};
+        this.code = code;
+        this.description = description;
+        this.containsMessage = containsMessage;
+    }
+    
+    public int getCode() {
+        return code;
+    }
+    
+    public byte[] getCodeSequence() {
+        return codeSequence;
+    }
+    
+    @Override
+    public String toString() {
+        return description;
+    }
+    
+    public boolean containsMessage() {
+        return containsMessage;
+    }
+    
+    public void writeResponse(final DataOutputStream out) throws IOException {
+        if ( containsMessage() ) {
+            throw new IllegalArgumentException("ResponseCode " + code + " 
expects an explanation");
+        }
+        
+        out.write(getCodeSequence());
+        out.flush();
+    }
+    
+    public void writeResponse(final DataOutputStream out, final String 
explanation) throws IOException {
+        if ( !containsMessage() ) {
+            throw new IllegalArgumentException("ResponseCode " + code + " does 
not expect an explanation");
+        }
+        
+        out.write(getCodeSequence());
+        out.writeUTF(explanation);
+        out.flush();
+    }
+    
+    static ResponseCode readCode(final InputStream in) throws IOException, 
ProtocolException {
+        final int byte1 = in.read();
+        if ( byte1 < 0 ) {
+            throw new EOFException();
+        } else if ( byte1 != CODE_SEQUENCE_VALUE_1 ) {
+            throw new ProtocolException("Expected to receive ResponseCode, but 
the stream did not have a ResponseCode");
+        }
+        
+        final int byte2 = in.read();
+        if ( byte2 < 0 ) {
+            throw new EOFException();
+        } else if ( byte2 != CODE_SEQUENCE_VALUE_2 ) {
+            throw new ProtocolException("Expected to receive ResponseCode, but 
the stream did not have a ResponseCode");
+        }
+
+        final int byte3 = in.read();
+        if ( byte3 < 0 ) {
+            throw new EOFException();
+        }
+        
+        final ResponseCode responseCode = codeArray[byte3];
+        if (responseCode == null) {
+            throw new ProtocolException("Received Response Code of " + byte3 + 
" but do not recognize this code");
+        }
+        return responseCode;
+    }
+    
+    public static ResponseCode fromSequence(final byte[] value) {
+        final int code = value[3] & 0xFF;
+        final ResponseCode responseCode = codeArray[code];
+        return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : 
responseCode;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
new file mode 100644
index 0000000..5f194f8
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.RemoteResourceInitiator;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.codec.StandardFlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.ClientProtocol;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SocketClientProtocol implements ClientProtocol {
+    private final VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(5, 4, 3, 2, 1);
+
+    private RemoteDestination destination;
+    private boolean useCompression = false;
+    
+    private String commsIdentifier;
+    private boolean handshakeComplete = false;
+    
+    private final Logger logger = 
LoggerFactory.getLogger(SocketClientProtocol.class);
+    
+    private Response handshakeResponse = null;
+    private boolean readyForFileTransfer = false;
+    private String transitUriPrefix = null;
+    private int timeoutMillis = 30000;
+    
+    private int batchCount;
+    private long batchSize;
+    private long batchMillis;
+
+    private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); 
// send batches of up to 5 seconds
+    
+    public SocketClientProtocol() {
+    }
+
+    public void setPreferredBatchCount(final int count) {
+        this.batchCount = count;
+    }
+    
+    public void setPreferredBatchSize(final long bytes) {
+        this.batchSize = bytes;
+    }
+    
+    public void setPreferredBatchDuration(final long millis) {
+        this.batchMillis = millis;
+    }
+    
+    public void setDestination(final RemoteDestination destination) {
+        this.destination = destination;
+        this.useCompression = destination.isUseCompression();
+    }
+    
+    public void setTimeout(final int timeoutMillis) {
+       this.timeoutMillis = timeoutMillis;
+    }
+    
+    @Override
+    public void handshake(final Peer peer) throws IOException, 
HandshakeException {
+       handshake(peer, destination.getIdentifier());
+    }
+    
+    public void handshake(final Peer peer, final String destinationId) throws 
IOException, HandshakeException {
+        if ( handshakeComplete ) {
+            throw new IllegalStateException("Handshake has already been 
completed");
+        }
+        commsIdentifier = UUID.randomUUID().toString();
+        logger.debug("{} handshaking with {}", this, peer);
+        
+        final Map<HandshakeProperty, String> properties = new HashMap<>();
+        properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression));
+        
+        if ( destinationId != null ) {
+               properties.put(HandshakeProperty.PORT_IDENTIFIER, 
destination.getIdentifier());
+        }
+        
+        properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, 
String.valueOf(timeoutMillis) );
+        
+        if ( versionNegotiator.getVersion() >= 5 ) {
+            if ( batchCount > 0 ) {
+                properties.put(HandshakeProperty.BATCH_COUNT, 
String.valueOf(batchCount));
+            }
+            if ( batchSize > 0L ) {
+                properties.put(HandshakeProperty.BATCH_SIZE, 
String.valueOf(batchSize));
+            }
+            if ( batchMillis > 0L ) {
+                properties.put(HandshakeProperty.BATCH_DURATION, 
String.valueOf(batchMillis));
+            }
+        }
+        
+        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
+        commsSession.setTimeout(timeoutMillis);
+        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
+        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
+        
+        dos.writeUTF(commsIdentifier);
+        
+        if ( versionNegotiator.getVersion() >= 3 ) {
+            dos.writeUTF(peer.getUrl());
+            transitUriPrefix = peer.getUrl();
+            
+            if ( !transitUriPrefix.endsWith("/") ) {
+                transitUriPrefix = transitUriPrefix + "/";
+            }
+        }
+        
+        dos.writeInt(properties.size());
+        for ( final Map.Entry<HandshakeProperty, String> entry : 
properties.entrySet() ) {
+            dos.writeUTF(entry.getKey().name());
+            dos.writeUTF(entry.getValue());
+        }
+        
+        dos.flush();
+        
+        try {
+            handshakeResponse = Response.read(dis);
+        } catch (final ProtocolException e) {
+            throw new HandshakeException(e);
+        }
+        
+        switch (handshakeResponse.getCode()) {
+            case PORT_NOT_IN_VALID_STATE:
+            case UNKNOWN_PORT:
+            case PORTS_DESTINATION_FULL:
+                break;
+            case PROPERTIES_OK:
+                readyForFileTransfer = true;
+                break;
+            default:
+                logger.error("{} received unexpected response {} from {} when 
negotiating Codec", new Object[] {
+                    this, handshakeResponse, peer});
+                peer.close();
+                throw new HandshakeException("Received unexpected response " + 
handshakeResponse);
+        }
+        
+        logger.debug("{} Finished handshake with {}", this, peer);
+        handshakeComplete = true;
+    }
+    
+    public boolean isReadyForFileTransfer() {
+        return readyForFileTransfer;
+    }
+    
+    public boolean isPortInvalid() {
+        if ( !handshakeComplete ) {
+            throw new IllegalStateException("Handshake has not completed 
successfully");
+        }
+        return handshakeResponse.getCode() == 
ResponseCode.PORT_NOT_IN_VALID_STATE;
+    }
+    
+    public boolean isPortUnknown() {
+        if ( !handshakeComplete ) {
+            throw new IllegalStateException("Handshake has not completed 
successfully");
+        }
+        return handshakeResponse.getCode() == ResponseCode.UNKNOWN_PORT;
+    }
+    
+    public boolean isDestinationFull() {
+        if ( !handshakeComplete ) {
+            throw new IllegalStateException("Handshake has not completed 
successfully");
+        }
+        return handshakeResponse.getCode() == 
ResponseCode.PORTS_DESTINATION_FULL;
+    }
+    
+    @Override
+    public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException 
{
+        if ( !handshakeComplete ) {
+            throw new IllegalStateException("Handshake has not been 
performed");
+        }
+        
+        logger.debug("{} Get Peer Statuses from {}", this, peer);
+        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
+        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
+        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
+        
+        RequestType.REQUEST_PEER_LIST.writeRequestType(dos);
+        dos.flush();
+        final int numPeers = dis.readInt();
+        final Set<PeerStatus> peers = new HashSet<>(numPeers);
+        for (int i=0; i < numPeers; i++) {
+            final String hostname = dis.readUTF();
+            final int port = dis.readInt();
+            final boolean secure = dis.readBoolean();
+            final int flowFileCount = dis.readInt();
+            peers.add(new PeerStatus(hostname, port, secure, flowFileCount));
+        }
+        
+        logger.debug("{} Received {} Peer Statuses from {}", this, 
peers.size(), peer);
+        return peers;
+    }
+    
+    @Override
+    public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, 
ProtocolException {
+        if ( !handshakeComplete ) {
+            throw new IllegalStateException("Handshake has not been 
performed");
+        }
+
+        logger.debug("{} Negotiating Codec with {}", this, peer);
+        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
+        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
+        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
+
+        RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos);
+        
+        FlowFileCodec codec = new StandardFlowFileCodec();
+        try {
+            codec = (FlowFileCodec) 
RemoteResourceInitiator.initiateResourceNegotiation(codec, dis, dos);
+        } catch (HandshakeException e) {
+            throw new ProtocolException(e.toString());
+        }
+        logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] 
{this, codec, commsSession});
+
+        return codec;
+    }
+
+
+    @Override
+    public Transaction startTransaction(final Peer peer, final FlowFileCodec 
codec, final TransferDirection direction) throws IOException, ProtocolException 
{
+        if ( !handshakeComplete ) {
+            throw new IllegalStateException("Handshake has not been 
performed");
+        }
+        if ( !readyForFileTransfer ) {
+            throw new IllegalStateException("Cannot start transaction; 
handshake resolution was " + handshakeResponse);
+        }
+        
+        return new SocketClientTransaction(versionNegotiator.getVersion(), 
peer, codec, 
+                       direction, useCompression, (int) 
destination.getYieldPeriod(TimeUnit.MILLISECONDS));
+    }
+
+
+    @Override
+    public void receiveFlowFiles(final Peer peer, final ProcessContext 
context, final ProcessSession session, final FlowFileCodec codec) throws 
IOException, ProtocolException {
+       final String userDn = peer.getCommunicationsSession().getUserDn();
+       final Transaction transaction = startTransaction(peer, codec, 
TransferDirection.RECEIVE);
+       
+       final StopWatch stopWatch = new StopWatch(true);
+       final Set<FlowFile> flowFilesReceived = new HashSet<>();
+       long bytesReceived = 0L;
+       
+       while (true) {
+               final long start = System.nanoTime();
+               final DataPacket dataPacket = transaction.receive();
+               if ( dataPacket == null ) {
+                   if ( flowFilesReceived.isEmpty() ) {
+                       
peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
+                   }
+                       break;
+               }
+               
+               FlowFile flowFile = session.create();
+               flowFile = session.putAllAttributes(flowFile, 
dataPacket.getAttributes());
+               flowFile = session.importFrom(dataPacket.getData(), flowFile);
+               final long receiveNanos = System.nanoTime() - start;
+               
+                       String sourceFlowFileIdentifier = 
dataPacket.getAttributes().get(CoreAttributes.UUID.key());
+                       if ( sourceFlowFileIdentifier == null ) {
+                               sourceFlowFileIdentifier = "<Unknown 
Identifier>";
+                       }
+                       
+                       final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
+                       session.getProvenanceReporter().receive(flowFile, 
transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + 
peer.getHost() + ", Remote DN=" + userDn, 
TimeUnit.NANOSECONDS.toMillis(receiveNanos));
+
+               session.transfer(flowFile, Relationship.ANONYMOUS);
+               bytesReceived += dataPacket.getSize();
+       }
+
+       // Confirm that what we received was the correct data.
+       transaction.confirm();
+       
+               // Commit the session so that we have persisted the data
+               session.commit();
+
+               // We want to apply backpressure if the outgoing connections 
are full. I.e., there are no available relationships.
+               final boolean applyBackpressure = 
context.getAvailableRelationships().isEmpty();
+
+               transaction.complete(applyBackpressure);
+               logger.debug("{} Sending 
TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+
+               if ( flowFilesReceived.isEmpty() ) {
+                   return;
+               }
+               
+               stopWatch.stop();
+               final String flowFileDescription = flowFilesReceived.size() < 
20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+               final String uploadDataRate = 
stopWatch.calculateDataRate(bytesReceived);
+               final long uploadMillis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
+               final String dataSize = 
FormatUtils.formatDataSize(bytesReceived);
+               logger.info("{} Successfully receveied {} ({}) from {} in {} 
milliseconds at a rate of {}", new Object[] { 
+                               this, flowFileDescription, dataSize, peer, 
uploadMillis, uploadDataRate });
+    }
+
+    
+    @Override
+    public void transferFlowFiles(final Peer peer, final ProcessContext 
context, final ProcessSession session, final FlowFileCodec codec) throws 
IOException, ProtocolException {
+               FlowFile flowFile = session.get();
+               if (flowFile == null) {
+                       return;
+               }
+
+               try {
+                       final String userDn = 
peer.getCommunicationsSession().getUserDn();
+                       final long startSendingNanos = System.nanoTime();
+                       final StopWatch stopWatch = new StopWatch(true);
+                       long bytesSent = 0L;
+                       
+                       final Transaction transaction = startTransaction(peer, 
codec, TransferDirection.SEND);
+                       
+                       final Set<FlowFile> flowFilesSent = new HashSet<>();
+               boolean continueTransaction = true;
+               while (continueTransaction) {
+                       final long startNanos = System.nanoTime();
+                   // call codec.encode within a session callback so that we 
have the InputStream to read the FlowFile
+                   final FlowFile toWrap = flowFile;
+                   session.read(flowFile, new InputStreamCallback() {
+                       @Override
+                       public void process(final InputStream in) throws 
IOException {
+                           final DataPacket dataPacket = new 
StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
+                           transaction.send(dataPacket);
+                       }
+                   });
+                   
+                   final long transferNanos = System.nanoTime() - startNanos;
+                   final long transferMillis = 
TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+                   
+                   flowFilesSent.add(flowFile);
+                   bytesSent += flowFile.getSize();
+                   logger.debug("{} Sent {} to {}", this, flowFile, peer);
+                   
+                   final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + 
flowFile.getAttribute(CoreAttributes.UUID.key());
+                   session.getProvenanceReporter().send(flowFile, transitUri, 
"Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, 
false);
+                   session.remove(flowFile);
+                   
+                   final long sendingNanos = System.nanoTime() - 
startSendingNanos;
+                   if ( sendingNanos < BATCH_SEND_NANOS ) { 
+                       flowFile = session.get();
+                   } else {
+                       flowFile = null;
+                   }
+                   
+                   continueTransaction = (flowFile != null);
+               }
+               
+               transaction.confirm();
+               
+               // consume input stream entirely, ignoring its contents. If we
+               // don't do this, the Connection will not be returned to the 
pool
+               stopWatch.stop();
+               final String uploadDataRate = 
stopWatch.calculateDataRate(bytesSent);
+               final long uploadMillis = 
stopWatch.getDuration(TimeUnit.MILLISECONDS);
+               final String dataSize = FormatUtils.formatDataSize(bytesSent);
+               
+               session.commit();
+               transaction.complete(false);
+               
+               final String flowFileDescription = (flowFilesSent.size() < 20) 
? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+               logger.info("{} Successfully sent {} ({}) to {} in {} 
milliseconds at a rate of {}", new Object[] {
+                   this, flowFileDescription, dataSize, peer, uploadMillis, 
uploadDataRate});
+               } catch (final Exception e) {
+                       session.rollback();
+                       throw e;
+               }
+    }
+    
+    
+    @Override
+    public VersionNegotiator getVersionNegotiator() {
+        return versionNegotiator;
+    }
+    
+    @Override
+    public void shutdown(final Peer peer) throws IOException {
+        readyForFileTransfer = false;
+        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
+        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
+        
+        logger.debug("{} Shutting down with {}", this, peer);
+        // Indicate that we would like to have some data
+        RequestType.SHUTDOWN.writeRequestType(dos);
+        dos.flush();
+    }
+
+    @Override
+    public String getResourceName() {
+        return "SocketFlowFileProtocol";
+    }
+    
+    @Override
+    public String toString() {
+        return "SocketClientProtocol[CommsID=" + commsIdentifier + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
new file mode 100644
index 0000000..edb360e
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SocketClientTransaction implements Transaction {
+       private static final Logger logger = 
LoggerFactory.getLogger(SocketClientTransaction.class);
+       
+       
+       private final CRC32 crc = new CRC32();
+       private final int protocolVersion;
+       private final FlowFileCodec codec;
+       private final DataInputStream dis;
+       private final DataOutputStream dos;
+       private final TransferDirection direction;
+       private final boolean compress;
+       private final Peer peer;
+       private final int penaltyMillis;
+       
+       private boolean dataAvailable = false;
+       private int transfers = 0;
+       private TransactionState state;
+       
+       SocketClientTransaction(final int protocolVersion, final Peer peer, 
final FlowFileCodec codec, 
+                       final TransferDirection direction, final boolean 
useCompression, final int penaltyMillis) throws IOException {
+               this.protocolVersion = protocolVersion;
+               this.peer = peer;
+               this.codec = codec;
+               this.direction = direction;
+               this.dis = new 
DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
+               this.dos = new 
DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
+               this.compress = useCompression;
+               this.state = TransactionState.TRANSACTION_STARTED;
+               this.penaltyMillis = penaltyMillis;
+               
+               initialize();
+       }
+       
+       private void initialize() throws IOException {
+           try {
+            if ( direction == TransferDirection.RECEIVE ) {
+                // Indicate that we would like to have some data
+                RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
+                dos.flush();
+                
+                final Response dataAvailableCode = Response.read(dis);
+                switch (dataAvailableCode.getCode()) {
+                    case MORE_DATA:
+                        logger.debug("{} {} Indicates that data is available", 
this, peer);
+                        this.dataAvailable = true;
+                        break;
+                    case NO_MORE_DATA:
+                        logger.debug("{} No data available from {}", peer);
+                        this.dataAvailable = false;
+                        return;
+                    default:
+                        throw new ProtocolException("Got unexpected response 
when asking for data: " + dataAvailableCode);
+                }
+    
+            } else {
+                // Indicate that we would like to have some data
+                RequestType.SEND_FLOWFILES.writeRequestType(dos);
+                dos.flush();
+            }
+           } catch (final Exception e) {
+               error();
+               throw e;
+           }
+       }
+       
+       
+       @Override
+       public DataPacket receive() throws IOException {
+           try {
+               if ( state != TransactionState.DATA_EXCHANGED && state != 
TransactionState.TRANSACTION_STARTED) {
+                       throw new IllegalStateException("Cannot receive data 
because Transaction State is " + state);
+               }
+               
+               if ( direction == TransferDirection.SEND ) {
+                   throw new IllegalStateException("Attempting to receive data 
but started a SEND Transaction");
+               }
+               
+               // if we already know there's no data, just return null
+               if ( !dataAvailable ) {
+                   return null;
+               }
+    
+               // if we have already received a packet, check if another is 
available.
+               if ( transfers > 0 ) {
+                   // Determine if Peer will send us data or has no data to 
send us
+                final Response dataAvailableCode = Response.read(dis);
+                switch (dataAvailableCode.getCode()) {
+                    case CONTINUE_TRANSACTION:
+                        logger.debug("{} {} Indicates Transaction should 
continue", this, peer);
+                        this.dataAvailable = true;
+                        break;
+                    case FINISH_TRANSACTION:
+                        logger.debug("{} {} Indicates Transaction should 
finish", peer);
+                        this.dataAvailable = false;
+                        break;
+                    default:
+                        throw new ProtocolException("Got unexpected response 
when asking for data: " + dataAvailableCode);
+                }
+            }
+               
+               // if no data available, return null
+               if ( !dataAvailable ) {
+                   return null;
+               }
+               
+            logger.debug("{} Receiving data from {}", this, peer);
+            final DataPacket packet = codec.decode(new CheckedInputStream(dis, 
crc));
+            
+            if ( packet == null ) {
+                this.dataAvailable = false;
+            } else {
+               transfers++;
+            }
+            
+            this.state = TransactionState.DATA_EXCHANGED;
+            return packet;
+           } catch (final Exception e) {
+               error();
+               throw e;
+           }
+       }
+       
+       
+       @Override
+       public void send(DataPacket dataPacket) throws IOException {
+           try {
+               if ( state != TransactionState.DATA_EXCHANGED && state != 
TransactionState.TRANSACTION_STARTED) {
+                       throw new IllegalStateException("Cannot send data 
because Transaction State is " + state);
+               }
+    
+            if ( direction == TransferDirection.RECEIVE ) {
+                throw new IllegalStateException("Attempting to send data but 
started a RECEIVE Transaction");
+            }
+    
+               if ( transfers > 0 ) {
+                ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
+            }
+    
+            logger.debug("{} Sending data to {}", this, peer);
+    
+               final OutputStream out = new CheckedOutputStream(dos, crc);
+            codec.encode(dataPacket, out);
+            
+            // need to close the CompressionOutputStream in order to force it 
write out any remaining bytes.
+            // Otherwise, do NOT close it because we don't want to close the 
underlying stream
+            // (CompressionOutputStream will not close the underlying stream 
when it's closed)
+            if ( compress ) {
+               out.close();
+            }
+            
+            transfers++;
+            this.state = TransactionState.DATA_EXCHANGED;
+           } catch (final Exception e) {
+               error();
+               throw e;
+           }
+       }
+       
+       
+       @Override
+       public void cancel(final String explanation) throws IOException {
+               if ( state == TransactionState.TRANSACTION_CANCELED || state == 
TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR ) {
+                       throw new IllegalStateException("Cannot cancel 
transaction because state is already " + state);
+               }
+
+               try {
+                   ResponseCode.CANCEL_TRANSACTION.writeResponse(dos, 
explanation == null ? "<No explanation given>" : explanation);
+                   state = TransactionState.TRANSACTION_CANCELED;
+               } catch (final IOException ioe) {
+                   error();
+                   throw ioe;
+               }
+       }
+       
+       
+       @Override
+       public void complete(boolean requestBackoff) throws IOException {
+           try {
+               if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
+                       throw new IllegalStateException("Cannot complete 
transaction because state is " + state + 
+                                       "; Transaction can only be completed 
when state is " + TransactionState.TRANSACTION_CONFIRMED);
+               }
+               
+               if ( direction == TransferDirection.RECEIVE ) {
+                   if ( transfers == 0 ) {
+                       state = TransactionState.TRANSACTION_COMPLETED;
+                       return;
+                   }
+                   
+                if ( requestBackoff ) {
+                    // Confirm that we received the data and the peer can now 
discard it but that the peer should not
+                    // send any more data for a bit
+                    logger.debug("{} Sending 
TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+                    
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
+                } else {
+                    // Confirm that we received the data and the peer can now 
discard it
+                    logger.debug("{} Sending TRANSACTION_FINISHED to {}", 
this, peer);
+                    ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
+                }
+                
+                state = TransactionState.TRANSACTION_COMPLETED;
+            } else {
+                final Response transactionResponse;
+                try {
+                    transactionResponse = Response.read(dis);
+                } catch (final IOException e) {
+                    throw new IOException(this + " Failed to receive a 
response from " + peer + " when expecting a TransactionFinished Indicator. " +
+                            "It is unknown whether or not the peer 
successfully received/processed the data.", e);
+                }
+                
+                logger.debug("{} Received {} from {}", this, 
transactionResponse, peer);
+                if ( transactionResponse.getCode() == 
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+                    peer.penalize(penaltyMillis);
+                } else if ( transactionResponse.getCode() != 
ResponseCode.TRANSACTION_FINISHED ) {
+                    throw new ProtocolException("After sending data, expected 
TRANSACTION_FINISHED response but got " + transactionResponse);
+                }
+                
+                state = TransactionState.TRANSACTION_COMPLETED;
+            }
+           } catch (final Exception e) {
+               error();
+               throw e;
+           }
+       }
+       
+       
+       @Override
+       public void confirm() throws IOException {
+           try {
+               if ( state == TransactionState.TRANSACTION_STARTED && 
!dataAvailable && direction == TransferDirection.RECEIVE ) {
+                   // client requested to receive data but no data available. 
no need to confirm.
+                   state = TransactionState.TRANSACTION_CONFIRMED;
+                   return;
+               }
+               
+               if ( state != TransactionState.DATA_EXCHANGED ) {
+                       throw new IllegalStateException("Cannot confirm 
Transaction because state is " + state + 
+                                       "; Transaction can only be confirmed 
when state is " + TransactionState.DATA_EXCHANGED );
+               }
+    
+            if ( direction == TransferDirection.RECEIVE ) {
+                if ( dataAvailable ) {
+                    throw new IllegalStateException("Cannot complete 
transaction because the sender has already sent more data than client has 
consumed.");
+                }
+                
+                // we received a FINISH_TRANSACTION indicator. Send back a 
CONFIRM_TRANSACTION message
+                // to peer so that we can verify that the connection is still 
open. This is a two-phase commit,
+                // which helps to prevent the chances of data duplication. 
Without doing this, we may commit the
+                // session and then when we send the response back to the 
peer, the peer may have timed out and may not
+                // be listening. As a result, it will re-send the data. By 
doing this two-phase commit, we narrow the
+                // Critical Section involved in this transaction so that 
rather than the Critical Section being the
+                // time window involved in the entire transaction, it is 
reduced to a simple round-trip conversation.
+                logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to 
{}", this, peer);
+                final String calculatedCRC = String.valueOf(crc.getValue());
+                ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, 
calculatedCRC);
+                
+                final Response confirmTransactionResponse;
+                try {
+                    confirmTransactionResponse = Response.read(dis);
+                } catch (final IOException ioe) {
+                    logger.error("Failed to receive response code from {} when 
expected confirmation of transaction", peer);
+                    throw ioe;
+                }
+                
+                logger.trace("{} Received {} from {}", this, 
confirmTransactionResponse, peer);
+                
+                switch (confirmTransactionResponse.getCode()) {
+                    case CONFIRM_TRANSACTION:
+                        break;
+                    case BAD_CHECKSUM:
+                        throw new IOException(this + " Received a BadChecksum 
response from peer " + peer);
+                    default:
+                        throw new ProtocolException(this + " Received 
unexpected Response from peer " + peer + " : " + confirmTransactionResponse + 
"; expected 'Confirm Transaction' Response Code");
+                }
+                
+                state = TransactionState.TRANSACTION_CONFIRMED;
+            } else {
+                logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", 
this, peer);
+                ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
+                
+                final String calculatedCRC = String.valueOf(crc.getValue());
+                
+                // we've sent a FINISH_TRANSACTION. Now we'll wait for the 
peer to send a 'Confirm Transaction' response
+                final Response transactionConfirmationResponse = 
Response.read(dis);
+                if ( transactionConfirmationResponse.getCode() == 
ResponseCode.CONFIRM_TRANSACTION ) {
+                    // Confirm checksum and echo back the confirmation.
+                    logger.trace("{} Received {} from {}", this, 
transactionConfirmationResponse, peer);
+                    final String receivedCRC = 
transactionConfirmationResponse.getMessage();
+                    
+                    // CRC was not used before version 4
+                    if ( protocolVersion > 3 ) {
+                        if ( !receivedCRC.equals(calculatedCRC) ) {
+                            ResponseCode.BAD_CHECKSUM.writeResponse(dos);
+                            throw new IOException(this + " Sent data to peer " 
+ peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer 
calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and 
rolling back session");
+                        }
+                    }
+                    
+                    ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+                } else {
+                    throw new ProtocolException("Expected to receive 'Confirm 
Transaction' response from peer " + peer + " but received " + 
transactionConfirmationResponse);
+                }
+                
+                state = TransactionState.TRANSACTION_CONFIRMED;
+            }
+           } catch (final Exception e) {
+               error();
+               throw e;
+           }
+       }
+
+       @Override
+       public void error() {
+           this.state = TransactionState.ERROR;
+       }
+       
+       @Override
+       public TransactionState getState() {
+               return state;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
new file mode 100644
index 0000000..6dab77b
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.util.Set;
+
+import org.apache.nifi.remote.PeerStatus;
+
+public class PeerStatusCache {
+       private final Set<PeerStatus> statuses;
+    private final long timestamp;
+
+    public PeerStatusCache(final Set<PeerStatus> statuses) {
+        this(statuses, System.currentTimeMillis());
+    }
+
+    public PeerStatusCache(final Set<PeerStatus> statuses, final long 
timestamp) {
+        this.statuses = statuses;
+        this.timestamp = timestamp;
+    }
+
+    public Set<PeerStatus> getStatuses() {
+        return statuses;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
new file mode 100644
index 0000000..b2dbdcd
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.nifi.web.api.dto.ControllerDTO;
+import org.apache.nifi.web.api.entity.ControllerEntity;
+import org.apache.nifi.web.util.WebUtils;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+/**
+ *
+ */
+public class RemoteNiFiUtils {
+
+    public static final String CONTROLLER_URI_PATH = "/controller";
+
+    private static final int CONNECT_TIMEOUT = 10000;
+    private static final int READ_TIMEOUT = 10000;
+    
+    private final Client client;
+    
+    public RemoteNiFiUtils(final SSLContext sslContext) {
+        this.client = getClient(sslContext);
+    }
+    
+
+    /**
+     * Gets the content at the specified URI.
+     *
+     * @param uri
+     * @param timeoutMillis
+     * @return
+     * @throws ClientHandlerException
+     * @throws UniformInterfaceException
+     */
+    public ClientResponse get(final URI uri, final int timeoutMillis) throws 
ClientHandlerException, UniformInterfaceException {
+        return get(uri, timeoutMillis, null);
+    }
+    
+    /**
+     * Gets the content at the specified URI using the given query parameters.
+     *
+     * @param uri
+     * @param timeoutMillis
+     * @param queryParams 
+     * @return
+     * @throws ClientHandlerException
+     * @throws UniformInterfaceException
+     */
+    public ClientResponse get(final URI uri, final int timeoutMillis, final 
Map<String, String> queryParams) throws ClientHandlerException, 
UniformInterfaceException {
+        // perform the request
+        WebResource webResource = client.resource(uri);
+        if ( queryParams != null ) {
+            for ( final Map.Entry<String, String> queryEntry : 
queryParams.entrySet() ) {
+                webResource = webResource.queryParam(queryEntry.getKey(), 
queryEntry.getValue());
+            }
+        }
+
+        webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, 
timeoutMillis);
+        webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, 
timeoutMillis);
+
+        return 
webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    }
+
+    /**
+     * Performs a HEAD request to the specified URI.
+     *
+     * @param uri
+     * @param timeoutMillis
+     * @return
+     * @throws ClientHandlerException
+     * @throws UniformInterfaceException
+     */
+    public ClientResponse head(final URI uri, final int timeoutMillis) throws 
ClientHandlerException, UniformInterfaceException {
+        // perform the request
+        WebResource webResource = client.resource(uri);
+        webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, 
timeoutMillis);
+        webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, 
timeoutMillis);
+        return webResource.head();
+    }
+
+    /**
+     * Gets a client based on the specified URI.
+     * 
+     * @param uri
+     * @return 
+     */
+    private Client getClient(final SSLContext sslContext) {
+        final Client client;
+        if (sslContext == null) {
+            client = WebUtils.createClient(null);
+        } else {
+            client = WebUtils.createClient(null, sslContext);
+        }
+
+        client.setReadTimeout(READ_TIMEOUT);
+        client.setConnectTimeout(CONNECT_TIMEOUT);
+
+        return client;
+    }
+    
+    
+    /**
+     * Returns the port on which the remote instance is listening for Flow 
File transfers, or <code>null</code> if the remote instance
+     * is not configured to use Site-to-Site transfers.
+     * 
+     * @param uri the base URI of the remote instance. This should include the 
path only to the nifi-api level, as well as the protocol, host, and port.
+     * @param timeoutMillis
+     * @return
+     * @throws IOException
+     */
+    public Integer getRemoteListeningPort(final String uri, final int 
timeoutMillis) throws IOException {
+       try {
+                       final URI uriObject = new URI(uri + 
CONTROLLER_URI_PATH);
+                       return getRemoteListeningPort(uriObject, timeoutMillis);
+               } catch (URISyntaxException e) {
+                       throw new IOException("Unable to establish connection 
to remote host because URI is invalid: " + uri);
+               }
+    }
+    
+    public String getRemoteRootGroupId(final String uri, final int 
timeoutMillis) throws IOException {
+        try {
+            final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+            return getRemoteRootGroupId(uriObject, timeoutMillis);
+        } catch (URISyntaxException e) {
+            throw new IOException("Unable to establish connection to remote 
host because URI is invalid: " + uri);
+        }
+    }
+    
+    public String getRemoteInstanceId(final String uri, final int 
timeoutMillis) throws IOException {
+        try {
+            final URI uriObject = new URI(uri + CONTROLLER_URI_PATH);
+            return getController(uriObject, timeoutMillis).getInstanceId();
+        } catch (URISyntaxException e) {
+            throw new IOException("Unable to establish connection to remote 
host because URI is invalid: " + uri);
+        }
+    }
+    
+    /**
+     * Returns the port on which the remote instance is listening for Flow 
File transfers, or <code>null</code> if the remote instance
+     * is not configured to use Site-to-Site transfers.
+     * 
+     * @param uri the full URI to fetch, including the path.
+     * @return
+     * @throws IOException
+     */
+    private Integer getRemoteListeningPort(final URI uri, final int 
timeoutMillis) throws IOException {
+       return getController(uri, timeoutMillis).getRemoteSiteListeningPort();
+    }
+    
+    private String getRemoteRootGroupId(final URI uri, final int 
timeoutMillis) throws IOException {
+        return getController(uri, timeoutMillis).getId();
+    }
+    
+    public ControllerDTO getController(final URI uri, final int timeoutMillis) 
throws IOException {
+        final ClientResponse response = get(uri, timeoutMillis);
+        
+        if (Status.OK.getStatusCode() == 
response.getStatusInfo().getStatusCode()) {
+            final ControllerEntity entity = 
response.getEntity(ControllerEntity.class);
+            return entity.getController();
+        } else {
+            final String responseMessage = response.getEntity(String.class);
+            throw new IOException("Got HTTP response Code " + 
response.getStatusInfo().getStatusCode() + ": " + 
response.getStatusInfo().getReasonPhrase() + " with explanation: " + 
responseMessage);
+        }
+    }
+    
+    /**
+     * Issues a registration request on behalf of the current user.
+     * 
+     * @param baseApiUri 
+     * @return  
+     */
+    public ClientResponse issueRegistrationRequest(String baseApiUri) {
+        final URI uri = URI.create(String.format("%s/%s", baseApiUri, 
"/controller/users"));
+
+        // set up the query params
+        MultivaluedMapImpl entity = new MultivaluedMapImpl();
+        entity.add("justification", "A Remote instance of NiFi has attempted 
to create a reference to this NiFi. This action must be approved first.");
+        
+        // create the web resource
+        WebResource webResource = client.resource(uri);
+        
+        // get the client utils and make the request
+        return 
webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
new file mode 100644
index 0000000..bd1b50c
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.MinimumLengthInputStream;
+
+public class StandardDataPacket implements DataPacket {
+
+       private final Map<String, String> attributes;
+       private final InputStream stream;
+       private final long size;
+       
+       public StandardDataPacket(final Map<String, String> attributes, final 
InputStream stream, final long size) {
+               this.attributes = attributes;
+               this.stream = new MinimumLengthInputStream(new 
LimitingInputStream(stream, size), size);
+               this.size = size;
+       }
+       
+       public Map<String, String> getAttributes() {
+               return attributes;
+       }
+       
+       public InputStream getData() {
+               return stream;
+       }
+       
+       public long getSize() {
+               return size;
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
new file mode 100644
index 0000000..d8899ea
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
+import org.junit.Test;
+
+public class TestEndpointConnectionStatePool {
+
+    @Test
+    public void testFormulateDestinationListForOutput() throws IOException {
+        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
+        final List<NodeInformation> collection = new ArrayList<>();
+        collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 
4096));
+        collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 
10240));
+        collection.add(new NodeInformation("ShouldGetLittle", 3, 3333, true, 
1024));
+        collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 
4096));
+        collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 
4096));
+
+        clusterNodeInfo.setNodeInformation(collection);
+        final List<PeerStatus> destinations = 
EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.SEND);
+        for ( final PeerStatus peerStatus : destinations ) {
+            System.out.println(peerStatus.getHostname() + ":" + 
peerStatus.getPort());
+        }
+    }
+    
+    @Test
+    public void testFormulateDestinationListForOutputHugeDifference() throws 
IOException {
+        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
+        final List<NodeInformation> collection = new ArrayList<>();
+        collection.add(new NodeInformation("ShouldGetLittle", 1, 1111, true, 
500));
+        collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 
50000));
+
+        clusterNodeInfo.setNodeInformation(collection);
+        final List<PeerStatus> destinations = 
EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.SEND);
+        for ( final PeerStatus peerStatus : destinations ) {
+            System.out.println(peerStatus.getHostname() + ":" + 
peerStatus.getPort());
+        }
+    }
+    
+    
+    
+    
+    @Test
+    public void testFormulateDestinationListForInputPorts() throws IOException 
{
+        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
+        final List<NodeInformation> collection = new ArrayList<>();
+        collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 
4096));
+        collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 
10240));
+        collection.add(new NodeInformation("ShouldGetLots", 3, 3333, true, 
1024));
+        collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 
4096));
+        collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 
4096));
+
+        clusterNodeInfo.setNodeInformation(collection);
+        final List<PeerStatus> destinations = 
EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.SEND);
+        for ( final PeerStatus peerStatus : destinations ) {
+            System.out.println(peerStatus.getHostname() + ":" + 
peerStatus.getPort());
+        }
+    }
+    
+    @Test
+    public void testFormulateDestinationListForInputPortsHugeDifference() 
throws IOException {
+        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
+        final List<NodeInformation> collection = new ArrayList<>();
+        collection.add(new NodeInformation("ShouldGetLots", 1, 1111, true, 
500));
+        collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 
50000));
+
+        clusterNodeInfo.setNodeInformation(collection);
+        final List<PeerStatus> destinations = 
EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.SEND);
+        for ( final PeerStatus peerStatus : destinations ) {
+            System.out.println(peerStatus.getHostname() + ":" + 
peerStatus.getPort());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
 
b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
new file mode 100644
index 0000000..421d579
--- /dev/null
+++ 
b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class LimitingInputStream extends InputStream {
+
+    private final InputStream in;
+    private final long limit;
+    private long bytesRead = 0;
+
+    public LimitingInputStream(final InputStream in, final long limit) {
+        this.in = in;
+        this.limit = limit;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int val = in.read();
+        if (val > -1) {
+            bytesRead++;
+        }
+        return val;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int maxToRead = (int) Math.min(b.length, limit - bytesRead);
+
+        final int val = in.read(b, 0, maxToRead);
+        if (val > 0) {
+            bytesRead += val;
+        }
+        return val;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (bytesRead >= limit) {
+            return -1;
+        }
+
+        final int maxToRead = (int) Math.min(len, limit - bytesRead);
+
+        final int val = in.read(b, off, maxToRead);
+        if (val > 0) {
+            bytesRead += val;
+        }
+        return val;
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        final long skipped = in.skip(Math.min(n, limit - bytesRead));
+        bytesRead += skipped;
+        return skipped;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return in.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public void mark(int readlimit) {
+        in.mark(readlimit);
+    }
+
+    @Override
+    public boolean markSupported() {
+        return in.markSupported();
+    }
+
+    @Override
+    public void reset() throws IOException {
+        in.reset();
+    }
+
+    public long getLimit() {
+       return limit;
+    }
+}

Reply via email to