Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 42f69196c -> 705ee852b


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
deleted file mode 100644
index d18a4ee..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.codec;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.exception.ProtocolException;
-
-public class StandardFlowFileCodec implements FlowFileCodec {
-       public static final int MAX_NUM_ATTRIBUTES = 25000;
-
-    public static final String DEFAULT_FLOWFILE_PATH = "./";
-
-    private final VersionNegotiator versionNegotiator;
-
-    public StandardFlowFileCodec() {
-        versionNegotiator = new StandardVersionNegotiator(1);
-    }
-    
-    @Override
-    public FlowFile encode(final FlowFile flowFile, final ProcessSession 
session, final OutputStream encodedOut) throws IOException {
-        final DataOutputStream out = new DataOutputStream(encodedOut);
-        
-        final Map<String, String> attributes = flowFile.getAttributes();
-        out.writeInt(attributes.size());
-        for ( final Map.Entry<String, String> entry : attributes.entrySet() ) {
-            writeString(entry.getKey(), out);
-            writeString(entry.getValue(), out);
-        }
-        
-        out.writeLong(flowFile.getSize());
-        
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream in) throws IOException {
-                final byte[] buffer = new byte[8192];
-                int len;
-                while ( (len = in.read(buffer)) > 0 ) {
-                    encodedOut.write(buffer, 0, len);
-                }
-                
-                encodedOut.flush();
-            }
-        });
-        
-        return flowFile;
-    }
-
-    
-    @Override
-    public FlowFile decode(final InputStream stream, final ProcessSession 
session) throws IOException, ProtocolException {
-        final DataInputStream in = new DataInputStream(stream);
-        
-        final int numAttributes;
-        try {
-            numAttributes = in.readInt();
-        } catch (final EOFException e) {
-            // we're out of data.
-            return null;
-        }
-        
-        // This is here because if the stream is not properly formed, we could 
get up to Integer.MAX_VALUE attributes, which will
-        // generally result in an OutOfMemoryError.
-        if ( numAttributes > MAX_NUM_ATTRIBUTES ) {
-               throw new ProtocolException("FlowFile exceeds maximum number of 
attributes with a total of " + numAttributes);
-        }
-        
-        try {
-            final Map<String, String> attributes = new 
HashMap<>(numAttributes);
-            for (int i=0; i < numAttributes; i++) {
-                final String attrName = readString(in);
-                final String attrValue = readString(in);
-                attributes.put(attrName, attrValue);
-            }
-            
-            final long numBytes = in.readLong();
-            
-            FlowFile flowFile = session.create();
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException 
{
-                    int len;
-                    long size = 0;
-                    final byte[] buffer = new byte[8192];
-                    
-                    while ( size < numBytes && (len = in.read(buffer, 0, (int) 
Math.min(buffer.length, numBytes - size))) > 0 ) {
-                        out.write(buffer, 0, len);
-                        size += len;
-                    }
-
-                    if ( size != numBytes ) {
-                        throw new EOFException("Expected " + numBytes + " 
bytes but received only " + size);
-                    }
-                }
-            });
-
-            return flowFile;
-        } catch (final EOFException e) {
-               session.rollback();
-               
-            // we throw the general IOException here because we did not expect 
to hit EOFException
-            throw e;
-        }
-    }
-
-    private void writeString(final String val, final DataOutputStream out) 
throws IOException {
-        final byte[] bytes = val.getBytes("UTF-8");
-        out.writeInt(bytes.length);
-        out.write(bytes);
-    }
-
-    
-    private String readString(final DataInputStream in) throws IOException {
-        final int numBytes = in.readInt();
-        final byte[] bytes = new byte[numBytes];
-        StreamUtils.fillBuffer(in, bytes, true);
-        return new String(bytes, "UTF-8");
-    }
-    
-    @Override
-    public List<Integer> getSupportedVersions() {
-        return versionNegotiator.getSupportedVersions();
-    }
-
-    @Override
-    public VersionNegotiator getVersionNegotiator() {
-        return versionNegotiator;
-    }
-
-    @Override
-    public String toString() {
-        return "Standard FlowFile Codec, Version " + 
versionNegotiator.getVersion();
-    }
-
-    @Override
-    public String getResourceName() {
-        return "StandardFlowFileCodec";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
deleted file mode 100644
index 0822b6a..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-
-import org.apache.nifi.remote.AbstractCommunicationsSession;
-
-public class SocketChannelCommunicationsSession extends 
AbstractCommunicationsSession {
-    private final SocketChannel channel;
-    private final SocketChannelInput request;
-    private final SocketChannelOutput response;
-    private int timeout = 30000;
-    
-    public SocketChannelCommunicationsSession(final SocketChannel 
socketChannel, final String uri) throws IOException {
-        super(uri);
-        request = new SocketChannelInput(socketChannel);
-        response = new SocketChannelOutput(socketChannel);
-        channel = socketChannel;
-        socketChannel.configureBlocking(false);
-    }
-    
-    @Override
-    public boolean isClosed() {
-        return !channel.isConnected();
-    }
-    
-    @Override
-    public SocketChannelInput getInput() {
-        return request;
-    }
-
-    @Override
-    public SocketChannelOutput getOutput() {
-        return response;
-    }
-
-    @Override
-    public void setTimeout(final int millis) throws IOException {
-        request.setTimeout(millis);
-        response.setTimeout(millis);
-        this.timeout = millis;
-    }
-
-    @Override
-    public int getTimeout() throws IOException {
-        return timeout;
-    }
-
-    @Override
-    public void close() throws IOException {
-        channel.close();
-    }
-    
-    @Override
-    public boolean isDataAvailable() {
-        return request.isDataAvailable();
-    }
-
-    @Override
-    public long getBytesWritten() {
-        return response.getBytesWritten();
-    }
-
-    @Override
-    public long getBytesRead() {
-        return request.getBytesRead();
-    }
-    
-    @Override
-    public void interrupt() {
-        request.interrupt();
-        response.interrupt();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
deleted file mode 100644
index 9e451fd..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.SocketChannel;
-
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.remote.io.InterruptableInputStream;
-import org.apache.nifi.remote.protocol.CommunicationsInput;
-
-public class SocketChannelInput implements CommunicationsInput {
-    private final SocketChannelInputStream socketIn;
-    private final ByteCountingInputStream countingIn;
-    private final InputStream bufferedIn;
-    private final InterruptableInputStream interruptableIn;
-    
-    public SocketChannelInput(final SocketChannel socketChannel) throws 
IOException {
-        this.socketIn = new SocketChannelInputStream(socketChannel);
-        countingIn = new ByteCountingInputStream(socketIn);
-        bufferedIn = new BufferedInputStream(countingIn);
-        interruptableIn = new InterruptableInputStream(bufferedIn);
-    }
-    
-    @Override
-    public InputStream getInputStream() throws IOException {
-        return interruptableIn;
-    }
-
-    public void setTimeout(final int millis) {
-        socketIn.setTimeout(millis);
-    }
-    
-    public boolean isDataAvailable() {
-        try {
-            return interruptableIn.available() > 0;
-        } catch (final Exception e) {
-            return false;
-        }
-    }
-    
-    @Override
-    public long getBytesRead() {
-        return countingIn.getBytesRead();
-    }
-    
-    public void interrupt() {
-        interruptableIn.interrupt();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
deleted file mode 100644
index 26c0164..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.channels.SocketChannel;
-
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
-import org.apache.nifi.remote.io.InterruptableOutputStream;
-import org.apache.nifi.remote.protocol.CommunicationsOutput;
-
-public class SocketChannelOutput implements CommunicationsOutput {
-    private final SocketChannelOutputStream socketOutStream;
-    private final ByteCountingOutputStream countingOut;
-    private final OutputStream bufferedOut;
-    private final InterruptableOutputStream interruptableOut;
-    
-    public SocketChannelOutput(final SocketChannel socketChannel) throws 
IOException {
-        socketOutStream = new SocketChannelOutputStream(socketChannel);
-        countingOut = new ByteCountingOutputStream(socketOutStream);
-        bufferedOut = new BufferedOutputStream(countingOut);
-        interruptableOut = new InterruptableOutputStream(bufferedOut);
-    }
-    
-    @Override
-    public OutputStream getOutputStream() throws IOException {
-        return interruptableOut;
-    }
-    
-    public void setTimeout(final int timeout) {
-        socketOutStream.setTimeout(timeout);
-    }
-    
-    @Override
-    public long getBytesWritten() {
-        return countingOut.getBytesWritten();
-    }
-    
-    public void interrupt() {
-        interruptableOut.interrupt();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
index a526f4c..391d52b 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
@@ -21,9 +21,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Collection;
 
-import org.apache.nifi.cluster.ClusterNodeInformation;
-import org.apache.nifi.cluster.NodeInformant;
-import org.apache.nifi.cluster.NodeInformation;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -31,12 +28,14 @@ import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformant;
+import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.RequestType;
 import org.apache.nifi.remote.protocol.ServerProtocol;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
deleted file mode 100644
index c4519cd..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.socket;
-
-public enum HandshakeProperty {
-    GZIP,
-    PORT_IDENTIFIER,
-    REQUEST_EXPIRATION_MILLIS;
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
deleted file mode 100644
index eae1940..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.socket;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-
-import org.apache.nifi.remote.exception.ProtocolException;
-
-public class Response {
-    private final ResponseCode code;
-    private final String message;
-    
-    private Response(final ResponseCode code, final String explanation) {
-        this.code = code;
-        this.message = explanation;
-    }
-    
-    public ResponseCode getCode() {
-        return code;
-    }
-    
-    public String getMessage() {
-        return message;
-    }
-    
-    public static Response read(final DataInputStream in) throws IOException, 
ProtocolException {
-        final ResponseCode code = ResponseCode.readCode(in);
-        final String message = code.containsMessage() ? in.readUTF() : null;
-        return new Response(code, message);
-    }
-    
-    @Override
-    public String toString() {
-        return code + ": " + message;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
deleted file mode 100644
index 0e588cd..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.socket;
-
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.nifi.remote.exception.ProtocolException;
-
-
-public enum ResponseCode {
-    RESERVED(0, "Reserved for Future Use", false), // This will likely be used 
if we ever need to expand the length of
-                                            // ResponseCode, so that we can 
indicate a 0 followed by some other bytes
-    
-    // handshaking properties
-    PROPERTIES_OK(1, "Properties OK", false),
-    UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true),
-    ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true),
-    MISSING_PROPERTY(232, "Missing Property", true),
-    
-    // transaction indicators
-    CONTINUE_TRANSACTION(10, "Continue Transaction", false),
-    FINISH_TRANSACTION(11, "Finish Transaction", false),
-    CONFIRM_TRANSACTION(12, "Confirm Transaction", true),   // "Explanation" 
of this code is the checksum
-    TRANSACTION_FINISHED(13, "Transaction Finished", false),
-    TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But 
Destination is Full", false),
-    BAD_CHECKSUM(19, "Bad Checksum", false),
-
-    // data availability indicators
-    MORE_DATA(20, "More Data Exists", false),
-    NO_MORE_DATA(21, "No More Data Exists", false),
-    
-    // port state indicators
-    UNKNOWN_PORT(200, "Unknown Port", false),
-    PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true),
-    PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false),
-    
-    // authorization
-    UNAUTHORIZED(240, "User Not Authorized", true),
-    
-    // error indicators
-    ABORT(250, "Abort", true),
-    UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false),
-    END_OF_STREAM(255, "End of Stream", false);
-    
-    private static final ResponseCode[] codeArray = new ResponseCode[256];
-    
-    static {
-        for ( final ResponseCode responseCode : ResponseCode.values() ) {
-            codeArray[responseCode.getCode()] = responseCode;
-        }
-    }
-    
-    private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R';
-    private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C';
-    private final int code;
-    private final byte[] codeSequence;
-    private final String description;
-    private final boolean containsMessage;
-    
-    private ResponseCode(final int code, final String description, final 
boolean containsMessage) {
-        this.codeSequence = new byte[] {CODE_SEQUENCE_VALUE_1, 
CODE_SEQUENCE_VALUE_2, (byte) code};
-        this.code = code;
-        this.description = description;
-        this.containsMessage = containsMessage;
-    }
-    
-    public int getCode() {
-        return code;
-    }
-    
-    public byte[] getCodeSequence() {
-        return codeSequence;
-    }
-    
-    @Override
-    public String toString() {
-        return description;
-    }
-    
-    public boolean containsMessage() {
-        return containsMessage;
-    }
-    
-    public void writeResponse(final DataOutputStream out) throws IOException {
-        if ( containsMessage() ) {
-            throw new IllegalArgumentException("ResponseCode " + code + " 
expects an explanation");
-        }
-        
-        out.write(getCodeSequence());
-        out.flush();
-    }
-    
-    public void writeResponse(final DataOutputStream out, final String 
explanation) throws IOException {
-        if ( !containsMessage() ) {
-            throw new IllegalArgumentException("ResponseCode " + code + " does 
not expect an explanation");
-        }
-        
-        out.write(getCodeSequence());
-        out.writeUTF(explanation);
-        out.flush();
-    }
-    
-    static ResponseCode readCode(final InputStream in) throws IOException, 
ProtocolException {
-        final int byte1 = in.read();
-        if ( byte1 < 0 ) {
-            throw new EOFException();
-        } else if ( byte1 != CODE_SEQUENCE_VALUE_1 ) {
-            throw new ProtocolException("Expected to receive ResponseCode, but 
the stream did not have a ResponseCode");
-        }
-        
-        final int byte2 = in.read();
-        if ( byte2 < 0 ) {
-            throw new EOFException();
-        } else if ( byte2 != CODE_SEQUENCE_VALUE_2 ) {
-            throw new ProtocolException("Expected to receive ResponseCode, but 
the stream did not have a ResponseCode");
-        }
-
-        final int byte3 = in.read();
-        if ( byte3 < 0 ) {
-            throw new EOFException();
-        }
-        
-        final ResponseCode responseCode = codeArray[byte3];
-        if (responseCode == null) {
-            throw new ProtocolException("Received Response Code of " + byte3 + 
" but do not recognize this code");
-        }
-        return responseCode;
-    }
-    
-    public static ResponseCode fromSequence(final byte[] value) {
-        final int code = value[3] & 0xFF;
-        final ResponseCode responseCode = codeArray[code];
-        return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : 
responseCode;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
deleted file mode 100644
index d4b4f61..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol.socket;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.zip.CRC32;
-import java.util.zip.CheckedInputStream;
-import java.util.zip.CheckedOutputStream;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.RemoteGroupPort;
-import org.apache.nifi.remote.RemoteResourceFactory;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.codec.StandardFlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.io.CompressionInputStream;
-import org.apache.nifi.remote.io.CompressionOutputStream;
-import org.apache.nifi.remote.protocol.ClientProtocol;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.RequestType;
-import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.util.StopWatch;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SocketClientProtocol implements ClientProtocol {
-    private final VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(4, 3, 2, 1);
-
-    
-    private RemoteGroupPort port;
-    private boolean useCompression;
-    
-    private String commsIdentifier;
-    private boolean handshakeComplete = false;
-    
-    private final Logger logger = 
LoggerFactory.getLogger(SocketClientProtocol.class);
-    
-    private Response handshakeResponse = null;
-    private boolean readyForFileTransfer = false;
-    private String transitUriPrefix = null;
-    
-    private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); 
// send batches of up to 5 seconds
-    
-    public SocketClientProtocol() {
-    }
-
-    public void setPort(final RemoteGroupPort port) {
-        this.port = port;
-        this.useCompression = port.isUseCompression();
-    }
-    
-    @Override
-    public void handshake(final Peer peer) throws IOException, 
HandshakeException {
-        if ( handshakeComplete ) {
-            throw new IllegalStateException("Handshake has already been 
completed");
-        }
-        commsIdentifier = UUID.randomUUID().toString();
-        logger.debug("{} handshaking with {}", this, peer);
-        
-        final Map<HandshakeProperty, String> properties = new HashMap<>();
-        properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression));
-        properties.put(HandshakeProperty.PORT_IDENTIFIER, 
port.getIdentifier());
-        properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, 
String.valueOf(
-            
port.getRemoteProcessGroup().getCommunicationsTimeout(TimeUnit.MILLISECONDS)) );
-        
-        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
-        
commsSession.setTimeout(port.getRemoteProcessGroup().getCommunicationsTimeout(TimeUnit.MILLISECONDS));
-        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
-        
-        dos.writeUTF(commsIdentifier);
-        
-        if ( versionNegotiator.getVersion() >= 3 ) {
-            dos.writeUTF(peer.getUrl());
-            transitUriPrefix = peer.getUrl();
-            
-            if ( !transitUriPrefix.endsWith("/") ) {
-                transitUriPrefix = transitUriPrefix + "/";
-            }
-        }
-        
-        dos.writeInt(properties.size());
-        for ( final Map.Entry<HandshakeProperty, String> entry : 
properties.entrySet() ) {
-            dos.writeUTF(entry.getKey().name());
-            dos.writeUTF(entry.getValue());
-        }
-        
-        dos.flush();
-        
-        try {
-            handshakeResponse = Response.read(dis);
-        } catch (final ProtocolException e) {
-            throw new HandshakeException(e);
-        }
-        
-        switch (handshakeResponse.getCode()) {
-            case PORT_NOT_IN_VALID_STATE:
-            case UNKNOWN_PORT:
-            case PORTS_DESTINATION_FULL:
-                break;
-            case PROPERTIES_OK:
-                readyForFileTransfer = true;
-                break;
-            default:
-                logger.error("{} received unexpected response {} from {} when 
negotiating Codec", new Object[] {
-                    this, handshakeResponse, peer});
-                peer.close();
-                throw new HandshakeException("Received unexpected response " + 
handshakeResponse);
-        }
-        
-        logger.debug("{} Finished handshake with {}", this, peer);
-        handshakeComplete = true;
-    }
-    
-    public boolean isReadyForFileTransfer() {
-        return readyForFileTransfer;
-    }
-    
-    public boolean isPortInvalid() {
-        if ( !handshakeComplete ) {
-            throw new IllegalStateException("Handshake has not completed 
successfully");
-        }
-        return handshakeResponse.getCode() == 
ResponseCode.PORT_NOT_IN_VALID_STATE;
-    }
-    
-    public boolean isPortUnknown() {
-        if ( !handshakeComplete ) {
-            throw new IllegalStateException("Handshake has not completed 
successfully");
-        }
-        return handshakeResponse.getCode() == ResponseCode.UNKNOWN_PORT;
-    }
-    
-    public boolean isDestinationFull() {
-        if ( !handshakeComplete ) {
-            throw new IllegalStateException("Handshake has not completed 
successfully");
-        }
-        return handshakeResponse.getCode() == 
ResponseCode.PORTS_DESTINATION_FULL;
-    }
-    
-    @Override
-    public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException 
{
-        if ( !handshakeComplete ) {
-            throw new IllegalStateException("Handshake has not been 
performed");
-        }
-        
-        logger.debug("{} Get Peer Statuses from {}", this, peer);
-        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
-        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
-        
-        RequestType.REQUEST_PEER_LIST.writeRequestType(dos);
-        dos.flush();
-        final int numPeers = dis.readInt();
-        final Set<PeerStatus> peers = new HashSet<>(numPeers);
-        for (int i=0; i < numPeers; i++) {
-            final String hostname = dis.readUTF();
-            final int port = dis.readInt();
-            final boolean secure = dis.readBoolean();
-            final int flowFileCount = dis.readInt();
-            peers.add(new PeerStatus(hostname, port, secure, flowFileCount));
-        }
-        
-        logger.debug("{} Received {} Peer Statuses from {}", this, 
peers.size(), peer);
-        return peers;
-    }
-    
-    @Override
-    public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, 
ProtocolException {
-        if ( !handshakeComplete ) {
-            throw new IllegalStateException("Handshake has not been 
performed");
-        }
-
-        logger.debug("{} Negotiating Codec with {}", this, peer);
-        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
-        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
-
-        RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos);
-        
-        FlowFileCodec codec = new StandardFlowFileCodec();
-        try {
-            codec = (FlowFileCodec) 
RemoteResourceFactory.initiateResourceNegotiation(codec, dis, dos);
-        } catch (HandshakeException e) {
-            throw new ProtocolException(e.toString());
-        }
-        logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] 
{this, codec, commsSession});
-
-        return codec;
-    }
-
-    
-    @Override
-    public void receiveFlowFiles(final Peer peer, final ProcessContext 
context, final ProcessSession session, final FlowFileCodec codec) throws 
IOException, ProtocolException {
-        if ( !handshakeComplete ) {
-            throw new IllegalStateException("Handshake has not been 
performed");
-        }
-        if ( !readyForFileTransfer ) {
-            throw new IllegalStateException("Cannot receive files; handshake 
resolution was " + handshakeResponse);
-        }
-
-        logger.debug("{} Receiving FlowFiles from {}", this, peer);
-        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
-        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
-        String userDn = commsSession.getUserDn();
-        if ( userDn == null ) {
-            userDn = "none";
-        }
-        
-        // Indicate that we would like to have some data
-        RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
-        dos.flush();
-        
-        // Determine if Peer will send us data or has no data to send us
-        final Response dataAvailableCode = Response.read(dis);
-        switch (dataAvailableCode.getCode()) {
-            case MORE_DATA:
-                logger.debug("{} {} Indicates that data is available", this, 
peer);
-                break;
-            case NO_MORE_DATA:
-                logger.debug("{} No data available from {}", peer);
-                return;
-            default:
-                throw new ProtocolException("Got unexpected response when 
asking for data: " + dataAvailableCode);
-        }
-
-        final StopWatch stopWatch = new StopWatch(true);
-        final Set<FlowFile> flowFilesReceived = new HashSet<>();
-        long bytesReceived = 0L;
-        final CRC32 crc = new CRC32();
-        
-        // Peer has data. Decode the bytes into FlowFiles until peer says he's 
finished sending data.
-        boolean continueTransaction = true;
-        String calculatedCRC = "";
-        while (continueTransaction) {
-            final InputStream flowFileInputStream = useCompression ? new 
CompressionInputStream(dis) : dis;
-            final CheckedInputStream checkedIn = new 
CheckedInputStream(flowFileInputStream, crc);
-            
-            final long startNanos = System.nanoTime();
-            FlowFile flowFile = codec.decode(checkedIn, session);
-            final long transmissionNanos = System.nanoTime() - startNanos;
-            final long transmissionMillis = 
TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS);
-            
-            final String sourceFlowFileIdentifier = 
flowFile.getAttribute(CoreAttributes.UUID.key());
-            flowFile = session.putAttribute(flowFile, 
CoreAttributes.UUID.key(), UUID.randomUUID().toString());
-            
-            final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
-            session.getProvenanceReporter().receive(flowFile, transitUri, 
"urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", 
Remote DN=" + userDn, transmissionMillis);
-            
-            session.transfer(flowFile, Relationship.ANONYMOUS);
-            bytesReceived += flowFile.getSize();
-            flowFilesReceived.add(flowFile);
-            logger.debug("{} Received {} from {}", this, flowFile, peer);
-            
-            final Response transactionCode = Response.read(dis);
-            switch (transactionCode.getCode()) {
-                case CONTINUE_TRANSACTION:
-                    logger.trace("{} Received ContinueTransaction indicator 
from {}", this, peer);
-                    break;
-                case FINISH_TRANSACTION:
-                    logger.trace("{} Received FinishTransaction indicator from 
{}", this, peer);
-                    continueTransaction = false;
-                    calculatedCRC = 
String.valueOf(checkedIn.getChecksum().getValue());
-                    break;
-                default:
-                    throw new ProtocolException("Received unexpected response 
from peer: when expecting Continue Transaction or Finish Transaction, received" 
+ transactionCode);
-            }
-        }
-        
-        // we received a FINISH_TRANSACTION indicator. Send back a 
CONFIRM_TRANSACTION message
-        // to peer so that we can verify that the connection is still open. 
This is a two-phase commit,
-        // which helps to prevent the chances of data duplication. Without 
doing this, we may commit the
-        // session and then when we send the response back to the peer, the 
peer may have timed out and may not
-        // be listening. As a result, it will re-send the data. By doing this 
two-phase commit, we narrow the
-        // Critical Section involved in this transaction so that rather than 
the Critical Section being the
-        // time window involved in the entire transaction, it is reduced to a 
simple round-trip conversation.
-        logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", 
this, peer);
-        ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
-        
-        final Response confirmTransactionResponse = Response.read(dis);
-        logger.trace("{} Received {} from {}", this, 
confirmTransactionResponse, peer);
-        
-        switch (confirmTransactionResponse.getCode()) {
-            case CONFIRM_TRANSACTION:
-                break;
-            case BAD_CHECKSUM:
-                session.rollback();
-                throw new IOException(this + " Received a BadChecksum response 
from peer " + peer);
-            default:
-                throw new ProtocolException(this + " Received unexpected 
Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 
'Confirm Transaction' Response Code");
-        }
-        
-        // Commit the session so that we have persisted the data
-        session.commit();
-        
-        if ( context.getAvailableRelationships().isEmpty() ) {
-            // Confirm that we received the data and the peer can now discard 
it but that the peer should not
-            // send any more data for a bit
-            logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL 
to {}", this, peer);
-            
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
-        } else {
-            // Confirm that we received the data and the peer can now discard 
it
-            logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
-            ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
-        }
-        
-        stopWatch.stop();
-        final String flowFileDescription = flowFilesReceived.size() < 20 ? 
flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
-        final String uploadDataRate = 
stopWatch.calculateDataRate(bytesReceived);
-        final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-        final String dataSize = FormatUtils.formatDataSize(bytesReceived);
-        logger.info("{} Successfully receveied {} ({}) from {} in {} 
milliseconds at a rate of {}", new Object[] {
-            this, flowFileDescription, dataSize, peer, uploadMillis, 
uploadDataRate});
-    }
-
-    @Override
-    public void transferFlowFiles(final Peer peer, final ProcessContext 
context, final ProcessSession session, final FlowFileCodec codec) throws 
IOException, ProtocolException {
-        if ( !handshakeComplete ) {
-            throw new IllegalStateException("Handshake has not been 
performed");
-        }
-        if ( !readyForFileTransfer ) {
-            throw new IllegalStateException("Cannot transfer files; handshake 
resolution was " + handshakeResponse);
-        }
-
-        FlowFile flowFile = session.get();
-        if ( flowFile == null ) {
-            return;
-        }
-
-        logger.debug("{} Sending FlowFiles to {}", this, peer);
-        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
-        final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
-        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
-        String userDn = commsSession.getUserDn();
-        if ( userDn == null ) {
-            userDn = "none";
-        }
-        
-        // Indicate that we would like to have some data
-        RequestType.SEND_FLOWFILES.writeRequestType(dos);
-        dos.flush();
-        
-        final StopWatch stopWatch = new StopWatch(true);
-        final CRC32 crc = new CRC32();
-        
-        long bytesSent = 0L;
-        final Set<FlowFile> flowFilesSent = new HashSet<>();
-        boolean continueTransaction = true;
-        String calculatedCRC = "";
-        final long startSendingNanos = System.nanoTime();
-        while (continueTransaction) {
-            final OutputStream flowFileOutputStream = useCompression ? new 
CompressionOutputStream(dos) : dos;
-            logger.debug("{} Sending {} to {}", this, flowFile, peer);
-            
-            final CheckedOutputStream checkedOutStream = new 
CheckedOutputStream(flowFileOutputStream, crc);
-            
-            final long startNanos = System.nanoTime();
-            flowFile = codec.encode(flowFile, session, checkedOutStream);
-            final long transferNanos = System.nanoTime() - startNanos;
-            final long transferMillis = 
TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
-            
-            // need to close the CompressionOutputStream in order to force it 
write out any remaining bytes.
-            // Otherwise, do NOT close it because we don't want to close the 
underlying stream
-            // (CompressionOutputStream will not close the underlying stream 
when it's closed)
-            if ( useCompression ) {
-                checkedOutStream.close();
-            }
-            
-            flowFilesSent.add(flowFile);
-            bytesSent += flowFile.getSize();
-            logger.debug("{} Sent {} to {}", this, flowFile, peer);
-            
-            final String transitUri = (transitUriPrefix == null) ? 
peer.getUrl() : transitUriPrefix + 
flowFile.getAttribute(CoreAttributes.UUID.key());
-            session.getProvenanceReporter().send(flowFile, transitUri, "Remote 
Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
-            session.remove(flowFile);
-            
-            final long sendingNanos = System.nanoTime() - startSendingNanos;
-            if ( sendingNanos < BATCH_SEND_NANOS ) { 
-                flowFile = session.get();
-            } else {
-                flowFile = null;
-            }
-            
-            continueTransaction = (flowFile != null);
-            if ( continueTransaction ) {
-                logger.debug("{} Sent CONTINUE_TRANSACTION indicator to {}", 
this, peer);
-                ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
-            } else {
-                logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", 
this, peer);
-                ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
-                
-                calculatedCRC = String.valueOf( 
checkedOutStream.getChecksum().getValue() );
-            }
-        }
-        
-        // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to 
send a 'Confirm Transaction' response
-        final Response transactionConfirmationResponse = Response.read(dis);
-        if ( transactionConfirmationResponse.getCode() == 
ResponseCode.CONFIRM_TRANSACTION ) {
-            // Confirm checksum and echo back the confirmation.
-            logger.trace("{} Received {} from {}", this, 
transactionConfirmationResponse, peer);
-            final String receivedCRC = 
transactionConfirmationResponse.getMessage();
-            
-            if ( versionNegotiator.getVersion() > 3 ) {
-                if ( !receivedCRC.equals(calculatedCRC) ) {
-                    ResponseCode.BAD_CHECKSUM.writeResponse(dos);
-                    session.rollback();
-                    throw new IOException(this + " Sent data to peer " + peer 
+ " but calculated CRC32 Checksum as " + calculatedCRC + " while peer 
calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and 
rolling back session");
-                }
-            }
-            
-            ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
-        } else {
-            throw new ProtocolException("Expected to receive 'Confirm 
Transaction' response from peer " + peer + " but received " + 
transactionConfirmationResponse);
-        }
-
-        final String flowFileDescription = (flowFilesSent.size() < 20) ? 
flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
-
-        final Response transactionResponse;
-        try {
-            transactionResponse = Response.read(dis);
-        } catch (final IOException e) {
-            logger.error("{} Failed to receive a response from {} when 
expecting a TransactionFinished Indicator." +
-                    " It is unknown whether or not the peer successfully 
received/processed the data." +
-                    " Therefore, {} will be rolled back, possibly resulting in 
data duplication of {}", 
-                    this, peer, session, flowFileDescription);
-            session.rollback();
-            throw e;
-        }
-        
-        logger.debug("{} Received {} from {}", this, transactionResponse, 
peer);
-        if ( transactionResponse.getCode() == 
ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
-            peer.penalize(port.getYieldPeriod(TimeUnit.MILLISECONDS));
-        } else if ( transactionResponse.getCode() != 
ResponseCode.TRANSACTION_FINISHED ) {
-            throw new ProtocolException("After sending data, expected 
TRANSACTION_FINISHED response but got " + transactionResponse);
-        }
-        
-        // consume input stream entirely, ignoring its contents. If we
-        // don't do this, the Connection will not be returned to the pool
-        stopWatch.stop();
-        final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
-        final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
-        final String dataSize = FormatUtils.formatDataSize(bytesSent);
-        
-        session.commit();
-        
-        logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at 
a rate of {}", new Object[] {
-            this, flowFileDescription, dataSize, peer, uploadMillis, 
uploadDataRate});
-    }
-
-    @Override
-    public VersionNegotiator getVersionNegotiator() {
-        return versionNegotiator;
-    }
-    
-    @Override
-    public void shutdown(final Peer peer) throws IOException {
-        readyForFileTransfer = false;
-        final CommunicationsSession commsSession = 
peer.getCommunicationsSession();
-        final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
-        
-        logger.debug("{} Shutting down with {}", this, peer);
-        // Indicate that we would like to have some data
-        RequestType.SHUTDOWN.writeRequestType(dos);
-        dos.flush();
-    }
-
-    @Override
-    public String getResourceName() {
-        return "SocketFlowFileProtocol";
-    }
-    
-    @Override
-    public String toString() {
-        return "SocketClientProtocol[CommsID=" + commsIdentifier + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 5edd4f9..eb22b0e 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -32,7 +32,6 @@ import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.CheckedOutputStream;
 
-import org.apache.nifi.cluster.NodeInformant;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.flowfile.FlowFile;
@@ -41,24 +40,27 @@ import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.remote.Peer;
 import org.apache.nifi.remote.PortAuthorizationResult;
 import org.apache.nifi.remote.RemoteResourceFactory;
 import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.cluster.NodeInformant;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.ProtocolException;
 import org.apache.nifi.remote.io.CompressionInputStream;
 import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.RequestType;
 import org.apache.nifi.remote.protocol.ServerProtocol;
+import org.apache.nifi.remote.util.StandardDataPacket;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.StopWatch;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,10 +78,14 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
     private FlowFileCodec negotiatedFlowFileCodec = null;
     private String transitUriPrefix = null;
     
-    private final VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(4, 3, 2, 1);
+    private int requestedBatchCount = 0;
+    private long requestedBatchBytes = 0L;
+    private long requestedBatchNanos = 0L;
+    private static final long DEFAULT_BATCH_NANOS = 
TimeUnit.SECONDS.toNanos(5L);
+    
+    private final VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(5, 4, 3, 2, 1);
     private final Logger logger = 
LoggerFactory.getLogger(SocketFlowFileServerProtocol.class);
     
-    private static final long BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); // 
send batches of up to 5 seconds
 
     
     @Override
@@ -135,68 +141,90 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
                 throw new HandshakeException("Received unknown property: " + 
propertyName);
             }
             
-            switch (property) {
-                case GZIP: {
-                    useGzip = Boolean.parseBoolean(value);
-                    break;
-                }
-                case REQUEST_EXPIRATION_MILLIS:
-                    requestExpirationMillis = Long.parseLong(value);
-                    break;
-                case PORT_IDENTIFIER: {
-                    Port receivedPort = rootGroup.getInputPort(value);
-                    if ( receivedPort == null ) {
-                        receivedPort = rootGroup.getOutputPort(value);
-                    }
-                    if ( receivedPort == null ) {
-                        logger.debug("Responding with ResponseCode 
UNKNOWN_PORT for identifier {}", value);
-                        ResponseCode.UNKNOWN_PORT.writeResponse(dos);
-                        throw new HandshakeException("Received unknown port 
identifier: " + value);
-                    }
-                    if ( !(receivedPort instanceof RootGroupPort) ) {
-                        logger.debug("Responding with ResponseCode 
UNKNOWN_PORT for identifier {}", value);
-                        ResponseCode.UNKNOWN_PORT.writeResponse(dos);
-                        throw new HandshakeException("Received port identifier 
" + value + ", but this Port is not a RootGroupPort");
-                    }
-                    
-                    this.port = (RootGroupPort) receivedPort;
-                    final PortAuthorizationResult portAuthResult = 
this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
-                    if ( !portAuthResult.isAuthorized() ) {
-                        logger.debug("Responding with ResponseCode 
UNAUTHORIZED: ", portAuthResult.getExplanation());
-                        ResponseCode.UNAUTHORIZED.writeResponse(dos, 
portAuthResult.getExplanation());
-                        responseWritten = true;
+            try {
+                switch (property) {
+                    case GZIP: {
+                        useGzip = Boolean.parseBoolean(value);
                         break;
                     }
-                    
-                    if ( !receivedPort.isValid() ) {
-                        logger.debug("Responding with ResponseCode 
PORT_NOT_IN_VALID_STATE for {}", receivedPort);
-                        
ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid");
-                        responseWritten = true;
+                    case REQUEST_EXPIRATION_MILLIS:
+                        requestExpirationMillis = Long.parseLong(value);
                         break;
-                    }
-                    
-                    if ( !receivedPort.isRunning() ) {
-                        logger.debug("Responding with ResponseCode 
PORT_NOT_IN_VALID_STATE for {}", receivedPort);
-                        
ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running");
-                        responseWritten = true;
+                    case BATCH_COUNT:
+                        requestedBatchCount = Integer.parseInt(value);
+                        if ( requestedBatchCount < 0 ) {
+                            throw new HandshakeException("Cannot request Batch 
Count less than 1; requested value: " + value);
+                        }
                         break;
-                    }
-                    
-                    // PORTS_DESTINATION_FULL was introduced in version 2. If 
version 1, just ignore this
-                    // we we will simply not service the request but the 
sender will timeout
-                    if ( getVersionNegotiator().getVersion() > 1 ) {
-                        for ( final Connection connection : 
port.getConnections() ) {
-                            if ( connection.getFlowFileQueue().isFull() ) {
-                                logger.debug("Responding with ResponseCode 
PORTS_DESTINATION_FULL for {}", receivedPort);
-                                
ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos);
-                                responseWritten = true;
-                                break;
+                    case BATCH_SIZE:
+                        requestedBatchBytes = Long.parseLong(value);
+                        if ( requestedBatchBytes < 0 ) {
+                            throw new HandshakeException("Cannot request Batch 
Size less than 1; requested value: " + value);
+                        }
+                        break;
+                    case BATCH_DURATION:
+                        requestedBatchNanos = 
TimeUnit.MILLISECONDS.toNanos(Long.parseLong(value));
+                        if ( requestedBatchNanos < 0 ) {
+                            throw new HandshakeException("Cannot request Batch 
Duration less than 1; requested value: " + value);
+                        }
+                        break;
+                    case PORT_IDENTIFIER: {
+                        Port receivedPort = rootGroup.getInputPort(value);
+                        if ( receivedPort == null ) {
+                            receivedPort = rootGroup.getOutputPort(value);
+                        }
+                        if ( receivedPort == null ) {
+                            logger.debug("Responding with ResponseCode 
UNKNOWN_PORT for identifier {}", value);
+                            ResponseCode.UNKNOWN_PORT.writeResponse(dos);
+                            throw new HandshakeException("Received unknown 
port identifier: " + value);
+                        }
+                        if ( !(receivedPort instanceof RootGroupPort) ) {
+                            logger.debug("Responding with ResponseCode 
UNKNOWN_PORT for identifier {}", value);
+                            ResponseCode.UNKNOWN_PORT.writeResponse(dos);
+                            throw new HandshakeException("Received port 
identifier " + value + ", but this Port is not a RootGroupPort");
+                        }
+                        
+                        this.port = (RootGroupPort) receivedPort;
+                        final PortAuthorizationResult portAuthResult = 
this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
+                        if ( !portAuthResult.isAuthorized() ) {
+                            logger.debug("Responding with ResponseCode 
UNAUTHORIZED: ", portAuthResult.getExplanation());
+                            ResponseCode.UNAUTHORIZED.writeResponse(dos, 
portAuthResult.getExplanation());
+                            responseWritten = true;
+                            break;
+                        }
+                        
+                        if ( !receivedPort.isValid() ) {
+                            logger.debug("Responding with ResponseCode 
PORT_NOT_IN_VALID_STATE for {}", receivedPort);
+                            
ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid");
+                            responseWritten = true;
+                            break;
+                        }
+                        
+                        if ( !receivedPort.isRunning() ) {
+                            logger.debug("Responding with ResponseCode 
PORT_NOT_IN_VALID_STATE for {}", receivedPort);
+                            
ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running");
+                            responseWritten = true;
+                            break;
+                        }
+                        
+                        // PORTS_DESTINATION_FULL was introduced in version 2. 
If version 1, just ignore this
+                        // we we will simply not service the request but the 
sender will timeout
+                        if ( getVersionNegotiator().getVersion() > 1 ) {
+                            for ( final Connection connection : 
port.getConnections() ) {
+                                if ( connection.getFlowFileQueue().isFull() ) {
+                                    logger.debug("Responding with ResponseCode 
PORTS_DESTINATION_FULL for {}", receivedPort);
+                                    
ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos);
+                                    responseWritten = true;
+                                    break;
+                                }
                             }
                         }
+                        
+                        break;
                     }
-                    
-                    break;
                 }
+            } catch (final NumberFormatException nfe) {
+                throw new HandshakeException("Received invalid value for 
property '" + property + "'; invalid value: " + value);
             }
         }
         
@@ -205,11 +233,6 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
             ResponseCode.MISSING_PROPERTY.writeResponse(dos, 
HandshakeProperty.GZIP.name());
             throw new HandshakeException("Missing Property " + 
HandshakeProperty.GZIP.name());
         }
-        if ( port == null ) {
-            logger.debug("Responding with ResponseCode MISSING_PROPERTY 
because Port Identifier property is missing");
-            ResponseCode.MISSING_PROPERTY.writeResponse(dos, 
HandshakeProperty.PORT_IDENTIFIER.name());
-            throw new HandshakeException("Missing Property " + 
HandshakeProperty.PORT_IDENTIFIER.name());
-        }
         
         // send "OK" response
         if ( !responseWritten ) {
@@ -244,6 +267,10 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
         final DataInputStream dis = new 
DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream());
         
+        if ( port == null ) {
+               RemoteResourceFactory.rejectCodecNegotiation(dis, dos, "Cannot 
transfer FlowFiles because no port was specified");
+        }
+        
         // Negotiate the FlowFileCodec to use.
         try {
             negotiatedFlowFileCodec = 
RemoteResourceFactory.receiveCodecNegotiation(dis, dos);
@@ -306,7 +333,16 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
             final CheckedOutputStream checkedOutputStream = new 
CheckedOutputStream(flowFileOutputStream, crc);
 
             final StopWatch transferWatch = new StopWatch(true);
-            flowFile = codec.encode(flowFile, session, checkedOutputStream);
+            
+            final FlowFile toSend = flowFile;
+            session.read(flowFile, new InputStreamCallback() {
+                               @Override
+                               public void process(final InputStream in) 
throws IOException {
+                                       final DataPacket dataPacket = new 
StandardDataPacket(toSend.getAttributes(), in, toSend.getSize());
+                                       codec.encode(dataPacket, 
checkedOutputStream);
+                               }
+            });
+            
             final long transmissionMillis = 
transferWatch.getElapsed(TimeUnit.MILLISECONDS);
             
             // need to close the CompressionOutputStream in order to force it 
write out any remaining bytes.
@@ -323,8 +359,25 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
             session.getProvenanceReporter().send(flowFile, transitUri, "Remote 
Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false);
             session.remove(flowFile);
             
+            // determine if we should check for more data on queue.
             final long sendingNanos = System.nanoTime() - startNanos;
-            if ( sendingNanos < BATCH_NANOS ) { 
+            boolean poll = true;
+            if ( sendingNanos >= requestedBatchNanos && requestedBatchNanos > 
0L ) {
+                poll = false;
+            }
+            if ( bytesSent >= requestedBatchBytes && requestedBatchBytes > 0L 
) {
+                poll = false;
+            }
+            if ( flowFilesSent.size() >= requestedBatchCount && 
requestedBatchCount > 0 ) {
+                poll = false;
+            }
+            
+            if ( requestedBatchNanos == 0 && requestedBatchBytes == 0 && 
requestedBatchCount == 0 ) {
+                poll = (sendingNanos < DEFAULT_BATCH_NANOS);
+            }
+            
+            if ( poll ) { 
+                // we've not elapsed the requested sending duration, so get 
more data.
                 flowFile = session.get();
             } else {
                 flowFile = null;
@@ -429,7 +482,11 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
             final InputStream flowFileInputStream = useGzip ? new 
CompressionInputStream(dis) : dis;
             final CheckedInputStream checkedInputStream = new 
CheckedInputStream(flowFileInputStream, crc);
 
-            FlowFile flowFile = codec.decode(checkedInputStream, session);
+            final DataPacket dataPacket = codec.decode(checkedInputStream);
+            FlowFile flowFile = session.create();
+            flowFile = session.importFrom(dataPacket.getData(), flowFile);
+            flowFile = session.putAllAttributes(flowFile, 
dataPacket.getAttributes());
+            
             final long transferNanos = System.nanoTime() - startNanos;
             final long transferMillis = 
TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
             final String sourceSystemFlowFileUuid = 
flowFile.getAttribute(CoreAttributes.UUID.key());
@@ -451,6 +508,10 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol {
                     continueTransaction = false;
                     calculatedCRC = 
String.valueOf(checkedInputStream.getChecksum().getValue());
                     break;
+                case CANCEL_TRANSACTION:
+                    logger.info("{} Received CancelTransaction indicator from 
{} with explanation {}", this, peer, transactionResponse.getMessage());
+                    session.rollback();
+                    return 0;
                 default:
                     throw new ProtocolException("Received unexpected response 
from peer: when expecting Continue Transaction or Finish Transaction, received" 
+ transactionResponse);
             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
deleted file mode 100644
index e074010..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-import org.apache.nifi.remote.StandardRemoteGroupPort;
-import org.apache.nifi.remote.PeerStatus;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.nifi.cluster.ClusterNodeInformation;
-import org.apache.nifi.cluster.NodeInformation;
-import org.apache.nifi.connectable.ConnectableType;
-
-import org.junit.Test;
-
-public class TestStandardRemoteGroupPort {
-
-    @Test
-    public void testFormulateDestinationListForOutput() throws IOException {
-        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
-        final List<NodeInformation> collection = new ArrayList<>();
-        collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 
4096));
-        collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 
10240));
-        collection.add(new NodeInformation("ShouldGetLittle", 3, 3333, true, 
1024));
-        collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 
4096));
-        collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 
4096));
-
-        clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = 
StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, 
ConnectableType.REMOTE_OUTPUT_PORT);
-        for ( final PeerStatus peerStatus : destinations ) {
-            System.out.println(peerStatus.getHostname() + ":" + 
peerStatus.getPort());
-        }
-    }
-    
-    @Test
-    public void testFormulateDestinationListForOutputHugeDifference() throws 
IOException {
-        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
-        final List<NodeInformation> collection = new ArrayList<>();
-        collection.add(new NodeInformation("ShouldGetLittle", 1, 1111, true, 
500));
-        collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 
50000));
-
-        clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = 
StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, 
ConnectableType.REMOTE_OUTPUT_PORT);
-        for ( final PeerStatus peerStatus : destinations ) {
-            System.out.println(peerStatus.getHostname() + ":" + 
peerStatus.getPort());
-        }
-    }
-    
-    
-    
-    
-    @Test
-    public void testFormulateDestinationListForInputPorts() throws IOException 
{
-        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
-        final List<NodeInformation> collection = new ArrayList<>();
-        collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 
4096));
-        collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 
10240));
-        collection.add(new NodeInformation("ShouldGetLots", 3, 3333, true, 
1024));
-        collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 
4096));
-        collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 
4096));
-
-        clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = 
StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, 
ConnectableType.REMOTE_INPUT_PORT);
-        for ( final PeerStatus peerStatus : destinations ) {
-            System.out.println(peerStatus.getHostname() + ":" + 
peerStatus.getPort());
-        }
-    }
-    
-    @Test
-    public void testFormulateDestinationListForInputPortsHugeDifference() 
throws IOException {
-        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
-        final List<NodeInformation> collection = new ArrayList<>();
-        collection.add(new NodeInformation("ShouldGetLots", 1, 1111, true, 
500));
-        collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 
50000));
-
-        clusterNodeInfo.setNodeInformation(collection);
-        final List<PeerStatus> destinations = 
StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, 
ConnectableType.REMOTE_INPUT_PORT);
-        for ( final PeerStatus peerStatus : destinations ) {
-            System.out.println(peerStatus.getHostname() + ":" + 
peerStatus.getPort());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/pom.xml b/nifi/pom.xml
index 8eda682..eb395c9 100644
--- a/nifi/pom.xml
+++ b/nifi/pom.xml
@@ -624,6 +624,11 @@
                 <artifactId>nifi-utils</artifactId>
                 <version>0.0.2-incubating-SNAPSHOT</version>
             </dependency>
+                       <dependency>
+                               <groupId>org.apache.nifi</groupId>
+                               
<artifactId>nifi-site-to-site-client</artifactId>
+                               <version>0.0.2-incubating-SNAPSHOT</version>
+                       </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-web-utils</artifactId>

Reply via email to