http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
new file mode 100644
index 0000000..6e54d62
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BufferStateManager {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(BufferStateManager.class);
+
+    private ByteBuffer buffer;
+    private Direction direction = Direction.WRITE;
+
+    public BufferStateManager(final ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public BufferStateManager(final ByteBuffer buffer, final Direction 
direction) {
+        this.buffer = buffer;
+        this.direction = direction;
+    }
+
+    /**
+     * Ensures that the buffer is at least as big as the size specified, 
resizing the buffer if necessary. This operation MAY change the direction of 
the buffer.
+     *
+     * @param requiredSize the desired size of the buffer
+     */
+    public void ensureSize(final int requiredSize) {
+        if (buffer.capacity() < requiredSize) {
+            final ByteBuffer newBuffer = ByteBuffer.allocate(requiredSize);
+
+            // we have to read buffer so make sure the direction is correct.
+            if (direction == Direction.WRITE) {
+                buffer.flip();
+            }
+
+            // Copy from buffer to newBuffer
+            newBuffer.put(buffer);
+
+            // Swap the buffers
+            buffer = newBuffer;
+
+            // the new buffer is ready to be written to
+            direction = Direction.WRITE;
+        }
+    }
+
+    public ByteBuffer prepareForWrite(final int requiredSize) {
+        ensureSize(requiredSize);
+
+        if (direction == Direction.READ) {
+            direction = Direction.WRITE;
+            buffer.position(buffer.limit());
+        }
+
+        buffer.limit(buffer.capacity());
+        return buffer;
+    }
+
+    public ByteBuffer prepareForRead(final int requiredSize) {
+        ensureSize(requiredSize);
+
+        if (direction == Direction.WRITE) {
+            direction = Direction.READ;
+            buffer.flip();
+        }
+
+        return buffer;
+    }
+
+    /**
+     * Clears the contents of the buffer and sets direction to WRITE
+     */
+    public void clear() {
+        logger.debug("Clearing {}", buffer);
+        buffer.clear();
+        direction = Direction.WRITE;
+    }
+
+    public void compact() {
+        final String before = buffer.toString();
+        buffer.compact();
+        logger.debug("Before compact: {}, after: {}", before, buffer);
+        direction = Direction.WRITE;
+    }
+
+    public static enum Direction {
+
+        READ, WRITE;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
new file mode 100644
index 0000000..1ec229d
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+public class SocketChannelInputStream extends InputStream {
+
+    private static final long CHANNEL_EMPTY_WAIT_NANOS = 
TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+    private final SocketChannel channel;
+    private volatile int timeoutMillis = 30000;
+
+    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
+    private Byte bufferedByte = null;
+
+    public SocketChannelInputStream(final SocketChannel socketChannel) throws 
IOException {
+        // this class expects a non-blocking channel
+        socketChannel.configureBlocking(false);
+        this.channel = socketChannel;
+    }
+
+    public void setTimeout(final int timeoutMillis) {
+        this.timeoutMillis = timeoutMillis;
+    }
+
+    public void consume() throws IOException {
+        channel.shutdownInput();
+
+        final byte[] b = new byte[4096];
+        final ByteBuffer buffer = ByteBuffer.wrap(b);
+        int bytesRead;
+        do {
+            bytesRead = channel.read(buffer);
+            buffer.flip();
+        } while (bytesRead > 0);
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (bufferedByte != null) {
+            final int retVal = bufferedByte & 0xFF;
+            bufferedByte = null;
+            return retVal;
+        }
+
+        oneByteBuffer.flip();
+        oneByteBuffer.clear();
+
+        final long maxTime = System.currentTimeMillis() + timeoutMillis;
+        int bytesRead;
+        do {
+            bytesRead = channel.read(oneByteBuffer);
+            if (bytesRead == 0) {
+                if (System.currentTimeMillis() > maxTime) {
+                    throw new SocketTimeoutException("Timed out reading from 
socket");
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS);
+                } catch (InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt 
status
+                    throw new ClosedByInterruptException(); // simulate an 
interrupted blocked read operation
+                }
+            }
+        } while (bytesRead == 0);
+
+        if (bytesRead == -1) {
+            return -1;
+        }
+        oneByteBuffer.flip();
+        return oneByteBuffer.get() & 0xFF;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws 
IOException {
+        if (bufferedByte != null) {
+            final byte retVal = bufferedByte;
+            bufferedByte = null;
+            b[off] = retVal;
+            return 1;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
+
+        final long maxTime = System.currentTimeMillis() + timeoutMillis;
+        int bytesRead;
+        do {
+            bytesRead = channel.read(buffer);
+            if (bytesRead == 0) {
+                if (System.currentTimeMillis() > maxTime) {
+                    throw new SocketTimeoutException("Timed out reading from 
socket");
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS);
+                } catch (InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt 
status
+                    throw new ClosedByInterruptException(); // simulate an 
interrupted blocked read operation
+                }
+            }
+        } while (bytesRead == 0);
+
+        return bytesRead;
+    }
+
+    @Override
+    public int available() throws IOException {
+        if (bufferedByte != null) {
+            return 1;
+        }
+
+        isDataAvailable(); // attempt to read from socket
+        return (bufferedByte == null) ? 0 : 1;
+    }
+
+    public boolean isDataAvailable() throws IOException {
+        if (bufferedByte != null) {
+            return true;
+        }
+
+        oneByteBuffer.flip();
+        oneByteBuffer.clear();
+        final int bytesRead = channel.read(oneByteBuffer);
+        if (bytesRead == -1) {
+            throw new EOFException("Peer has closed the stream");
+        }
+        if (bytesRead > 0) {
+            oneByteBuffer.flip();
+            bufferedByte = oneByteBuffer.get();
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Closes the underlying socket channel.
+     *
+     * @throws java.io.IOException for issues closing underlying stream
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
new file mode 100644
index 0000000..a56d9dd
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+public class SocketChannelOutputStream extends OutputStream {
+
+    private static final long CHANNEL_FULL_WAIT_NANOS = 
TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+    private final SocketChannel channel;
+    private volatile int timeout = 30000;
+
+    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
+
+    public SocketChannelOutputStream(final SocketChannel socketChannel) throws 
IOException {
+        // this class expects a non-blocking channel
+        socketChannel.configureBlocking(false);
+        this.channel = socketChannel;
+    }
+
+    public void setTimeout(final int timeoutMillis) {
+        this.timeout = timeoutMillis;
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        oneByteBuffer.flip();
+        oneByteBuffer.clear();
+        oneByteBuffer.put((byte) b);
+        oneByteBuffer.flip();
+
+        final int timeoutMillis = this.timeout;
+        long maxTime = System.currentTimeMillis() + timeoutMillis;
+        int bytesWritten;
+        while (oneByteBuffer.hasRemaining()) {
+            bytesWritten = channel.write(oneByteBuffer);
+            if (bytesWritten == 0) {
+                if (System.currentTimeMillis() > maxTime) {
+                    throw new SocketTimeoutException("Timed out writing to 
socket");
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS);
+                } catch (InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt 
status
+                    throw new ClosedByInterruptException(); // simulate an 
interrupted blocked write operation
+                }
+            } else {
+                return;
+            }
+        }
+    }
+
+    @Override
+    public void write(final byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws 
IOException {
+        final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
+
+        final int timeoutMillis = this.timeout;
+        long maxTime = System.currentTimeMillis() + timeoutMillis;
+        int bytesWritten;
+        while (buffer.hasRemaining()) {
+            bytesWritten = channel.write(buffer);
+            if (bytesWritten == 0) {
+                if (System.currentTimeMillis() > maxTime) {
+                    throw new SocketTimeoutException("Timed out writing to 
socket");
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS);
+                } catch (InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt 
status
+                    throw new ClosedByInterruptException(); // simulate an 
interrupted blocked write operation
+                }
+            } else {
+                maxTime = System.currentTimeMillis() + timeoutMillis;
+            }
+        }
+    }
+
+    /**
+     * Closes the underlying SocketChannel
+     *
+     * @throws java.io.IOException if issues closing underlying stream
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
new file mode 100644
index 0000000..1f23d79
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.security.cert.CertificateExpiredException;
+import javax.security.cert.CertificateNotYetValidException;
+import javax.security.cert.X509Certificate;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.io.socket.BufferStateManager;
+import org.apache.nifi.remote.io.socket.BufferStateManager.Direction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SSLSocketChannel implements Closeable {
+
+    public static final int MAX_WRITE_SIZE = 65536;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(SSLSocketChannel.class);
+    private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = 
TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+
+    private final String hostname;
+    private final int port;
+    private final SSLEngine engine;
+    private final SocketAddress socketAddress;
+
+    private BufferStateManager streamInManager;
+    private BufferStateManager streamOutManager;
+    private BufferStateManager appDataManager;
+
+    private SocketChannel channel;
+
+    private final byte[] oneByteBuffer = new byte[1];
+
+    private int timeoutMillis = 30000;
+    private volatile boolean connected = false;
+    private boolean handshaking = false;
+    private boolean closed = false;
+    private volatile boolean interrupted = false;
+
+    public SSLSocketChannel(final SSLContext sslContext, final String 
hostname, final int port, final boolean client) throws IOException {
+        this.socketAddress = new InetSocketAddress(hostname, port);
+        this.channel = SocketChannel.open();
+        this.hostname = hostname;
+        this.port = port;
+        this.engine = sslContext.createSSLEngine();
+        this.engine.setUseClientMode(client);
+        engine.setNeedClientAuth(true);
+
+        streamInManager = new 
BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        streamOutManager = new 
BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        appDataManager = new 
BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
+    }
+
+    public SSLSocketChannel(final SSLContext sslContext, final SocketChannel 
socketChannel, final boolean client) throws IOException {
+        if (!socketChannel.isConnected()) {
+            throw new IllegalArgumentException("Cannot pass an un-connected 
SocketChannel");
+        }
+
+        this.channel = socketChannel;
+
+        this.socketAddress = socketChannel.getRemoteAddress();
+        final Socket socket = socketChannel.socket();
+        this.hostname = socket.getInetAddress().getHostName();
+        this.port = socket.getPort();
+
+        this.engine = sslContext.createSSLEngine();
+        this.engine.setUseClientMode(client);
+        engine.setNeedClientAuth(true);
+
+        streamInManager = new 
BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        streamOutManager = new 
BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        appDataManager = new 
BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
+    }
+
+    public void setTimeout(final int millis) {
+        this.timeoutMillis = millis;
+    }
+
+    public int getTimeout() {
+        return timeoutMillis;
+    }
+
+    public void connect() throws SSLHandshakeException, IOException {
+        try {
+            channel.configureBlocking(false);
+            if (!channel.isConnected()) {
+                final long startTime = System.currentTimeMillis();
+
+                if (!channel.connect(socketAddress)) {
+                    while (!channel.finishConnect()) {
+                        if (interrupted) {
+                            throw new TransmissionDisabledException();
+                        }
+                        if (System.currentTimeMillis() > startTime + 
timeoutMillis) {
+                            throw new SocketTimeoutException("Timed out 
connecting to " + hostname + ":" + port);
+                        }
+
+                        try {
+                            Thread.sleep(50L);
+                        } catch (final InterruptedException e) {
+                        }
+                    }
+                }
+            }
+            engine.beginHandshake();
+
+            performHandshake();
+            logger.debug("{} Successfully completed SSL handshake", this);
+
+            streamInManager.clear();
+            streamOutManager.clear();
+            appDataManager.clear();
+
+            connected = true;
+        } catch (final Exception e) {
+            logger.error("{} Failed to connect due to {}", this, e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+            closeQuietly(channel);
+            engine.closeInbound();
+            engine.closeOutbound();
+            throw e;
+        }
+    }
+
+    public String getDn() throws CertificateExpiredException, 
CertificateNotYetValidException, SSLPeerUnverifiedException {
+        final X509Certificate[] certs = 
engine.getSession().getPeerCertificateChain();
+        if (certs == null || certs.length == 0) {
+            throw new SSLPeerUnverifiedException("No certificates found");
+        }
+
+        final X509Certificate cert = certs[0];
+        cert.checkValidity();
+        return cert.getSubjectDN().getName().trim();
+    }
+
+    private void performHandshake() throws IOException {
+        // Generate handshake message
+        final byte[] emptyMessage = new byte[0];
+        handshaking = true;
+        logger.debug("{} Performing Handshake", this);
+
+        try {
+            while (true) {
+                switch (engine.getHandshakeStatus()) {
+                    case FINISHED:
+                        return;
+                    case NEED_WRAP: {
+                        final ByteBuffer appDataOut = 
ByteBuffer.wrap(emptyMessage);
+
+                        final ByteBuffer outboundBuffer = 
streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+                        final SSLEngineResult wrapHelloResult = 
engine.wrap(appDataOut, outboundBuffer);
+                        if (wrapHelloResult.getStatus() == 
Status.BUFFER_OVERFLOW) {
+                            
streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+                            continue;
+                        }
+
+                        if (wrapHelloResult.getStatus() != Status.OK) {
+                            throw new SSLHandshakeException("Could not 
generate SSL Handshake information: SSLEngineResult: "
+                                    + wrapHelloResult.toString());
+                        }
+
+                        logger.trace("{} Handshake response after wrapping: 
{}", this, wrapHelloResult);
+
+                        final ByteBuffer readableStreamOut = 
streamOutManager.prepareForRead(1);
+                        final int bytesToSend = readableStreamOut.remaining();
+                        writeFully(readableStreamOut);
+                        logger.trace("{} Sent {} bytes of wrapped data for 
handshake", this, bytesToSend);
+
+                        streamOutManager.clear();
+                    }
+                    continue;
+                    case NEED_UNWRAP: {
+                        final ByteBuffer readableDataIn = 
streamInManager.prepareForRead(0);
+                        final ByteBuffer appData = 
appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+                        // Read handshake response from other side
+                        logger.trace("{} Unwrapping: {} to {}", new 
Object[]{this, readableDataIn, appData});
+                        SSLEngineResult handshakeResponseResult = 
engine.unwrap(readableDataIn, appData);
+                        logger.trace("{} Handshake response after unwrapping: 
{}", this, handshakeResponseResult);
+
+                        if (handshakeResponseResult.getStatus() == 
Status.BUFFER_UNDERFLOW) {
+                            final ByteBuffer writableDataIn = 
streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+                            final int bytesRead = readData(writableDataIn);
+                            if (bytesRead > 0) {
+                                logger.trace("{} Read {} bytes for handshake", 
this, bytesRead);
+                            }
+
+                            if (bytesRead < 0) {
+                                throw new SSLHandshakeException("Reached 
End-of-File marker while performing handshake");
+                            }
+                        } else if (handshakeResponseResult.getStatus() == 
Status.CLOSED) {
+                            throw new IOException("Channel was closed by peer 
during handshake");
+                        } else {
+                            streamInManager.compact();
+                            appDataManager.clear();
+                        }
+                    }
+                    break;
+                    case NEED_TASK:
+                        performTasks();
+                        continue;
+                    case NOT_HANDSHAKING:
+                        return;
+                }
+            }
+        } finally {
+            handshaking = false;
+        }
+    }
+
+    private void performTasks() {
+        Runnable runnable;
+        while ((runnable = engine.getDelegatedTask()) != null) {
+            runnable.run();
+        }
+    }
+
+    private void closeQuietly(final Closeable closeable) {
+        try {
+            closeable.close();
+        } catch (final Exception e) {
+        }
+    }
+
+    public void consume() throws IOException {
+        channel.shutdownInput();
+
+        final byte[] b = new byte[4096];
+        final ByteBuffer buffer = ByteBuffer.wrap(b);
+        int readCount;
+        do {
+            readCount = channel.read(buffer);
+            buffer.flip();
+        } while (readCount > 0);
+    }
+
+    private int readData(final ByteBuffer dest) throws IOException {
+        final long startTime = System.currentTimeMillis();
+
+        while (true) {
+            if (interrupted) {
+                throw new TransmissionDisabledException();
+            }
+
+            if (dest.remaining() == 0) {
+                return 0;
+            }
+
+            final int readCount = channel.read(dest);
+
+            if (readCount == 0) {
+                if (System.currentTimeMillis() > startTime + timeoutMillis) {
+                    throw new SocketTimeoutException("Timed out reading from 
socket connected to " + hostname + ":" + port);
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
+                } catch (InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt 
status
+                    throw new ClosedByInterruptException();
+                }
+
+                continue;
+            }
+
+            logger.trace("{} Read {} bytes", this, readCount);
+            return readCount;
+        }
+    }
+
+    private Status encryptAndWriteFully(final BufferStateManager src) throws 
IOException {
+        SSLEngineResult result = null;
+
+        final ByteBuffer buff = src.prepareForRead(0);
+        final ByteBuffer outBuff = 
streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+        logger.trace("{} Encrypting {} bytes", this, buff.remaining());
+        while (buff.remaining() > 0) {
+            result = engine.wrap(buff, outBuff);
+            if (result.getStatus() == Status.OK) {
+                final ByteBuffer readableOutBuff = 
streamOutManager.prepareForRead(0);
+                writeFully(readableOutBuff);
+                streamOutManager.clear();
+            } else {
+                return result.getStatus();
+            }
+        }
+
+        return result.getStatus();
+    }
+
+    private void writeFully(final ByteBuffer src) throws IOException {
+        long lastByteWrittenTime = System.currentTimeMillis();
+
+        int bytesWritten = 0;
+        while (src.hasRemaining()) {
+            if (interrupted) {
+                throw new TransmissionDisabledException();
+            }
+
+            final int written = channel.write(src);
+            bytesWritten += written;
+            final long now = System.currentTimeMillis();
+            if (written > 0) {
+                lastByteWrittenTime = now;
+            } else {
+                if (now > lastByteWrittenTime + timeoutMillis) {
+                    throw new SocketTimeoutException("Timed out writing to 
socket connected to " + hostname + ":" + port);
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
+                } catch (final InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt 
status
+                    throw new ClosedByInterruptException();
+                }
+            }
+        }
+
+        logger.trace("{} Wrote {} bytes", this, bytesWritten);
+    }
+
+    public boolean isClosed() {
+        if (closed) {
+            return true;
+        }
+        // need to detect if peer has sent closure handshake...if so the 
answer is true
+        final ByteBuffer writableInBuffer = 
streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+        int readCount = 0;
+        try {
+            readCount = channel.read(writableInBuffer);
+        } catch (IOException e) {
+            logger.error("{} Failed to readData due to {}", new Object[]{this, 
e});
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+            readCount = -1; // treat the condition same as if End of Stream
+        }
+        if (readCount == 0) {
+            return false;
+        }
+        if (readCount > 0) {
+            logger.trace("{} Read {} bytes", this, readCount);
+
+            final ByteBuffer streamInBuffer = 
streamInManager.prepareForRead(1);
+            final ByteBuffer appDataBuffer = 
appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            try {
+                SSLEngineResult unwrapResponse = engine.unwrap(streamInBuffer, 
appDataBuffer);
+                logger.trace("{} When checking if closed, (handshake={}) 
Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
+                if (unwrapResponse.getStatus().equals(Status.CLOSED)) {
+                    // Drain the incoming TCP buffer
+                    final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
+                    int bytesDiscarded = channel.read(discardBuffer);
+                    while (bytesDiscarded > 0) {
+                        discardBuffer.clear();
+                        bytesDiscarded = channel.read(discardBuffer);
+                    }
+                    engine.closeInbound();
+                } else {
+                    streamInManager.compact();
+                    return false;
+                }
+            } catch (IOException e) {
+                logger.error("{} Failed to check if closed due to {}. Closing 
channel.", new Object[]{this, e});
+                if (logger.isDebugEnabled()) {
+                    logger.error("", e);
+                }
+            }
+        }
+        // either readCount is -1, indicating an end of stream, or the peer 
sent a closure handshake
+        // so go ahead and close down the channel
+        closeQuietly(channel.socket());
+        closeQuietly(channel);
+        closed = true;
+        return true;
+    }
+
+    @Override
+    public void close() throws IOException {
+        logger.debug("{} Closing Connection", this);
+        if (channel == null) {
+            return;
+        }
+
+        if (closed) {
+            return;
+        }
+
+        try {
+            engine.closeOutbound();
+
+            final byte[] emptyMessage = new byte[0];
+
+            final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
+            final ByteBuffer outboundBuffer = 
streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            final SSLEngineResult handshakeResult = engine.wrap(appDataOut, 
outboundBuffer);
+
+            if (handshakeResult.getStatus() != Status.CLOSED) {
+                throw new IOException("Invalid close state - will not send 
network data");
+            }
+
+            final ByteBuffer readableStreamOut = 
streamOutManager.prepareForRead(1);
+            writeFully(readableStreamOut);
+        } finally {
+            // Drain the incoming TCP buffer
+            final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
+            try {
+                int bytesDiscarded = channel.read(discardBuffer);
+                while (bytesDiscarded > 0) {
+                    discardBuffer.clear();
+                    bytesDiscarded = channel.read(discardBuffer);
+                }
+            } catch (Exception e) {
+            }
+
+            closeQuietly(channel.socket());
+            closeQuietly(channel);
+            closed = true;
+        }
+    }
+
+    private int copyFromAppDataBuffer(final byte[] buffer, final int offset, 
final int len) {
+        // If any data already exists in the application data buffer, copy it 
to the buffer.
+        final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
+
+        final int appDataRemaining = appDataBuffer.remaining();
+        if (appDataRemaining > 0) {
+            final int bytesToCopy = Math.min(len, appDataBuffer.remaining());
+            appDataBuffer.get(buffer, offset, bytesToCopy);
+
+            final int bytesCopied = appDataRemaining - 
appDataBuffer.remaining();
+            logger.trace("{} Copied {} ({}) bytes from unencrypted application 
buffer to user space",
+                    new Object[]{this, bytesToCopy, bytesCopied});
+            return bytesCopied;
+        }
+        return 0;
+    }
+
+    public int available() throws IOException {
+        ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
+        ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1);
+        final int buffered = appDataBuffer.remaining() + 
streamDataBuffer.remaining();
+        if (buffered > 0) {
+            return buffered;
+        }
+
+        final boolean wasAbleToRead = isDataAvailable();
+        if (!wasAbleToRead) {
+            return 0;
+        }
+
+        appDataBuffer = appDataManager.prepareForRead(1);
+        streamDataBuffer = streamInManager.prepareForRead(1);
+        return appDataBuffer.remaining() + streamDataBuffer.remaining();
+    }
+
+    public boolean isDataAvailable() throws IOException {
+        final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
+        final ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1);
+
+        if (appDataBuffer.remaining() > 0 || streamDataBuffer.remaining() > 0) 
{
+            return true;
+        }
+
+        final ByteBuffer writableBuffer = 
streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+        final int bytesRead = channel.read(writableBuffer);
+        return (bytesRead > 0);
+    }
+
+    public int read() throws IOException {
+        final int bytesRead = read(oneByteBuffer);
+        if (bytesRead == -1) {
+            return -1;
+        }
+        return oneByteBuffer[0] & 0xFF;
+    }
+
+    public int read(final byte[] buffer) throws IOException {
+        return read(buffer, 0, buffer.length);
+    }
+
+    public int read(final byte[] buffer, final int offset, final int len) 
throws IOException {
+        logger.debug("{} Reading up to {} bytes of data", this, len);
+
+        if (!connected) {
+            connect();
+        }
+
+        int copied = copyFromAppDataBuffer(buffer, offset, len);
+        if (copied > 0) {
+            return copied;
+        }
+
+        appDataManager.clear();
+
+        while (true) {
+            // prepare buffers and call unwrap
+            final ByteBuffer streamInBuffer = 
streamInManager.prepareForRead(1);
+            SSLEngineResult unwrapResponse = null;
+            final ByteBuffer appDataBuffer = 
appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
+            logger.trace("{} When reading data, (handshake={}) Unwrap 
response: {}", new Object[]{this, handshaking, unwrapResponse});
+
+            switch (unwrapResponse.getStatus()) {
+                case BUFFER_OVERFLOW:
+                    throw new SSLHandshakeException("Buffer Overflow, which is 
not allowed to happen from an unwrap");
+                case BUFFER_UNDERFLOW: {
+//                
appDataManager.prepareForRead(engine.getSession().getApplicationBufferSize());
+
+                    final ByteBuffer writableInBuffer = 
streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+                    final int bytesRead = readData(writableInBuffer);
+                    if (bytesRead < 0) {
+                        return -1;
+                    }
+
+                    continue;
+                }
+                case CLOSED:
+                    throw new IOException("Channel is closed");
+                case OK: {
+                    copied = copyFromAppDataBuffer(buffer, offset, len);
+                    if (copied == 0) {
+                        throw new IOException("Failed to decrypt data");
+                    }
+                    streamInManager.compact();
+                    return copied;
+                }
+            }
+        }
+    }
+
+    public void write(final int data) throws IOException {
+        write(new byte[]{(byte) data}, 0, 1);
+    }
+
+    public void write(final byte[] data) throws IOException {
+        write(data, 0, data.length);
+    }
+
+    public void write(final byte[] data, final int offset, final int len) 
throws IOException {
+        logger.debug("{} Writing {} bytes of data", this, len);
+
+        if (!connected) {
+            connect();
+        }
+
+        int iterations = len / MAX_WRITE_SIZE;
+        if (len % MAX_WRITE_SIZE > 0) {
+            iterations++;
+        }
+
+        for (int i = 0; i < iterations; i++) {
+            streamOutManager.clear();
+            final int itrOffset = offset + i * MAX_WRITE_SIZE;
+            final int itrLen = Math.min(len - itrOffset, MAX_WRITE_SIZE);
+            final ByteBuffer byteBuffer = ByteBuffer.wrap(data, itrOffset, 
itrLen);
+
+            final BufferStateManager buffMan = new 
BufferStateManager(byteBuffer, Direction.READ);
+            final Status status = encryptAndWriteFully(buffMan);
+            switch (status) {
+                case BUFFER_OVERFLOW:
+                    
streamOutManager.ensureSize(engine.getSession().getPacketBufferSize());
+                    
appDataManager.ensureSize(engine.getSession().getApplicationBufferSize());
+                    continue;
+                case OK:
+                    continue;
+                case CLOSED:
+                    throw new IOException("Channel is closed");
+                case BUFFER_UNDERFLOW:
+                    throw new AssertionError("Got Buffer Underflow but should 
not have...");
+            }
+        }
+    }
+
+    public void interrupt() {
+        this.interrupted = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
new file mode 100644
index 0000000..ca6de85
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SSLSocketChannelInputStream extends InputStream {
+
+    private final SSLSocketChannel channel;
+
+    public SSLSocketChannelInputStream(final SSLSocketChannel channel) {
+        this.channel = channel;
+    }
+
+    public void consume() throws IOException {
+        channel.consume();
+    }
+
+    @Override
+    public int read() throws IOException {
+        return channel.read();
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return channel.read(b);
+    }
+
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws 
IOException {
+        return channel.read(b, off, len);
+    }
+
+    /**
+     * Closes the underlying SSLSocketChannel, which will also close the 
OutputStream and connection
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+
+    @Override
+    public int available() throws IOException {
+        return channel.available();
+    }
+
+    public boolean isDataAvailable() throws IOException {
+        return available() > 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
new file mode 100644
index 0000000..262cf54
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class SSLSocketChannelOutputStream extends OutputStream {
+
+    private final SSLSocketChannel channel;
+
+    public SSLSocketChannelOutputStream(final SSLSocketChannel channel) {
+        this.channel = channel;
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        channel.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        channel.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        channel.write(b, off, len);
+    }
+
+    /**
+     * Closes the underlying SSLSocketChannel, which also will close the 
InputStream and the connection
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
new file mode 100644
index 0000000..2afaa70
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.InputStream;
+
+/**
+ * This class is a slight modification of the BufferedInputStream in the 
java.io package. The modification is that this implementation does not provide 
synchronization on method calls, which means
+ * that this class is not suitable for use by multiple threads. However, the 
absence of these synchronized blocks results in potentially much better 
performance.
+ */
+public class BufferedInputStream extends java.io.BufferedInputStream {
+
+    public BufferedInputStream(final InputStream in) {
+        super(in);
+    }
+
+    public BufferedInputStream(final InputStream in, final int size) {
+        super(in, size);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
new file mode 100644
index 0000000..dc56927
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * This class is a slight modification of the {@link 
java.io.BufferedOutputStream} class. This implementation differs in that it 
does not mark methods as synchronized. This means that this class is
+ * not suitable for writing by multiple concurrent threads. However, the 
removal of the synchronized keyword results in potentially much better 
performance.
+ */
+public class BufferedOutputStream extends FilterOutputStream {
+
+    /**
+     * The internal buffer where data is stored.
+     */
+    protected byte buf[];
+
+    /**
+     * The number of valid bytes in the buffer. This value is always in the 
range <tt>0</tt> through <tt>buf.length</tt>; elements
+     * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid byte data.
+     */
+    protected int count;
+
+    /**
+     * Creates a new buffered output stream to write data to the specified 
underlying output stream.
+     *
+     * @param out the underlying output stream.
+     */
+    public BufferedOutputStream(OutputStream out) {
+        this(out, 8192);
+    }
+
+    /**
+     * Creates a new buffered output stream to write data to the specified 
underlying output stream with the specified buffer size.
+     *
+     * @param out the underlying output stream.
+     * @param size the buffer size.
+     * @exception IllegalArgumentException if size &lt;= 0.
+     */
+    public BufferedOutputStream(OutputStream out, int size) {
+        super(out);
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buf = new byte[size];
+    }
+
+    /**
+     * Flush the internal buffer
+     */
+    private void flushBuffer() throws IOException {
+        if (count > 0) {
+            out.write(buf, 0, count);
+            count = 0;
+        }
+    }
+
+    /**
+     * Writes the specified byte to this buffered output stream.
+     *
+     * @param b the byte to be written.
+     * @exception IOException if an I/O error occurs.
+     */
+    @Override
+    public void write(int b) throws IOException {
+        if (count >= buf.length) {
+            flushBuffer();
+        }
+        buf[count++] = (byte) b;
+    }
+
+    /**
+     * Writes <code>len</code> bytes from the specified byte array starting at 
offset <code>off</code> to this buffered output stream.
+     *
+     * <p>
+     * Ordinarily this method stores bytes from the given array into this 
stream's buffer, flushing the buffer to the underlying output stream as needed. 
If the requested length is at least as large
+     * as this stream's buffer, however, then this method will flush the 
buffer and write the bytes directly to the underlying output stream. Thus 
redundant <code>BufferedOutputStream</code>s will not
+     * copy data unnecessarily.
+     *
+     * @param b the data.
+     * @param off the start offset in the data.
+     * @param len the number of bytes to write.
+     * @exception IOException if an I/O error occurs.
+     */
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+        if (len >= buf.length) {
+            /* If the request length exceeds the size of the output buffer,
+             flush the output buffer and then write the data directly.
+             In this way buffered streams will cascade harmlessly. */
+            flushBuffer();
+            out.write(b, off, len);
+            return;
+        }
+        if (len >= buf.length - count) {
+            flushBuffer();
+        }
+        System.arraycopy(b, off, buf, count, len);
+        count += len;
+    }
+
+    /**
+     * Flushes this buffered output stream. This forces any buffered output 
bytes to be written out to the underlying output stream.
+     *
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public void flush() throws IOException {
+        flushBuffer();
+        out.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
new file mode 100644
index 0000000..85c8c4f
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.InputStream;
+
+/**
+ * This class performs the same function as java.io.ByteArrayInputStream but 
does not mark its methods as synchronized
+ */
+public class ByteArrayInputStream extends InputStream {
+
+    /**
+     * An array of bytes that was provided by the creator of the stream. 
Elements <code>buf[0]</code> through <code>buf[count-1]</code> are the only 
bytes that can ever be read from the stream;
+     * element <code>buf[pos]</code> is the next byte to be read.
+     */
+    protected byte buf[];
+
+    /**
+     * The index of the next character to read from the input stream buffer. 
This value should always be nonnegative and not larger than the value of 
<code>count</code>. The next byte to be read from
+     * the input stream buffer will be <code>buf[pos]</code>.
+     */
+    protected int pos;
+
+    /**
+     * The currently marked position in the stream. ByteArrayInputStream 
objects are marked at position zero by default when constructed. They may be 
marked at another position within the buffer by
+     * the <code>mark()</code> method. The current buffer position is set to 
this point by the <code>reset()</code> method.
+     * <p>
+     * If no mark has been set, then the value of mark is the offset passed to 
the constructor (or 0 if the offset was not supplied).
+     *
+     * @since JDK1.1
+     */
+    protected int mark = 0;
+
+    /**
+     * The index one greater than the last valid character in the input stream 
buffer. This value should always be nonnegative and not larger than the length 
of <code>buf</code>. It is one greater
+     * than the position of the last byte within <code>buf</code> that can 
ever be read from the input stream buffer.
+     */
+    protected int count;
+
+    /**
+     * Creates a <code>ByteArrayInputStream</code> so that it uses 
<code>buf</code> as its buffer array. The buffer array is not copied. The 
initial value of <code>pos</code> is <code>0</code> and the
+     * initial value of  <code>count</code> is the length of <code>buf</code>.
+     *
+     * @param buf the input buffer.
+     */
+    public ByteArrayInputStream(byte buf[]) {
+        this.buf = buf;
+        this.pos = 0;
+        this.count = buf.length;
+    }
+
+    /**
+     * Creates <code>ByteArrayInputStream</code> that uses <code>buf</code> as 
its buffer array. The initial value of <code>pos</code> is <code>offset</code> 
and the initial value of
+     * <code>count</code> is the minimum of <code>offset+length</code> and 
<code>buf.length</code>. The buffer array is not copied. The buffer's mark is 
set to the specified offset.
+     *
+     * @param buf the input buffer.
+     * @param offset the offset in the buffer of the first byte to read.
+     * @param length the maximum number of bytes to read from the buffer.
+     */
+    public ByteArrayInputStream(byte buf[], int offset, int length) {
+        this.buf = buf;
+        this.pos = offset;
+        this.count = Math.min(offset + length, buf.length);
+        this.mark = offset;
+    }
+
+    /**
+     * Reads the next byte of data from this input stream. The value byte is 
returned as an <code>int</code> in the range <code>0</code> to 
<code>255</code>. If no byte is available because the end of
+     * the stream has been reached, the value <code>-1</code> is returned.
+     * <p>
+     * This <code>read</code> method cannot block.
+     *
+     * @return the next byte of data, or <code>-1</code> if the end of the 
stream has been reached.
+     */
+    @Override
+    public int read() {
+        return (pos < count) ? (buf[pos++] & 0xff) : -1;
+    }
+
+    /**
+     * Reads up to <code>len</code> bytes of data into an array of bytes from 
this input stream. If <code>pos</code> equals <code>count</code>, then 
<code>-1</code> is returned to indicate end of
+     * file. Otherwise, the number <code>k</code> of bytes read is equal to 
the smaller of <code>len</code> and <code>count-pos</code>. If <code>k</code> 
is positive, then bytes <code>buf[pos]</code>
+     * through <code>buf[pos+k-1]</code> are copied into <code>b[off]</code> 
through <code>b[off+k-1]</code> in the manner performed by 
<code>System.arraycopy</code>. The value <code>k</code> is added
+     * into <code>pos</code> and <code>k</code> is returned.
+     * <p>
+     * This <code>read</code> method cannot block.
+     *
+     * @param b the buffer into which the data is read.
+     * @param off the start offset in the destination array <code>b</code>
+     * @param len the maximum number of bytes read.
+     * @return the total number of bytes read into the buffer, or 
<code>-1</code> if there is no more data because the end of the stream has been 
reached.
+     * @exception NullPointerException If <code>b</code> is <code>null</code>.
+     * @exception IndexOutOfBoundsException If <code>off</code> is negative, 
<code>len</code> is negative, or <code>len</code> is greater than 
<code>b.length - off</code>
+     */
+    @Override
+    public int read(byte b[], int off, int len) {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+
+        if (pos >= count) {
+            return -1;
+        }
+
+        int avail = count - pos;
+        if (len > avail) {
+            len = avail;
+        }
+        if (len <= 0) {
+            return 0;
+        }
+        System.arraycopy(buf, pos, b, off, len);
+        pos += len;
+        return len;
+    }
+
+    /**
+     * Skips <code>n</code> bytes of input from this input stream. Fewer bytes 
might be skipped if the end of the input stream is reached. The actual number 
<code>k</code> of bytes to be skipped is
+     * equal to the smaller of <code>n</code> and  <code>count-pos</code>. The 
value <code>k</code> is added into <code>pos</code> and <code>k</code> is 
returned.
+     *
+     * @param n the number of bytes to be skipped.
+     * @return the actual number of bytes skipped.
+     */
+    @Override
+    public long skip(long n) {
+        long k = count - pos;
+        if (n < k) {
+            k = n < 0 ? 0 : n;
+        }
+
+        pos += k;
+        return k;
+    }
+
+    /**
+     * Returns the number of remaining bytes that can be read (or skipped 
over) from this input stream.
+     * <p>
+     * The value returned is <code>count&nbsp;- pos</code>, which is the 
number of bytes remaining to be read from the input buffer.
+     *
+     * @return the number of remaining bytes that can be read (or skipped 
over) from this input stream without blocking.
+     */
+    @Override
+    public int available() {
+        return count - pos;
+    }
+
+    /**
+     * Tests if this <code>InputStream</code> supports mark/reset. The 
<code>markSupported</code> method of <code>ByteArrayInputStream</code> always 
returns <code>true</code>.
+     *
+     * @since JDK1.1
+     */
+    @Override
+    public boolean markSupported() {
+        return true;
+    }
+
+    /**
+     * Set the current marked position in the stream. ByteArrayInputStream 
objects are marked at position zero by default when constructed. They may be 
marked at another position within the buffer by
+     * this method.
+     * <p>
+     * If no mark has been set, then the value of the mark is the offset 
passed to the constructor (or 0 if the offset was not supplied).
+     *
+     * <p>
+     * Note: The <code>readAheadLimit</code> for this class has no meaning.
+     *
+     * @since JDK1.1
+     */
+    @Override
+    public void mark(int readAheadLimit) {
+        mark = pos;
+    }
+
+    /**
+     * Resets the buffer to the marked position. The marked position is 0 
unless another position was marked or an offset was specified in the 
constructor.
+     */
+    @Override
+    public void reset() {
+        pos = mark;
+    }
+
+    /**
+     * Closing a <tt>ByteArrayInputStream</tt> has no effect. The methods in 
this class can be called after the stream has been closed without generating an 
<tt>IOException</tt>.
+     * <p>
+     */
+    @Override
+    public void close() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
new file mode 100644
index 0000000..aade199
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+/**
+ * This class provides a more efficient implementation of the 
java.io.ByteArrayOutputStream. The efficiency is gained in two ways:
+ * <ul>
+ * <li>The write methods are not synchronized</li>
+ * <li>The class provides {@link #getUnderlyingBuffer()} and {@link 
#getBufferLength()}, which can be used to access the underlying byte array 
directly, rather than the System.arraycopy that
+ * {@link #toByteArray()} uses
+ * </ul>
+ *
+ */
+public class ByteArrayOutputStream extends OutputStream {
+
+    /**
+     * The buffer where data is stored.
+     */
+    protected byte buf[];
+
+    /**
+     * The number of valid bytes in the buffer.
+     */
+    protected int count;
+
+    /**
+     * Creates a new byte array output stream. The buffer capacity is 
initially 32 bytes, though its size increases if necessary.
+     */
+    public ByteArrayOutputStream() {
+        this(32);
+    }
+
+    /**
+     * Creates a new byte array output stream, with a buffer capacity of the 
specified size, in bytes.
+     *
+     * @param size the initial size.
+     * @exception IllegalArgumentException if size is negative.
+     */
+    public ByteArrayOutputStream(int size) {
+        if (size < 0) {
+            throw new IllegalArgumentException("Negative initial size: "
+                    + size);
+        }
+        buf = new byte[size];
+    }
+
+    /**
+     * Increases the capacity if necessary to ensure that it can hold at least 
the number of elements specified by the minimum capacity argument.
+     *
+     * @param minCapacity the desired minimum capacity
+     * @throws OutOfMemoryError if {@code minCapacity < 0}. This is 
interpreted as a request for the unsatisfiably large capacity {@code (long) 
Integer.MAX_VALUE + (minCapacity - Integer.MAX_VALUE)}.
+     */
+    private void ensureCapacity(int minCapacity) {
+        // overflow-conscious code
+        if (minCapacity - buf.length > 0) {
+            grow(minCapacity);
+        }
+    }
+
+    /**
+     * Increases the capacity to ensure that it can hold at least the number 
of elements specified by the minimum capacity argument.
+     *
+     * @param minCapacity the desired minimum capacity
+     */
+    private void grow(int minCapacity) {
+        // overflow-conscious code
+        int oldCapacity = buf.length;
+        int newCapacity = oldCapacity << 1;
+        if (newCapacity - minCapacity < 0) {
+            newCapacity = minCapacity;
+        }
+        if (newCapacity < 0) {
+            if (minCapacity < 0) {  // overflow
+                throw new OutOfMemoryError();
+            }
+            newCapacity = Integer.MAX_VALUE;
+        }
+        buf = Arrays.copyOf(buf, newCapacity);
+    }
+
+    /**
+     * Writes the specified byte to this byte array output stream.
+     *
+     * @param b the byte to be written.
+     */
+    @Override
+    public void write(int b) {
+        ensureCapacity(count + 1);
+        buf[count] = (byte) b;
+        count += 1;
+    }
+
+    /**
+     * Writes <code>len</code> bytes from the specified byte array starting at 
offset <code>off</code> to this byte array output stream.
+     *
+     * @param b the data.
+     * @param off the start offset in the data.
+     * @param len the number of bytes to write.
+     */
+    @Override
+    public void write(byte b[], int off, int len) {
+        if ((off < 0) || (off > b.length) || (len < 0)
+                || ((off + len) - b.length > 0)) {
+            throw new IndexOutOfBoundsException();
+        }
+        ensureCapacity(count + len);
+        System.arraycopy(b, off, buf, count, len);
+        count += len;
+    }
+
+    /**
+     * Writes the complete contents of this byte array output stream to the 
specified output stream argument, as if by calling the output stream's write 
method using
+     * <code>out.write(buf, 0, count)</code>.
+     *
+     * @param out the output stream to which to write the data.
+     * @exception IOException if an I/O error occurs.
+     */
+    public void writeTo(OutputStream out) throws IOException {
+        out.write(buf, 0, count);
+    }
+
+    /**
+     * Resets the <code>count</code> field of this byte array output stream to 
zero, so that all currently accumulated output in the output stream is 
discarded. The output stream can be used again,
+     * reusing the already allocated buffer space.
+     *
+     * @see java.io.ByteArrayInputStream#count
+     */
+    public void reset() {
+        count = 0;
+    }
+
+    /**
+     * Creates a newly allocated byte array. Its size is the current size of 
this output stream and the valid contents of the buffer have been copied into 
it.
+     *
+     * @return the current contents of this output stream, as a byte array.
+     * @see java.io.ByteArrayOutputStream#size()
+     */
+    public byte[] toByteArray() {
+        return Arrays.copyOf(buf, count);
+    }
+
+    /**
+     * Returns the current size of the buffer.
+     *
+     * @return the value of the <code>count</code> field, which is the number 
of valid bytes in this output stream.
+     * @see java.io.ByteArrayOutputStream#count
+     */
+    public int size() {
+        return count;
+    }
+
+    /**
+     * Converts the buffer's contents into a string decoding bytes using the 
platform's default character set. The length of the new <tt>String</tt>
+     * is a function of the character set, and hence may not be equal to the 
size of the buffer.
+     *
+     * <p>
+     * This method always replaces malformed-input and unmappable-character 
sequences with the default replacement string for the platform's default 
character set. The
+     * {@linkplain java.nio.charset.CharsetDecoder} class should be used when 
more control over the decoding process is required.
+     *
+     * @return String decoded from the buffer's contents.
+     * @since JDK1.1
+     */
+    @Override
+    public String toString() {
+        return new String(buf, 0, count);
+    }
+
+    /**
+     * Converts the buffer's contents into a string by decoding the bytes 
using the specified {@link java.nio.charset.Charset charsetName}. The length of 
the new <tt>String</tt> is a function of the
+     * charset, and hence may not be equal to the length of the byte array.
+     *
+     * <p>
+     * This method always replaces malformed-input and unmappable-character 
sequences with this charset's default replacement string. The {@link
+     * java.nio.charset.CharsetDecoder} class should be used when more control 
over the decoding process is required.
+     *
+     * @param charsetName the name of a supported {@linkplain 
java.nio.charset.Charset <code>charset</code>}
+     * @return String decoded from the buffer's contents.
+     * @exception UnsupportedEncodingException If the named charset is not 
supported
+     * @since JDK1.1
+     */
+    public String toString(String charsetName) throws 
UnsupportedEncodingException {
+        return new String(buf, 0, count, charsetName);
+    }
+
+    /**
+     * Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in 
this class can be called after the stream has been closed without generating an 
<tt>IOException</tt>.
+     * <p>
+     *
+     */
+    @Override
+    public void close() {
+    }
+
+    public byte[] getUnderlyingBuffer() {
+        return buf;
+    }
+
+    public int getBufferLength() {
+        return count;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
new file mode 100644
index 0000000..e9b8c9e
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class ByteCountingInputStream extends InputStream {
+
+    private final InputStream in;
+    private long bytesRead = 0L;
+    private long bytesSkipped = 0L;
+
+    private long bytesSinceMark = 0L;
+
+    public ByteCountingInputStream(final InputStream in) {
+        this.in = in;
+    }
+
+    public ByteCountingInputStream(final InputStream in, final long 
initialOffset) {
+        this.in = in;
+        this.bytesSkipped = initialOffset;
+    }
+
+    @Override
+    public int read() throws IOException {
+        final int fromSuper = in.read();
+        if (fromSuper >= 0) {
+            bytesRead++;
+            bytesSinceMark++;
+        }
+        return fromSuper;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        final int fromSuper = in.read(b, off, len);
+        if (fromSuper >= 0) {
+            bytesRead += fromSuper;
+            bytesSinceMark += fromSuper;
+        }
+
+        return fromSuper;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        final long skipped = in.skip(n);
+        if (skipped >= 0) {
+            bytesSkipped += skipped;
+            bytesSinceMark += skipped;
+        }
+        return skipped;
+    }
+
+    public long getBytesRead() {
+        return bytesRead;
+    }
+
+    public long getBytesSkipped() {
+        return bytesSkipped;
+    }
+
+    public long getBytesConsumed() {
+        return getBytesRead() + getBytesSkipped();
+    }
+
+    @Override
+    public void mark(final int readlimit) {
+        in.mark(readlimit);
+
+        bytesSinceMark = 0L;
+    }
+
+    @Override
+    public boolean markSupported() {
+        return in.markSupported();
+    }
+
+    @Override
+    public void reset() throws IOException {
+        in.reset();
+        bytesRead -= bytesSinceMark;
+        bytesSinceMark = 0L;
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
new file mode 100644
index 0000000..9bbd45e
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class ByteCountingOutputStream extends OutputStream {
+
+    private final OutputStream out;
+    private long bytesWritten = 0L;
+
+    public ByteCountingOutputStream(final OutputStream out) {
+        this.out = out;
+    }
+
+    public ByteCountingOutputStream(final OutputStream out, final long 
initialByteCount) {
+        this.out = out;
+        this.bytesWritten = initialByteCount;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        out.write(b);
+        bytesWritten++;
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        out.write(b, off, len);
+        bytesWritten += len;
+    }
+
+    public long getBytesWritten() {
+        return bytesWritten;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        out.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        out.close();
+    }
+}

Reply via email to