http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
new file mode 100644
index 0000000..fc279fb
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.SSLContext;
+
+/**
+ * Mutable bag of settings used when a server socket is created. Every
+ * boxed option defaults to {@code null}; the client-authentication flag
+ * defaults to {@code false}.
+ *
+ * @author unattributed
+ */
+public final class ServerSocketConfiguration {
+
+    /** Source of the SSL context; {@code null} means no SSL context is produced. */
+    private SSLContextFactory sslContextFactory;
+    private boolean needClientAuth;
+    private Integer socketTimeout;
+    private Integer receiveBufferSize;
+    private Boolean reuseAddress;
+
+    /**
+     * Creates a configuration with no options set.
+     */
+    public ServerSocketConfiguration() {
+    }
+
+    /**
+     * Asks the configured factory for an SSL context.
+     *
+     * @return the context created by the factory, or {@code null} if no
+     * factory has been set
+     */
+    public SSLContext createSSLContext()
+            throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException,
+            KeyStoreException, CertificateException, FileNotFoundException, IOException {
+        if (sslContextFactory == null) {
+            return null;
+        }
+        return sslContextFactory.createSslContext();
+    }
+
+    /**
+     * @param sslContextFactory the factory consulted by {@link #createSSLContext()};
+     * may be {@code null}
+     */
+    public void setSSLContextFactory(final SSLContextFactory sslContextFactory) {
+        this.sslContextFactory = sslContextFactory;
+    }
+
+    /**
+     * @return whether client authentication is required
+     */
+    public boolean getNeedClientAuth() {
+        return needClientAuth;
+    }
+
+    public void setNeedClientAuth(boolean needClientAuth) {
+        this.needClientAuth = needClientAuth;
+    }
+
+    /**
+     * @return the socket timeout, or {@code null} if none was set
+     */
+    public Integer getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    public void setSocketTimeout(Integer socketTimeout) {
+        this.socketTimeout = socketTimeout;
+    }
+
+    /**
+     * @return the receive buffer size, or {@code null} if none was set
+     */
+    public Integer getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(Integer receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    /**
+     * @return the address-reuse flag, or {@code null} if none was set
+     */
+    public Boolean getReuseAddress() {
+        return reuseAddress;
+    }
+
+    public void setReuseAddress(Boolean reuseAddress) {
+        this.reuseAddress = reuseAddress;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
new file mode 100644
index 0000000..c24b540
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.SSLContext;
+
+/**
+ * Mutable bag of settings used when a client socket is created. Every
+ * option defaults to {@code null}.
+ *
+ * @author unattributed
+ */
+public final class SocketConfiguration {
+
+    /** Source of the SSL context; {@code null} means no SSL context is produced. */
+    private SSLContextFactory sslContextFactory;
+    private Integer socketTimeout;
+    private Integer sendBufferSize;
+    private Integer receiveBufferSize;
+    private Integer trafficClass;
+    private Boolean reuseAddress;
+    private Boolean keepAlive;
+    private Boolean tcpNoDelay;
+    private Boolean oobInline;
+
+    /**
+     * Asks the configured factory for an SSL context.
+     *
+     * @return the context created by the factory, or {@code null} if no
+     * factory has been set
+     */
+    public SSLContext createSSLContext()
+            throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException,
+            KeyStoreException, CertificateException, FileNotFoundException, IOException {
+        if (sslContextFactory == null) {
+            return null;
+        }
+        return sslContextFactory.createSslContext();
+    }
+
+    /**
+     * @param sslContextFactory the factory consulted by {@link #createSSLContext()};
+     * may be {@code null}
+     */
+    public void setSSLContextFactory(final SSLContextFactory sslContextFactory) {
+        this.sslContextFactory = sslContextFactory;
+    }
+
+    /**
+     * @return the socket timeout, or {@code null} if none was set
+     */
+    public Integer getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    public void setSocketTimeout(Integer socketTimeout) {
+        this.socketTimeout = socketTimeout;
+    }
+
+    /**
+     * @return the send buffer size, or {@code null} if none was set
+     */
+    public Integer getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    public void setSendBufferSize(Integer sendBufferSize) {
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    /**
+     * @return the receive buffer size, or {@code null} if none was set
+     */
+    public Integer getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(Integer receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    /**
+     * @return the traffic class, or {@code null} if none was set
+     */
+    public Integer getTrafficClass() {
+        return trafficClass;
+    }
+
+    public void setTrafficClass(Integer trafficClass) {
+        this.trafficClass = trafficClass;
+    }
+
+    /**
+     * @return the address-reuse flag, or {@code null} if none was set
+     */
+    public Boolean getReuseAddress() {
+        return reuseAddress;
+    }
+
+    public void setReuseAddress(Boolean reuseAddress) {
+        this.reuseAddress = reuseAddress;
+    }
+
+    /**
+     * @return the keep-alive flag, or {@code null} if none was set
+     */
+    public Boolean getKeepAlive() {
+        return keepAlive;
+    }
+
+    public void setKeepAlive(Boolean keepAlive) {
+        this.keepAlive = keepAlive;
+    }
+
+    /**
+     * @return the TCP no-delay flag, or {@code null} if none was set
+     */
+    public Boolean getTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    public void setTcpNoDelay(Boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    /**
+     * @return the OOB-inline flag, or {@code null} if none was set
+     */
+    public Boolean getOobInline() {
+        return oobInline;
+    }
+
+    public void setOobInline(Boolean oobInline) {
+        this.oobInline = oobInline;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java
 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java
new file mode 100644
index 0000000..e02791a
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.logging.NiFiLog;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a listener for TCP/IP messages sent over unicast socket.
+ *
+ * A single accept thread takes connections from the server socket and hands
+ * each one to a fixed-size worker pool, which calls
+ * {@link #dispatchRequest(Socket)} and then closes the socket.
+ *
+ * @author unattributed
+ */
+public abstract class SocketListener {
+
+    private static final int DEFAULT_SHUTDOWN_LISTENER_SECONDS = 5;
+    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketListener.class));
+    private volatile ExecutorService executorService;  // volatile to guarantee most current value is visible
+    private volatile ServerSocket serverSocket;        // volatile to guarantee most current value is visible
+    private final int numThreads;
+    private final int port;
+    private final ServerSocketConfiguration configuration;
+    private final AtomicInteger shutdownListenerSeconds = new AtomicInteger(DEFAULT_SHUTDOWN_LISTENER_SECONDS);
+
+    /**
+     * @param numThreads size of the worker pool that dispatches requests
+     * @param port the port handed to the server socket when the listener starts
+     * @param configuration the server socket settings
+     *
+     * @throws IllegalArgumentException if {@code numThreads} is not positive
+     * or {@code configuration} is null
+     */
+    public SocketListener(
+            final int numThreads,
+            final int port,
+            final ServerSocketConfiguration configuration) {
+
+        if (numThreads <= 0) {
+            throw new IllegalArgumentException("Number of threads may not be less than or equal to zero.");
+        } else if (configuration == null) {
+            throw new IllegalArgumentException("Server socket configuration may not be null.");
+        }
+
+        this.numThreads = numThreads;
+        this.port = port;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Implements the action to perform when a new socket request is received.
+     * This class will close the socket.
+     *
+     * @param socket the socket
+     */
+    public abstract void dispatchRequest(final Socket socket);
+
+    /**
+     * Opens the server socket, creates the worker pool and starts the accept
+     * thread. Does nothing if the listener is already running.
+     *
+     * @throws IOException if the server socket cannot be created; security
+     * related failures while building the SSL context are wrapped
+     */
+    public void start() throws IOException {
+
+        if (isRunning()) {
+            return;
+        }
+
+        try {
+            serverSocket = SocketUtils.createServerSocket(port, configuration);
+        } catch (KeyManagementException | UnrecoverableKeyException | NoSuchAlgorithmException
+                | KeyStoreException | CertificateException e) {
+            throw new IOException(e);
+        }
+
+        final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
+        executorService = Executors.newFixedThreadPool(numThreads, new ThreadFactory() {
+            private final AtomicLong threadCounter = new AtomicLong(0L);
+
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread newThread = defaultThreadFactory.newThread(r);
+                newThread.setName("Process NCM Request-" + threadCounter.incrementAndGet());
+                return newThread;
+            }
+        });
+
+        // the accept thread works on the instances created by THIS start() call,
+        // even if the fields are reassigned by a later stop()/start() cycle
+        final ExecutorService runnableExecServiceRef = executorService;
+        final ServerSocket runnableServerSocketRef = serverSocket;
+
+        final Thread listenerThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (runnableExecServiceRef.isShutdown() == false) {
+                    Socket socket = null;
+                    try {
+                        try {
+                            socket = runnableServerSocketRef.accept();
+                            if (configuration.getSocketTimeout() != null) {
+                                socket.setSoTimeout(configuration.getSocketTimeout());
+                            }
+                        } catch (final SocketTimeoutException ste) {
+                            // nobody connected to us. Go ahead and call closeQuietly just to make sure we don't leave
+                            // any sockets lingering
+                            SocketUtils.closeQuietly(socket);
+                            continue;
+                        } catch (final SocketException se) {
+                            // stop() closes the server socket to release accept(); that is an orderly
+                            // shutdown rather than a communication failure, so only warn while running
+                            if (runnableExecServiceRef.isShutdown() == false) {
+                                logger.warn("Failed to communicate with "
+                                        + (socket == null ? "Unknown Host" : socket.getInetAddress().getHostName())
+                                        + " due to " + se, se);
+                            }
+                            SocketUtils.closeQuietly(socket);
+                            continue;
+                        } catch (final Throwable t) {
+                            logger.warn("Socket Listener encountered exception: " + t, t);
+                            SocketUtils.closeQuietly(socket);
+                            continue;
+                        }
+
+                        final Socket finalSocket = socket;
+                        runnableExecServiceRef.execute(new Runnable() {
+                            @Override
+                            public void run() {
+                                try {
+                                    dispatchRequest(finalSocket);
+                                } catch (final Throwable t) {
+                                    logger.warn("Dispatching socket request encountered exception due to: " + t, t);
+                                } finally {
+                                    SocketUtils.closeQuietly(finalSocket);
+                                }
+                            }
+                        });
+                    } catch (final Throwable t) {
+                        // e.g. the executor rejected the task; make sure the accepted socket is not leaked
+                        logger.error("Socket Listener encountered exception: " + t, t);
+                        SocketUtils.closeQuietly(socket);
+                    }
+                }
+            }
+        });
+        listenerThread.setName("Cluster Socket Listener");
+        listenerThread.start();
+    }
+
+    /**
+     * @return true if the listener has been started and its worker pool has
+     * not been shut down
+     */
+    public boolean isRunning() {
+        return (executorService != null && executorService.isShutdown() == false);
+    }
+
+    /**
+     * Stops the listener. The worker pool is shut down (immediately if the
+     * configured shutdown period is not positive), the server socket is
+     * closed, and the call then waits up to the shutdown period for
+     * in-flight requests to finish. Does nothing if the listener is not
+     * running.
+     *
+     * @throws IOException declared for callers; not thrown by this implementation
+     */
+    public void stop() throws IOException {
+
+        if (isRunning() == false) {
+            return;
+        }
+
+        final ExecutorService executor = executorService;
+        final int shutdownSeconds = getShutdownListenerSeconds();
+
+        try {
+            // shutdown executor service
+            if (shutdownSeconds <= 0) {
+                executor.shutdownNow();
+            } else {
+                executor.shutdown();
+            }
+
+            // shutdown server socket before waiting: the pool no longer takes work, so
+            // connections accepted during the grace period would only be rejected, and
+            // closing the socket releases the accept thread from its blocking accept()
+            SocketUtils.closeQuietly(serverSocket);
+
+            executor.awaitTermination(shutdownSeconds, TimeUnit.SECONDS);
+        } catch (final InterruptedException ex) {
+            Thread.currentThread().interrupt();
+        } finally {
+            if (executor.isTerminated()) {
+                logger.info("Socket Listener has been terminated successfully.");
+            } else {
+                logger.warn("Socket Listener has not terminated properly.  There exists an uninterruptable thread that will take an indeterminate amount of time to stop.");
+            }
+        }
+
+    }
+
+    /**
+     * @return the number of seconds {@link #stop()} waits for in-flight
+     * requests to finish
+     */
+    public int getShutdownListenerSeconds() {
+        return shutdownListenerSeconds.get();
+    }
+
+    public void setShutdownListenerSeconds(final int shutdownListenerSeconds) {
+        this.shutdownListenerSeconds.set(shutdownListenerSeconds);
+    }
+
+    public ServerSocketConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * @return the local port of the server socket while the listener is
+     * running; otherwise the port given at construction
+     */
+    public int getPort() {
+        if (isRunning()) {
+            return serverSocket.getLocalPort();
+        } else {
+            return port;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
new file mode 100644
index 0000000..fb6a00c
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLSocket;
+
+import org.apache.nifi.logging.NiFiLog;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers for creating client and server sockets from a configuration
+ * object and for closing them without propagating exceptions.
+ *
+ * @author unattributed
+ */
+public final class SocketUtils {
+
+    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketUtils.class));
+
+    /**
+     * Creates a socket connected to the given address and applies every
+     * option that is set (non-null) in the configuration. An SSL socket is
+     * created when the configuration yields an SSL context, a plain socket
+     * otherwise.
+     *
+     * @param address the address to connect to
+     * @param config the socket settings
+     * @return the connected, configured socket
+     *
+     * @throws IllegalArgumentException if either argument is null
+     * @throws IOException if the SSL context cannot be created, the
+     * connection fails, or an option cannot be applied; in the last case the
+     * socket is closed before the exception propagates
+     */
+    public static Socket createSocket(final InetSocketAddress address, final SocketConfiguration config)
+            throws IOException {
+        if (address == null) {
+            throw new IllegalArgumentException("Socket address may not be null.");
+        } else if (config == null) {
+            throw new IllegalArgumentException("Configuration may not be null.");
+        }
+
+        final SSLContext sslContext;
+        try {
+            sslContext = config.createSSLContext();
+        } catch (final Exception e) {
+            throw new IOException("Could not create SSLContext", e);
+        }
+
+        final Socket socket;
+        if (sslContext == null) {
+            socket = new Socket(address.getHostName(), address.getPort());
+        } else {
+            socket = sslContext.getSocketFactory().createSocket(address.getHostName(), address.getPort());
+        }
+
+        try {
+            if (config.getSocketTimeout() != null) {
+                socket.setSoTimeout(config.getSocketTimeout());
+            }
+
+            if (config.getReuseAddress() != null) {
+                socket.setReuseAddress(config.getReuseAddress());
+            }
+
+            if (config.getReceiveBufferSize() != null) {
+                socket.setReceiveBufferSize(config.getReceiveBufferSize());
+            }
+
+            if (config.getSendBufferSize() != null) {
+                socket.setSendBufferSize(config.getSendBufferSize());
+            }
+
+            if (config.getTrafficClass() != null) {
+                socket.setTrafficClass(config.getTrafficClass());
+            }
+
+            if (config.getKeepAlive() != null) {
+                socket.setKeepAlive(config.getKeepAlive());
+            }
+
+            if (config.getOobInline() != null) {
+                socket.setOOBInline(config.getOobInline());
+            }
+
+            if (config.getTcpNoDelay() != null) {
+                socket.setTcpNoDelay(config.getTcpNoDelay());
+            }
+
+            return socket;
+        } catch (final IOException | RuntimeException e) {
+            // the caller never receives the socket, so it must not stay open
+            closeQuietly(socket);
+            throw e;
+        }
+    }
+
+    /**
+     * Creates a server socket listening on the given port and applies every
+     * option that is set (non-null) in the configuration. An SSL server
+     * socket is created when the configuration yields an SSL context, a
+     * plain one otherwise.
+     *
+     * @param port the port to listen on
+     * @param config the server socket settings
+     * @return the bound, configured server socket
+     *
+     * @throws NullPointerException if {@code config} is null
+     * @throws IOException if the socket cannot be created, configured or
+     * bound; the socket is closed before the exception propagates
+     */
+    public static ServerSocket createServerSocket(final int port, final ServerSocketConfiguration config)
+            throws IOException, KeyManagementException, UnrecoverableKeyException, NoSuchAlgorithmException,
+            KeyStoreException, CertificateException {
+        if (config == null) {
+            throw new NullPointerException("Configuration may not be null.");
+        }
+
+        final SSLContext sslContext = config.createSSLContext();
+
+        // Start from an unbound socket: address reuse and the receive buffer size
+        // have to be in place before bind() to take effect reliably.
+        final ServerSocket serverSocket;
+        if (sslContext == null) {
+            serverSocket = new ServerSocket();
+        } else {
+            serverSocket = sslContext.getServerSocketFactory().createServerSocket();
+        }
+
+        try {
+            if (sslContext != null) {
+                ((SSLServerSocket) serverSocket).setNeedClientAuth(config.getNeedClientAuth());
+            }
+
+            if (config.getSocketTimeout() != null) {
+                serverSocket.setSoTimeout(config.getSocketTimeout());
+            }
+
+            if (config.getReuseAddress() != null) {
+                serverSocket.setReuseAddress(config.getReuseAddress());
+            }
+
+            if (config.getReceiveBufferSize() != null) {
+                serverSocket.setReceiveBufferSize(config.getReceiveBufferSize());
+            }
+
+            serverSocket.bind(new InetSocketAddress(port));
+            return serverSocket;
+        } catch (final IOException | RuntimeException e) {
+            // the caller never receives the socket, so it must not stay open
+            closeQuietly(serverSocket);
+            throw e;
+        }
+    }
+
+    /**
+     * Closes the socket, logging any failure at debug level instead of
+     * throwing. For non-SSL sockets the input and output streams are shut
+     * down first.
+     *
+     * @param socket the socket to close; may be null
+     */
+    public static void closeQuietly(final Socket socket) {
+        if (socket == null) {
+            return;
+        }
+
+        try {
+            try {
+                // can't shutdown input/output individually with secure sockets
+                if ((socket instanceof SSLSocket) == false) {
+                    if (socket.isInputShutdown() == false) {
+                        socket.shutdownInput();
+                    }
+                    if (socket.isOutputShutdown() == false) {
+                        socket.shutdownOutput();
+                    }
+                }
+            } finally {
+                // close even if one of the shutdown calls failed
+                if (socket.isClosed() == false) {
+                    socket.close();
+                }
+            }
+        } catch (final Exception ex) {
+            logger.debug("Failed to close socket due to: " + ex, ex);
+        }
+    }
+
+    /**
+     * Closes the server socket, logging any failure at debug level instead
+     * of throwing.
+     *
+     * @param serverSocket the server socket to close; may be null
+     */
+    public static void closeQuietly(final ServerSocket serverSocket) {
+        if (serverSocket == null) {
+            return;
+        }
+
+        try {
+            serverSocket.close();
+        } catch (final Exception ex) {
+            logger.debug("Failed to close server socket due to: " + ex, ex);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java
 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java
new file mode 100644
index 0000000..7a62813
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Describes a service that can be located at runtime: it carries a unique,
+ * case-sensitive name and the socket address at which it can be reached.
+ *
+ * @author unattributed
+ */
+public interface DiscoverableService {
+
+    /**
+     * @return the address at which the service is available
+     */
+    InetSocketAddress getServiceAddress();
+
+    /**
+     * Returns the name identifying the service. Services sharing the same
+     * name (compared case-sensitively) are considered equal.
+     *
+     * @return the name of the service
+     */
+    String getServiceName();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java
 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java
new file mode 100644
index 0000000..5f378b9
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+import java.net.InetSocketAddress;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * A basic implementation of the DiscoverableService interface. Two services
+ * are considered equal if they have the same case-sensitive service name.
+ *
+ * @author unattributed
+ */
+public class DiscoverableServiceImpl implements DiscoverableService {
+
+    // never null or blank: enforced by the constructor
+    private final String serviceName;
+
+    // never null: enforced by the constructor
+    private final InetSocketAddress serviceAddress;
+
+    /**
+     * @param serviceName the service's name
+     * @param serviceAddress the service's address
+     *
+     * @throws IllegalArgumentException if the name is blank or the address
+     * is null
+     */
+    public DiscoverableServiceImpl(final String serviceName, final InetSocketAddress serviceAddress) {
+        if (StringUtils.isBlank(serviceName)) {
+            throw new IllegalArgumentException("Service name may not be null or empty.");
+        } else if (serviceAddress == null) {
+            throw new IllegalArgumentException("Service address may not be null.");
+        }
+        this.serviceName = serviceName;
+        this.serviceAddress = serviceAddress;
+    }
+
+    @Override
+    public InetSocketAddress getServiceAddress() {
+        return serviceAddress;
+    }
+
+    @Override
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("[Discoverable Service: %s available at %s:%d]",
+                serviceName, serviceAddress.getHostName(), serviceAddress.getPort());
+    }
+
+    /**
+     * Compares by service name only; the address does not take part. Any
+     * {@link DiscoverableService} with the same name is considered equal.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        // instanceof is false for null, so no separate null check is needed
+        if (!(obj instanceof DiscoverableService)) {
+            return false;
+        }
+        final DiscoverableService other = (DiscoverableService) obj;
+        // serviceName is guaranteed non-null by the constructor
+        return serviceName.equals(other.getServiceName());
+    }
+
+    /**
+     * Derived from the service name only, consistent with
+     * {@link #equals(Object)}.
+     */
+    @Override
+    public int hashCode() {
+        return 53 * 5 + serviceName.hashCode();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java
 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java
new file mode 100644
index 0000000..ea0b72a
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+/**
+ * Mutable bag of multicast socket settings. The time-to-live starts out as
+ * {@link #DEFAULT_MULTICAST_TTL} and can never be null; every other option
+ * defaults to {@code null}.
+ *
+ * @author unattributed
+ */
+public final class MulticastConfiguration {
+
+    /** The time-to-live used until {@link #setTtl(MulticastTimeToLive)} is called. */
+    public static final MulticastTimeToLive DEFAULT_MULTICAST_TTL = MulticastTimeToLive.SAME_SUBNET;
+
+    private MulticastTimeToLive ttl = DEFAULT_MULTICAST_TTL;
+    private Integer socketTimeout;
+    private Integer sendBufferSize;
+    private Integer receiveBufferSize;
+    private Integer trafficClass;
+    private Boolean reuseAddress;
+    private Boolean loopbackMode;
+
+    /**
+     * @return the multicast time-to-live; never null
+     */
+    public MulticastTimeToLive getTtl() {
+        return ttl;
+    }
+
+    /**
+     * @param ttl the multicast time-to-live
+     *
+     * @throws NullPointerException if {@code ttl} is null
+     */
+    public void setTtl(final MulticastTimeToLive ttl) {
+        if (ttl != null) {
+            this.ttl = ttl;
+            return;
+        }
+        throw new NullPointerException("Multicast TTL may not be null.");
+    }
+
+    /**
+     * @return the socket timeout, or {@code null} if none was set
+     */
+    public Integer getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    public void setSocketTimeout(Integer socketTimeout) {
+        this.socketTimeout = socketTimeout;
+    }
+
+    /**
+     * @return the send buffer size, or {@code null} if none was set
+     */
+    public Integer getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    public void setSendBufferSize(Integer sendBufferSize) {
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    /**
+     * @return the receive buffer size, or {@code null} if none was set
+     */
+    public Integer getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(Integer receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    /**
+     * @return the traffic class, or {@code null} if none was set
+     */
+    public Integer getTrafficClass() {
+        return trafficClass;
+    }
+
+    public void setTrafficClass(Integer trafficClass) {
+        this.trafficClass = trafficClass;
+    }
+
+    /**
+     * @return the address-reuse flag, or {@code null} if none was set
+     */
+    public Boolean getReuseAddress() {
+        return reuseAddress;
+    }
+
+    public void setReuseAddress(Boolean reuseAddress) {
+        this.reuseAddress = reuseAddress;
+    }
+
+    /**
+     * @return the loopback-mode flag, or {@code null} if none was set
+     */
+    public Boolean getLoopbackMode() {
+        return loopbackMode;
+    }
+
+    public void setLoopbackMode(Boolean loopbackMode) {
+        this.loopbackMode = loopbackMode;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
new file mode 100644
index 0000000..e562c25
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a listener for protocol messages sent over multicast. If a 
message
+ * is of type MulticastProtocolMessage, then the underlying protocol message is
+ * passed to the handler. If the receiving handler produces a message response,
+ * then the message is wrapped with a MulticastProtocolMessage before being 
sent
+ * to the originator.
+ *
+ * @author unattributed
+ */
+public abstract class MulticastListener {
+
+    // constants
+    private static final int DEFAULT_SHUTDOWN_LISTENER_SECONDS = 5;
+    private static final int DEFAULT_MAX_PACKET_SIZE_BYTES = 512;
+
+    private static final Logger logger = new 
org.apache.nifi.logging.NiFiLog(LoggerFactory.getLogger(MulticastListener.class));
+
+    // immutable members
+    private final int numThreads;
+    private final InetSocketAddress multicastAddress;
+    private final MulticastConfiguration configuration;
+
+    private volatile ExecutorService executorService;     // volatile to 
guarantee most current value is visible
+    private volatile MulticastSocket multicastSocket;     // volatile to 
guarantee most current value is visible
+
+    private int shutdownListenerSeconds = DEFAULT_SHUTDOWN_LISTENER_SECONDS;
+    private int maxPacketSizeBytes = DEFAULT_MAX_PACKET_SIZE_BYTES;
+
+    public MulticastListener(
+            final int numThreads,
+            final InetSocketAddress multicastAddress,
+            final MulticastConfiguration configuration) {
+
+        if (numThreads <= 0) {
+            throw new IllegalArgumentException("Number of threads may not be 
less than or equal to zero.");
+        } else if (multicastAddress == null) {
+            throw new IllegalArgumentException("Multicast address may not be 
null.");
+        } else if (multicastAddress.getAddress().isMulticastAddress() == 
false) {
+            throw new IllegalArgumentException("Multicast group must be a 
Class D address.");
+        } else if (configuration == null) {
+            throw new IllegalArgumentException("Multicast configuration may 
not be null.");
+        }
+
+        this.numThreads = numThreads;
+        this.multicastAddress = multicastAddress;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Implements the action to perform when a new datagram is received. This
+     * class must not close the multicast socket.
+     *
+     * @param multicastSocket
+     * @param packet the datagram socket
+     */
+    public abstract void dispatchRequest(final MulticastSocket 
multicastSocket, final DatagramPacket packet);
+
+    public void start() throws IOException {
+
+        if (isRunning()) {
+            return;
+        }
+
+        multicastSocket = 
MulticastUtils.createMulticastSocket(multicastAddress.getPort(), configuration);
+        multicastSocket.joinGroup(multicastAddress.getAddress());
+
+        executorService = Executors.newFixedThreadPool(numThreads);
+
+        final ExecutorService runnableExecServiceRef = executorService;
+        final MulticastSocket runnableMulticastSocketRef = multicastSocket;
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (runnableExecServiceRef.isShutdown() == false) {
+                    try {
+                        final byte[] buf = new byte[maxPacketSizeBytes];
+                        final DatagramPacket packet = new DatagramPacket(buf, 
maxPacketSizeBytes);
+                        runnableMulticastSocketRef.receive(packet);
+                        runnableExecServiceRef.execute(new Runnable() {
+                            @Override
+                            public void run() {
+                                dispatchRequest(multicastSocket, packet);
+                            }
+                        });
+                    } catch (final SocketException | SocketTimeoutException 
ste) {
+                        /* ignore so that we can accept connections in 
approximately a non-blocking fashion */
+                    } catch (final Exception e) {
+                        logger.warn("Cluster protocol receiver encountered 
exception: " + e, e);
+                    }
+                }
+            }
+        }).start();
+    }
+
+    public boolean isRunning() {
+        return (executorService != null && executorService.isShutdown() == 
false);
+    }
+
+    public void stop() throws IOException {
+
+        if (isRunning() == false) {
+            return;
+        }
+
+        // shutdown executor service
+        try {
+            if (getShutdownListenerSeconds() <= 0) {
+                executorService.shutdownNow();
+            } else {
+                executorService.shutdown();
+            }
+            executorService.awaitTermination(getShutdownListenerSeconds(), 
TimeUnit.SECONDS);
+        } catch (final InterruptedException ex) {
+            Thread.currentThread().interrupt();
+        } finally {
+            if (executorService.isTerminated()) {
+                logger.info("Multicast Listener has been terminated 
successfully.");
+            } else {
+                logger.warn("Multicast Listener has not terminated properly.  
There exists an uninterruptable thread that will take an indeterminate amount 
of time to stop.");
+            }
+        }
+
+        // shutdown server socket
+        if (multicastSocket.isClosed() == false) {
+            multicastSocket.leaveGroup(multicastAddress.getAddress());
+            multicastSocket.close();
+        }
+
+    }
+
+    public int getShutdownListenerSeconds() {
+        return shutdownListenerSeconds;
+    }
+
+    public void setShutdownListenerSeconds(final int shutdownListenerSeconds) {
+        this.shutdownListenerSeconds = shutdownListenerSeconds;
+    }
+
+    public int getMaxPacketSizeBytes() {
+        return maxPacketSizeBytes;
+    }
+
+    public void setMaxPacketSizeBytes(int maxPacketSizeBytes) {
+        if (maxPacketSizeBytes <= 0) {
+            throw new IllegalArgumentException("Max packet size must be 
greater than zero bytes.");
+        }
+        this.maxPacketSizeBytes = maxPacketSizeBytes;
+    }
+
+    public MulticastConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public InetSocketAddress getMulticastAddress() {
+        return multicastAddress;
+    }
+
+    public int getNumThreads() {
+        return numThreads;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java
 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java
new file mode 100644
index 0000000..c254c11
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Defines the interface for discovering services based on name. Services are
+ * expected to be exposed via socket address and port.
+ *
+ * @author unattributed
+ */
+public interface MulticastServiceDiscovery extends ServiceDiscovery {
+
+    /**
+     * Returns the multicast socket address used by this discovery.
+     * NOTE(review): presumably the group address and port on which service
+     * broadcasts are received -- confirm against the implementations.
+     *
+     * @return the multicast address
+     */
+    InetSocketAddress getMulticastAddress();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java
 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java
new file mode 100644
index 0000000..a3cff9b
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Defines the interface for broadcasting a service via multicast.
+ *
+ * @author unattributed
+ */
+public interface MulticastServicesBroadcaster extends ServicesBroadcaster {
+
+    /**
+     * Returns the multicast socket address used by this broadcaster.
+     * NOTE(review): presumably the group address and port to which services
+     * are broadcast -- confirm against the implementations.
+     *
+     * @return the multicast address
+     */
+    InetSocketAddress getMulticastAddress();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java
 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java
new file mode 100644
index 0000000..dad1173
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+/**
+ * Named multicast time-to-live scopes and the numeric TTL each one maps to.
+ * The numeric value is what gets applied to a socket through
+ * {@code MulticastSocket.setTimeToLive(int)}.
+ *
+ * @author unattributed
+ */
+public enum MulticastTimeToLive {
+
+    SAME_HOST(0),
+    SAME_SUBNET(1),
+    SAME_SITE(32),
+    SAME_REGION(64),
+    SAME_CONTINENT(128),
+    UNRESTRICTED(255);
+
+    private final int ttl;
+
+    MulticastTimeToLive(final int ttl) {
+        this.ttl = ttl;
+    }
+
+    /**
+     * @return the numeric time-to-live of this scope
+     */
+    public int getTtl() {
+        return ttl;
+    }
+
+    /**
+     * Looks up the scope whose numeric time-to-live equals the given value.
+     * Declared static so that no enum constant is needed to perform the
+     * lookup; existing calls made through a constant still compile.
+     *
+     * @param ttl the numeric time-to-live to look up
+     * @return the matching scope, or null if no scope has exactly that value
+     */
+    public static MulticastTimeToLive valueOfByTtl(final int ttl) {
+        for (final MulticastTimeToLive value : values()) {
+            if (value.getTtl() == ttl) {
+                return value;
+            }
+        }
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
new file mode 100644
index 0000000..8a8b7c0
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers for creating configured multicast sockets and for closing
+ * them without propagating exceptions.
+ *
+ * @author unattributed
+ */
+public final class MulticastUtils {
+
+    private static final Logger logger = new org.apache.nifi.logging.NiFiLog(LoggerFactory.getLogger(MulticastUtils.class));
+
+    /**
+     * Creates a configured multicast socket bound to a system-chosen port.
+     *
+     * @param config the socket options to apply; must not be null
+     * @return the bound, configured socket
+     * @throws IOException if the socket cannot be created or configured
+     * @throws IllegalArgumentException if config is null
+     */
+    public static MulticastSocket createMulticastSocket(final MulticastConfiguration config) throws IOException {
+        return createMulticastSocket(0, config);
+    }
+
+    /**
+     * Creates a multicast socket and applies every option that is set on the
+     * given configuration. Options whose configured value is null are left at
+     * the socket's default. If applying an option fails, the socket is closed
+     * before the exception is propagated.
+     *
+     * @param port the local port to bind; zero or less lets the system choose
+     * @param config the socket options to apply; must not be null
+     * @return the bound, configured socket
+     * @throws IOException if the socket cannot be created or configured
+     * @throws IllegalArgumentException if config is null
+     */
+    public static MulticastSocket createMulticastSocket(final int port, final MulticastConfiguration config) throws IOException {
+        if (config == null) {
+            throw new IllegalArgumentException("Configuration may not be null.");
+        }
+
+        final MulticastSocket socket = (port <= 0) ? new MulticastSocket() : new MulticastSocket(port);
+
+        try {
+            socket.setTimeToLive(config.getTtl().getTtl());
+
+            if (config.getSocketTimeout() != null) {
+                socket.setSoTimeout(config.getSocketTimeout());
+            }
+
+            // NOTE(review): the socket is already bound at this point; confirm that
+            // changing SO_REUSEADDR after binding has the intended effect.
+            if (config.getReuseAddress() != null) {
+                socket.setReuseAddress(config.getReuseAddress());
+            }
+
+            if (config.getReceiveBufferSize() != null) {
+                socket.setReceiveBufferSize(config.getReceiveBufferSize());
+            }
+
+            if (config.getSendBufferSize() != null) {
+                socket.setSendBufferSize(config.getSendBufferSize());
+            }
+
+            if (config.getTrafficClass() != null) {
+                socket.setTrafficClass(config.getTrafficClass());
+            }
+
+            if (config.getLoopbackMode() != null) {
+                socket.setLoopbackMode(config.getLoopbackMode());
+            }
+        } catch (final IOException | RuntimeException e) {
+            // the caller never receives the socket on failure, so release it here
+            socket.close();
+            throw e;
+        }
+
+        return socket;
+    }
+
+    /**
+     * Closes the socket, logging instead of throwing on failure.
+     *
+     * @param socket the socket to close; null is ignored
+     */
+    public static void closeQuietly(final MulticastSocket socket) {
+
+        if (socket == null) {
+            return;
+        }
+
+        try {
+            socket.close();
+        } catch (final Exception ex) {
+            logger.debug("Failed to close multicast socket due to: " + ex, ex);
+        }
+
+    }
+
+    /**
+     * Leaves the given multicast group and then closes the socket, logging
+     * instead of throwing if either step fails. The socket is closed even when
+     * leaving the group fails.
+     *
+     * @param socket the socket to close; null is ignored
+     * @param groupAddress the multicast group to leave before closing
+     */
+    public static void closeQuietly(final MulticastSocket socket, final InetAddress groupAddress) {
+
+        if (socket == null) {
+            return;
+        }
+
+        try {
+            socket.leaveGroup(groupAddress);
+        } catch (final Exception ex) {
+            logger.debug("Failed to leave multicast group due to: " + ex, ex);
+        }
+
+        closeQuietly(socket);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java
 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java
new file mode 100644
index 0000000..173146e
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+/**
+ * Defines a generic interface for discovering services.
+ *
+ * @author unattributed
+ */
+public interface ServiceDiscovery {
+
+    /**
+     * Returns the service found by this discovery.
+     * NOTE(review): the behaviour when nothing has been discovered yet (null
+     * versus blocking) is not defined here -- confirm against implementations.
+     *
+     * @return the discovered service
+     */
+    DiscoverableService getService();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java
 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java
new file mode 100644
index 0000000..86260d8
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket.multicast;
+
+import java.util.Set;
+
+/**
+ * Defines the interface for broadcasting a collection of services for client
+ * discovery.
+ *
+ * @author unattributed
+ */
+public interface ServicesBroadcaster {
+
+    /**
+     * @return the delay in milliseconds to wait between successive broadcasts
+     */
+    int getBroadcastDelayMs();
+
+    /**
+     * @return the broadcasted services
+     */
+    Set<DiscoverableService> getServices();
+
+    /**
+     * Adds the given service to the set of broadcasted services.
+     *
+     * @param service a service
+     * @return true if the service was added to the set; false if a service
+     * with the given service name already exists in the set.
+     */
+    boolean addService(DiscoverableService service);
+
+    /**
+     * Removes the service with the given service name from the set.
+     *
+     * @param serviceName a service name
+     * @return true if the service was removed; false otherwise
+     */
+    boolean removeService(String serviceName);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
 
b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
new file mode 100644
index 0000000..b5240c9
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.example;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.io.nio.BufferPool;
+import org.apache.nifi.io.nio.ChannelListener;
+import org.apache.nifi.io.nio.consumer.StreamConsumer;
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * @author none
+ */
+public final class ServerMain {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerMain.class);
+
+    public static void main(final String[] args) throws IOException {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+
+        final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(10);
+        final Map<StreamConsumer, ScheduledFuture<?>> consumerMap = new 
ConcurrentHashMap<>();
+        final BufferPool bufferPool = new BufferPool(10, 5 << 20, false, 40.0);
+        ChannelListener listener = null;
+        try {
+            executor.scheduleWithFixedDelay(bufferPool, 0L, 5L, 
TimeUnit.SECONDS);
+            listener = new ChannelListener(5, new 
ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, 
TimeUnit.MILLISECONDS);
+            listener.setChannelReaderSchedulingPeriod(50L, 
TimeUnit.MILLISECONDS);
+            listener.addDatagramChannel(null, 20000, 32 << 20);
+            LOGGER.info("Listening for UDP data on port 20000");
+            listener.addServerSocket(null, 20001, 64 << 20);
+            LOGGER.info("listening for TCP connections on port 20001");
+            listener.addServerSocket(null, 20002, 64 << 20);
+            LOGGER.info("listening for TCP connections on port 20002");
+            final Calendar endTime = Calendar.getInstance();
+            endTime.add(Calendar.MINUTE, 30);
+            while (true) {
+                processAllConsumers(consumerMap);
+                if (endTime.before(Calendar.getInstance())) {
+                    break; // time to shut down
+                }
+            }
+        } finally {
+            if (listener != null) {
+                LOGGER.info("Shutting down server....");
+                listener.shutdown(1L, TimeUnit.SECONDS);
+                LOGGER.info("Consumer map size = " + consumerMap.size());
+                while (consumerMap.size() > 0) {
+                    processAllConsumers(consumerMap);
+                }
+                LOGGER.info("Consumer map size = " + consumerMap.size());
+            }
+            executor.shutdown();
+        }
+    }
+
+    private static void processAllConsumers(final Map<StreamConsumer, 
ScheduledFuture<?>> consumerMap) {
+        final Set<StreamConsumer> deadConsumers = new HashSet<>();
+        for (final Map.Entry<StreamConsumer, ScheduledFuture<?>> entry : 
consumerMap.entrySet()) {
+            if (entry.getKey().isConsumerFinished()) {
+                entry.getValue().cancel(true);
+                deadConsumers.add(entry.getKey());
+            }
+        }
+        for (final StreamConsumer consumer : deadConsumers) {
+            LOGGER.debug("removing consumer " + consumer);
+            consumerMap.remove(consumer);
+        }
+    }
+
+    public static final class ConsumerRunner implements Runnable {
+
+        private final StreamConsumer consumer;
+
+        public ConsumerRunner(final StreamConsumer consumer) {
+            this.consumer = consumer;
+        }
+
+        @Override
+        public void run() {
+            if (consumer.isConsumerFinished()) {
+                return;
+            }
+            try {
+                consumer.process();
+            } catch (IOException ex) {
+                LOGGER.error("", ex);
+            }
+        }
+    }
+
+    public static final class ExampleStreamConsumerFactory implements 
StreamConsumerFactory {
+
+        final ScheduledExecutorService executor;
+        final Map<StreamConsumer, ScheduledFuture<?>> consumerMap;
+
+        public ExampleStreamConsumerFactory(final ScheduledExecutorService 
executor, final Map<StreamConsumer, ScheduledFuture<?>> consumerMap) {
+            this.executor = executor;
+            this.consumerMap = consumerMap;
+        }
+
+        @Override
+        public StreamConsumer newInstance(final String streamId) {
+            final StreamConsumer consumer = new 
UselessStreamConsumer(streamId);
+            final ScheduledFuture<?> future = 
executor.scheduleWithFixedDelay(new ConsumerRunner(consumer), 0L, 10L, 
TimeUnit.MILLISECONDS);
+            consumerMap.put(consumer, future);
+            LOGGER.info("Added consumer: " + consumer);
+            return consumer;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
 
b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
new file mode 100644
index 0000000..b3d214e
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.example;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author none
+ */
+public class TCPClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(TCPClient.class);
+
+    /**
+     * Starts two sender threads, one targeting port 20001 and one targeting
+     * port 20002, that share the same payload.
+     *
+     * @param args ignored
+     */
+    public static void main(final String[] args) throws Exception {
+        final byte[] payload = TCPClient.makeBytes();
+        final Thread first = new Thread(newSender(20001, payload));
+        final Thread second = new Thread(newSender(20002, payload));
+        first.start();
+        second.start();
+    }
+
+    /**
+     * Builds a task that opens ten successive connections to the given port,
+     * sending the payload over each, and logs any failure.
+     */
+    private static Runnable newSender(final int port, final byte[] payload) {
+        return new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    int connection = 0;
+                    while (connection < 10) {
+                        sendData(port, payload);
+                        connection++;
+                    }
+                } catch (Exception e) {
+                    logger.error("Blew exception", e);
+                }
+            }
+        };
+    }
+
+    /**
+     * @return a zero-filled payload buffer of {@code 2 << 20} bytes
+     */
+    public static byte[] makeBytes() {
+        return new byte[2 << 20];
+    }
+
+    /**
+     * Connects to localhost on the given port, waits five seconds, writes the
+     * payload 1000 times, flushes, and logs the number of bytes written.
+     */
+    private static void sendData(final int port, final byte[] bytes) throws SocketException, IOException, InterruptedException {
+        long totalBytes = 0L;
+        try (Socket sock = new Socket("localhost", port)) {
+            sock.setTcpNoDelay(true);
+            sock.setSoTimeout(2000);
+            logger.info("socket established " + sock + " to port " + port + " now waiting 5 seconds to send anything...");
+            Thread.sleep(5000L);
+            int written = 0;
+            while (written < 1000) {
+                sock.getOutputStream().write(bytes);
+                totalBytes += bytes.length;
+                written++;
+            }
+            sock.getOutputStream().flush();
+        }
+        logger.info("Total bytes sent: " + totalBytes + " to port " + port);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java
 
b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java
new file mode 100644
index 0000000..90f4c42
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.example;
+
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stand-alone test client that sends a fixed number of UDP datagrams to
+ * {@code localhost:20000} and logs how long the sends took.
+ *
+ * @author none
+ */
+public class UDPClient {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(UDPClient.class);
+
+    /**
+     * Sends 819200 datagrams, each carrying the array returned by
+     * {@link #makeBytes()}, to port 20000 on localhost and logs the elapsed
+     * time in milliseconds.
+     *
+     * @param args ignored
+     * @throws Exception if the socket cannot be created or a send fails
+     */
+    public static void main(final String[] args) throws Exception {
+        final byte[] buffer = UDPClient.makeBytes();
+        final DatagramPacket packet = new DatagramPacket(buffer, buffer.length,
+                new InetSocketAddress("localhost", 20000));
+        // try-with-resources releases the socket even when a send throws
+        try (DatagramSocket socket = new DatagramSocket()) {
+            final long startTime = System.nanoTime();
+            for (int i = 0; i < 819200; i++) { // 100 MB
+                socket.send(packet);
+            }
+            final long endTime = System.nanoTime();
+            final long durationMillis = (endTime - startTime) / 1000000;
+            LOGGER.info("Sent all UDP packets without any obvious errors | duration ms= " + durationMillis);
+        }
+    }
+
+    /**
+     * Allocates the datagram payload.
+     *
+     * @return a newly allocated, zero-filled array of 128 bytes
+     */
+    public static byte[] makeBytes() {
+        return new byte[128];
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java
 
b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java
new file mode 100644
index 0000000..9ec26e9
--- /dev/null
+++ 
b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.example;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.nifi.io.nio.consumer.AbstractStreamConsumer;
+
+/**
+ * Stream consumer whose {@link #processBuffer(ByteBuffer)} body is empty, so
+ * this class itself does nothing with the buffers handed to it.
+ *
+ * @author none
+ */
+public class UselessStreamConsumer extends AbstractStreamConsumer {
+
+    /**
+     * @param id identifier passed unchanged to the superclass constructor
+     */
+    public UselessStreamConsumer(final String id) {
+        super(id);
+    }
+
+    /**
+     * Does nothing with the supplied buffer; the method body is empty.
+     *
+     * @param buffer the buffer offered for processing (not read here)
+     * @throws IOException declared by the signature; never thrown by this body
+     */
+    @Override
+    protected void processBuffer(final ByteBuffer buffer) throws IOException {
+    }
+
+    /**
+     * Writes the line {@code IN consumer done} to standard error.
+     */
+    @Override
+    protected void onConsumerDone() {
+        System.err.println("IN consumer done");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/test/resources/log4j.xml 
b/commons/nifi-socket-utils/src/test/resources/log4j.xml
new file mode 100644
index 0000000..8e93769
--- /dev/null
+++ b/commons/nifi-socket-utils/src/test/resources/log4j.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+    <!-- Appender for printing formatted log statements to the console. -->
+    <appender name="console" class="org.apache.log4j.ConsoleAppender">
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p [%t] %40.40c - %m%n"/>
+        </layout>
+    </appender>
+
+    <!-- Logger for managing logging statements for nifi -->
+    <logger name="nifi">
+        <level value="debug"/>
+    </logger>
+
+    <root>
+        <level value="warn"/>
+        <appender-ref ref="console"/>
+    </root>
+</log4j:configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/.gitignore
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/.gitignore 
b/commons/nifi-stream-utils/.gitignore
new file mode 100755
index 0000000..ea8c4bf
--- /dev/null
+++ b/commons/nifi-stream-utils/.gitignore
@@ -0,0 +1 @@
+/target

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/pom.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-stream-utils/pom.xml 
b/commons/nifi-stream-utils/pom.xml
new file mode 100644
index 0000000..0413575
--- /dev/null
+++ b/commons/nifi-stream-utils/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-stream-utils</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>NiFi Stream Utils</name>
+    
+    <build>
+        <plugins>
+            <!-- Enforce 1.6 compliance -->
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.6</source>
+                    <target>1.6</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/BufferedInputStream.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/BufferedInputStream.java
 
b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/BufferedInputStream.java
new file mode 100644
index 0000000..57adb8c
--- /dev/null
+++ 
b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/BufferedInputStream.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io;
+
+import java.io.InputStream;
+
+/**
+ * A {@link java.io.BufferedInputStream} subclass that exposes the same two
+ * constructors and overrides no methods, so every read is served by the
+ * inherited java.io implementation.
+ * <p>
+ * NOTE(review): this class was previously described as an unsynchronized
+ * variant of the java.io class that is faster but unsuitable for use by
+ * multiple threads. Nothing in this class removes or bypasses the locking
+ * done by the superclass, so that description does not match the code as
+ * written; confirm whether an unsynchronized implementation was intended.
+ */
+public class BufferedInputStream extends java.io.BufferedInputStream {
+
+    public BufferedInputStream(final InputStream in) {
+        super(in);
+    }
+
+    public BufferedInputStream(final InputStream in, final int size) {
+        super(in, size);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/BufferedOutputStream.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/BufferedOutputStream.java
 
b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/BufferedOutputStream.java
new file mode 100644
index 0000000..56caf65
--- /dev/null
+++ 
b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/BufferedOutputStream.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * This class is a slight modification of the
+ * {@link java.io.BufferedOutputStream} class. This implementation differs in
+ * that it does not mark methods as synchronized. This means that this class
+ * is not suitable for writing by multiple concurrent threads. However, the
+ * removal of the synchronized keyword results in potentially much better
+ * performance.
+ */
+public class BufferedOutputStream extends FilterOutputStream {
+
+    /**
+     * Buffer size used when the caller does not specify one.
+     */
+    private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+    /**
+     * The internal buffer where data is stored.
+     */
+    protected byte[] buf;
+
+    /**
+     * The number of valid bytes in the buffer. This value is always in the
+     * range {@code 0} through {@code buf.length}; elements {@code buf[0]}
+     * through {@code buf[count-1]} contain valid byte data.
+     */
+    protected int count;
+
+    /**
+     * Creates a new buffered output stream to write data to the specified
+     * underlying output stream, using a buffer of 8192 bytes.
+     *
+     * @param out the underlying output stream.
+     */
+    public BufferedOutputStream(OutputStream out) {
+        this(out, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a new buffered output stream to write data to the specified
+     * underlying output stream with the specified buffer size.
+     *
+     * @param out the underlying output stream.
+     * @param size the buffer size.
+     * @exception IllegalArgumentException if size &lt;= 0.
+     */
+    public BufferedOutputStream(OutputStream out, int size) {
+        super(out);
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buf = new byte[size];
+    }
+
+    /**
+     * Writes any buffered bytes to the underlying stream and empties the
+     * buffer. The underlying stream itself is not flushed.
+     */
+    private void flushBuffer() throws IOException {
+        if (count > 0) {
+            out.write(buf, 0, count);
+            count = 0;
+        }
+    }
+
+    /**
+     * Writes the specified byte to this buffered output stream.
+     *
+     * @param b the byte to be written.
+     * @exception IOException if an I/O error occurs.
+     */
+    @Override
+    public void write(int b) throws IOException {
+        if (count >= buf.length) {
+            flushBuffer();
+        }
+        buf[count++] = (byte) b;
+    }
+
+    /**
+     * Writes {@code len} bytes from the specified byte array starting at
+     * offset {@code off} to this buffered output stream.
+     *
+     * <p>
+     * Ordinarily this method stores bytes from the given array into this
+     * stream's buffer, flushing the buffer to the underlying output stream as
+     * needed. If the requested length is at least as large as this stream's
+     * buffer, however, then this method will flush the buffer and write the
+     * bytes directly to the underlying output stream. Thus redundant
+     * {@code BufferedOutputStream}s will not copy data unnecessarily.
+     *
+     * @param b the data.
+     * @param off the start offset in the data.
+     * @param len the number of bytes to write.
+     * @exception IOException if an I/O error occurs.
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        if (len >= buf.length) {
+            /* If the request length exceeds the size of the output buffer,
+             flush the output buffer and then write the data directly.
+             In this way buffered streams will cascade harmlessly. */
+            flushBuffer();
+            out.write(b, off, len);
+            return;
+        }
+        // Flush only when the data does not fit in the remaining space. A
+        // write that exactly fills the buffer is kept, so the next flush
+        // hands the underlying stream a full buffer instead of a partial one.
+        if (len > buf.length - count) {
+            flushBuffer();
+        }
+        System.arraycopy(b, off, buf, count, len);
+        count += len;
+    }
+
+    /**
+     * Flushes this buffered output stream. This forces any buffered output
+     * bytes to be written out to the underlying output stream.
+     *
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public void flush() throws IOException {
+        flushBuffer();
+        out.flush();
+    }
+}

Reply via email to