http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java new file mode 100644 index 0000000..fc279fb --- /dev/null +++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io.socket; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; + +import javax.net.ssl.SSLContext; + +/** + * @author unattributed + */ +public final class ServerSocketConfiguration { + + private boolean needClientAuth; + private Integer socketTimeout; + private Boolean reuseAddress; + private Integer receiveBufferSize; + private SSLContextFactory sslContextFactory; + + public ServerSocketConfiguration() { + } + + public SSLContext createSSLContext() throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException { + return sslContextFactory == null ? null : sslContextFactory.createSslContext(); + } + + public void setSSLContextFactory(final SSLContextFactory sslContextFactory) { + this.sslContextFactory = sslContextFactory; + } + + public Integer getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(Integer socketTimeout) { + this.socketTimeout = socketTimeout; + } + + public boolean getNeedClientAuth() { + return needClientAuth; + } + + public void setNeedClientAuth(boolean needClientAuth) { + this.needClientAuth = needClientAuth; + } + + public Boolean getReuseAddress() { + return reuseAddress; + } + + public void setReuseAddress(Boolean reuseAddress) { + this.reuseAddress = reuseAddress; + } + + public Integer getReceiveBufferSize() { + return receiveBufferSize; + } + + public void setReceiveBufferSize(Integer receiveBufferSize) { + this.receiveBufferSize = receiveBufferSize; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java new file mode 100644 index 0000000..c24b540 --- /dev/null +++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io.socket; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; + +import javax.net.ssl.SSLContext; + +/** + * @author unattributed + */ +public final class SocketConfiguration { + + private Integer socketTimeout; + private Integer receiveBufferSize; + private Integer sendBufferSize; + private Boolean reuseAddress; + private Boolean keepAlive; + private Boolean oobInline; + private Boolean tcpNoDelay; + private Integer trafficClass; + private SSLContextFactory sslContextFactory; + + public SSLContext createSSLContext() throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException { + return sslContextFactory == null ? 
null : sslContextFactory.createSslContext(); + } + + public void setSSLContextFactory(final SSLContextFactory sslContextFactory) { + this.sslContextFactory = sslContextFactory; + } + + public Integer getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(Integer socketTimeout) { + this.socketTimeout = socketTimeout; + } + + public Boolean getReuseAddress() { + return reuseAddress; + } + + public void setReuseAddress(Boolean reuseAddress) { + this.reuseAddress = reuseAddress; + } + + public Boolean getKeepAlive() { + return keepAlive; + } + + public void setKeepAlive(Boolean keepAlive) { + this.keepAlive = keepAlive; + } + + public Boolean getOobInline() { + return oobInline; + } + + public void setOobInline(Boolean oobInline) { + this.oobInline = oobInline; + } + + public Integer getReceiveBufferSize() { + return receiveBufferSize; + } + + public void setReceiveBufferSize(Integer receiveBufferSize) { + this.receiveBufferSize = receiveBufferSize; + } + + public Integer getSendBufferSize() { + return sendBufferSize; + } + + public void setSendBufferSize(Integer sendBufferSize) { + this.sendBufferSize = sendBufferSize; + } + + public Boolean getTcpNoDelay() { + return tcpNoDelay; + } + + public void setTcpNoDelay(Boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public Integer getTrafficClass() { + return trafficClass; + } + + public void setTrafficClass(Integer trafficClass) { + this.trafficClass = trafficClass; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java new file mode 100644 index 0000000..e02791a --- /dev/null +++ 
b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.logging.NiFiLog; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements a listener for TCP/IP messages sent over unicast socket. 
+ * + * @author unattributed + */ +public abstract class SocketListener { + + private static final int DEFAULT_SHUTDOWN_LISTENER_SECONDS = 5; + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketListener.class)); + private volatile ExecutorService executorService; // volatile to guarantee most current value is visible + private volatile ServerSocket serverSocket; // volatile to guarantee most current value is visible + private final int numThreads; + private final int port; + private final ServerSocketConfiguration configuration; + private final AtomicInteger shutdownListenerSeconds = new AtomicInteger(DEFAULT_SHUTDOWN_LISTENER_SECONDS); + + public SocketListener( + final int numThreads, + final int port, + final ServerSocketConfiguration configuration) { + + if (numThreads <= 0) { + throw new IllegalArgumentException("Number of threads may not be less than or equal to zero."); + } else if (configuration == null) { + throw new IllegalArgumentException("Server socket configuration may not be null."); + } + + this.numThreads = numThreads; + this.port = port; + this.configuration = configuration; + } + + /** + * Implements the action to perform when a new socket request is received. + * This class will close the socket. 
+ * + * @param socket the socket + */ + public abstract void dispatchRequest(final Socket socket); + + public void start() throws IOException { + + if (isRunning()) { + return; + } + + try { + serverSocket = SocketUtils.createServerSocket(port, configuration); + } catch (KeyManagementException | UnrecoverableKeyException | NoSuchAlgorithmException | KeyStoreException | CertificateException e) { + throw new IOException(e); + } + + final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory(); + executorService = Executors.newFixedThreadPool(numThreads, new ThreadFactory() { + private final AtomicLong threadCounter = new AtomicLong(0L); + + @Override + public Thread newThread(final Runnable r) { + final Thread newThread = defaultThreadFactory.newThread(r); + newThread.setName("Process NCM Request-" + threadCounter.incrementAndGet()); + return newThread; + } + }); + + final ExecutorService runnableExecServiceRef = executorService; + final ServerSocket runnableServerSocketRef = serverSocket; + + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + while (runnableExecServiceRef.isShutdown() == false) { + Socket socket = null; + try { + try { + socket = runnableServerSocketRef.accept(); + if (configuration.getSocketTimeout() != null) { + socket.setSoTimeout(configuration.getSocketTimeout()); + } + } catch (final SocketTimeoutException ste) { + // nobody connected to us. Go ahead and call closeQuietly just to make sure we don't leave + // any sockets lingering + SocketUtils.closeQuietly(socket); + continue; + } catch (final SocketException se) { + logger.warn("Failed to communicate with " + (socket == null ? 
"Unknown Host" : socket.getInetAddress().getHostName()) + " due to " + se, se); + SocketUtils.closeQuietly(socket); + continue; + } catch (final Throwable t) { + logger.warn("Socket Listener encountered exception: " + t, t); + SocketUtils.closeQuietly(socket); + continue; + } + + final Socket finalSocket = socket; + runnableExecServiceRef.execute(new Runnable() { + @Override + public void run() { + try { + dispatchRequest(finalSocket); + } catch (final Throwable t) { + logger.warn("Dispatching socket request encountered exception due to: " + t, t); + } finally { + SocketUtils.closeQuietly(finalSocket); + } + } + }); + } catch (final Throwable t) { + logger.error("Socket Listener encountered exception: " + t, t); + SocketUtils.closeQuietly(socket); + } + } + } + }); + t.setName("Cluster Socket Listener"); + t.start(); + } + + public boolean isRunning() { + return (executorService != null && executorService.isShutdown() == false); + } + + public void stop() throws IOException { + + if (isRunning() == false) { + return; + } + + // shutdown executor service + try { + if (getShutdownListenerSeconds() <= 0) { + executorService.shutdownNow(); + } else { + executorService.shutdown(); + } + executorService.awaitTermination(getShutdownListenerSeconds(), TimeUnit.SECONDS); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } finally { + if (executorService.isTerminated()) { + logger.info("Socket Listener has been terminated successfully."); + } else { + logger.warn("Socket Listener has not terminated properly. 
There exists an uninterruptable thread that will take an indeterminate amount of time to stop."); + } + } + + // shutdown server socket + SocketUtils.closeQuietly(serverSocket); + + } + + public int getShutdownListenerSeconds() { + return shutdownListenerSeconds.get(); + } + + public void setShutdownListenerSeconds(final int shutdownListenerSeconds) { + this.shutdownListenerSeconds.set(shutdownListenerSeconds); + } + + public ServerSocketConfiguration getConfiguration() { + return configuration; + } + + public int getPort() { + if (isRunning()) { + return serverSocket.getLocalPort(); + } else { + return port; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java new file mode 100644 index 0000000..fb6a00c --- /dev/null +++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLSocket; + +import org.apache.nifi.logging.NiFiLog; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author unattributed + */ +public final class SocketUtils { + + private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketUtils.class)); + + public static Socket createSocket(final InetSocketAddress address, final SocketConfiguration config) throws IOException { + if (address == null) { + throw new IllegalArgumentException("Socket address may not be null."); + } else if (config == null) { + throw new IllegalArgumentException("Configuration may not be null."); + } + + final Socket socket; + + final SSLContext sslContext; + try { + sslContext = config.createSSLContext(); + } catch (final Exception e) { + throw new IOException("Could not create SSLContext", e); + } + + if (sslContext == null) { + socket = new Socket(address.getHostName(), address.getPort()); + } else { + socket = sslContext.getSocketFactory().createSocket(address.getHostName(), address.getPort()); + } + + if (config.getSocketTimeout() != null) { + socket.setSoTimeout(config.getSocketTimeout()); + } + + if (config.getReuseAddress() != null) { + socket.setReuseAddress(config.getReuseAddress()); + } + + if (config.getReceiveBufferSize() != null) { + socket.setReceiveBufferSize(config.getReceiveBufferSize()); + } + + if (config.getSendBufferSize() != null) { + 
socket.setSendBufferSize(config.getSendBufferSize()); + } + + if (config.getTrafficClass() != null) { + socket.setTrafficClass(config.getTrafficClass()); + } + + if (config.getKeepAlive() != null) { + socket.setKeepAlive(config.getKeepAlive()); + } + + if (config.getOobInline() != null) { + socket.setOOBInline(config.getOobInline()); + } + + if (config.getTcpNoDelay() != null) { + socket.setTcpNoDelay(config.getTcpNoDelay()); + } + + return socket; + } + + public static ServerSocket createServerSocket(final int port, final ServerSocketConfiguration config) throws IOException, KeyManagementException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, CertificateException { + if (config == null) { + throw new NullPointerException("Configuration may not be null."); + } + + final SSLContext sslContext = config.createSSLContext(); + final ServerSocket serverSocket; + if (sslContext == null) { + serverSocket = new ServerSocket(port); + } else { + serverSocket = sslContext.getServerSocketFactory().createServerSocket(port); + ((SSLServerSocket) serverSocket).setNeedClientAuth(config.getNeedClientAuth()); + } + + if (config.getSocketTimeout() != null) { + serverSocket.setSoTimeout(config.getSocketTimeout()); + } + + if (config.getReuseAddress() != null) { + serverSocket.setReuseAddress(config.getReuseAddress()); + } + + if (config.getReceiveBufferSize() != null) { + serverSocket.setReceiveBufferSize(config.getReceiveBufferSize()); + } + + return serverSocket; + } + + public static void closeQuietly(final Socket socket) { + if (socket == null) { + return; + } + + try { + try { + // can't shudown input/output individually with secure sockets + if ((socket instanceof SSLSocket) == false) { + if (socket.isInputShutdown() == false) { + socket.shutdownInput(); + } + if (socket.isOutputShutdown() == false) { + socket.shutdownOutput(); + } + } + } finally { + if (socket.isClosed() == false) { + socket.close(); + } + } + } catch (final Exception ex) { + 
logger.debug("Failed to close socket due to: " + ex, ex); + } + } + + public static void closeQuietly(final ServerSocket serverSocket) { + if (serverSocket == null) { + return; + } + + try { + serverSocket.close(); + } catch (final Exception ex) { + logger.debug("Failed to close server socket due to: " + ex, ex); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java new file mode 100644 index 0000000..7a62813 --- /dev/null +++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket.multicast; + +import java.net.InetSocketAddress; + +/** + * A service that may be discovered at runtime. 
/**
 * Describes a service that can be located at runtime. A service is identified
 * by a unique, case-sensitive name and is reachable at a socket address.
 *
 * @author unattributed
 */
public interface DiscoverableService {

    /**
     * Returns the name that identifies this service. Two services are
     * considered equal if they have the same case-sensitive service name.
     *
     * @return the service's name
     */
    String getServiceName();

    /**
     * Returns the socket address at which this service is available.
     *
     * @return the service's address
     */
    InetSocketAddress getServiceAddress();

}
+ */ +package org.apache.nifi.io.socket.multicast; + +import java.net.InetSocketAddress; +import org.apache.commons.lang3.StringUtils; + +/** + * A basic implementation of the DiscoverableService interface. To services are + * considered equal if they have the same case-sensitive service name. + * + * @author unattributed + */ +public class DiscoverableServiceImpl implements DiscoverableService { + + private final String serviceName; + + private final InetSocketAddress serviceAddress; + + public DiscoverableServiceImpl(final String serviceName, final InetSocketAddress serviceAddress) { + if (StringUtils.isBlank(serviceName)) { + throw new IllegalArgumentException("Service name may not be null or empty."); + } else if (serviceAddress == null) { + throw new IllegalArgumentException("Service address may not be null."); + } + this.serviceName = serviceName; + this.serviceAddress = serviceAddress; + } + + @Override + public InetSocketAddress getServiceAddress() { + return serviceAddress; + } + + @Override + public String getServiceName() { + return serviceName; + } + + @Override + public String toString() { + return String.format("[Discoverable Service: %s available at %s:%d]", serviceName, serviceAddress.getHostName(), serviceAddress.getPort()); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (!(obj instanceof DiscoverableService)) { + return false; + } + final DiscoverableService other = (DiscoverableService) obj; + return !((this.serviceName == null) ? (other.getServiceName() != null) : !this.serviceName.equals(other.getServiceName())); + } + + @Override + public int hashCode() { + int hash = 5; + hash = 53 * hash + (this.serviceName != null ? 
this.serviceName.hashCode() : 0); + return hash; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java new file mode 100644 index 0000000..ea0b72a --- /dev/null +++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io.socket.multicast; + +/** + * @author unattributed + */ +public final class MulticastConfiguration { + + private MulticastTimeToLive ttl = DEFAULT_MULTICAST_TTL; + + private Integer socketTimeout; + + private Integer receiveBufferSize; + + private Integer sendBufferSize; + + private Boolean reuseAddress; + + private Integer trafficClass; + + private Boolean loopbackMode; + + public static final MulticastTimeToLive DEFAULT_MULTICAST_TTL = MulticastTimeToLive.SAME_SUBNET; + + public MulticastTimeToLive getTtl() { + return ttl; + } + + public void setTtl(final MulticastTimeToLive ttl) { + if (ttl == null) { + throw new NullPointerException("Multicast TTL may not be null."); + } + this.ttl = ttl; + } + + public Integer getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(Integer socketTimeout) { + this.socketTimeout = socketTimeout; + } + + public Boolean getReuseAddress() { + return reuseAddress; + } + + public void setReuseAddress(Boolean reuseAddress) { + this.reuseAddress = reuseAddress; + } + + public Integer getReceiveBufferSize() { + return receiveBufferSize; + } + + public void setReceiveBufferSize(Integer receiveBufferSize) { + this.receiveBufferSize = receiveBufferSize; + } + + public Integer getSendBufferSize() { + return sendBufferSize; + } + + public void setSendBufferSize(Integer sendBufferSize) { + this.sendBufferSize = sendBufferSize; + } + + public Integer getTrafficClass() { + return trafficClass; + } + + public void setTrafficClass(Integer trafficClass) { + this.trafficClass = trafficClass; + } + + public Boolean getLoopbackMode() { + return loopbackMode; + } + + public void setLoopbackMode(Boolean loopbackMode) { + this.loopbackMode = loopbackMode; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java 
---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java new file mode 100644 index 0000000..e562c25 --- /dev/null +++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket.multicast; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements a listener for protocol messages sent over multicast. If a message + * is of type MulticastProtocolMessage, then the underlying protocol message is + * passed to the handler. 
If the receiving handler produces a message response, + * then the message is wrapped with a MulticastProtocolMessage before being sent + * to the originator. + * + * @author unattributed + */ +public abstract class MulticastListener { + + // constants + private static final int DEFAULT_SHUTDOWN_LISTENER_SECONDS = 5; + private static final int DEFAULT_MAX_PACKET_SIZE_BYTES = 512; + + private static final Logger logger = new org.apache.nifi.logging.NiFiLog(LoggerFactory.getLogger(MulticastListener.class)); + + // immutable members + private final int numThreads; + private final InetSocketAddress multicastAddress; + private final MulticastConfiguration configuration; + + private volatile ExecutorService executorService; // volatile to guarantee most current value is visible + private volatile MulticastSocket multicastSocket; // volatile to guarantee most current value is visible + + private int shutdownListenerSeconds = DEFAULT_SHUTDOWN_LISTENER_SECONDS; + private int maxPacketSizeBytes = DEFAULT_MAX_PACKET_SIZE_BYTES; + + public MulticastListener( + final int numThreads, + final InetSocketAddress multicastAddress, + final MulticastConfiguration configuration) { + + if (numThreads <= 0) { + throw new IllegalArgumentException("Number of threads may not be less than or equal to zero."); + } else if (multicastAddress == null) { + throw new IllegalArgumentException("Multicast address may not be null."); + } else if (multicastAddress.getAddress().isMulticastAddress() == false) { + throw new IllegalArgumentException("Multicast group must be a Class D address."); + } else if (configuration == null) { + throw new IllegalArgumentException("Multicast configuration may not be null."); + } + + this.numThreads = numThreads; + this.multicastAddress = multicastAddress; + this.configuration = configuration; + } + + /** + * Implements the action to perform when a new datagram is received. This + * class must not close the multicast socket. 
+ * + * @param multicastSocket + * @param packet the datagram socket + */ + public abstract void dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet); + + public void start() throws IOException { + + if (isRunning()) { + return; + } + + multicastSocket = MulticastUtils.createMulticastSocket(multicastAddress.getPort(), configuration); + multicastSocket.joinGroup(multicastAddress.getAddress()); + + executorService = Executors.newFixedThreadPool(numThreads); + + final ExecutorService runnableExecServiceRef = executorService; + final MulticastSocket runnableMulticastSocketRef = multicastSocket; + + new Thread(new Runnable() { + @Override + public void run() { + while (runnableExecServiceRef.isShutdown() == false) { + try { + final byte[] buf = new byte[maxPacketSizeBytes]; + final DatagramPacket packet = new DatagramPacket(buf, maxPacketSizeBytes); + runnableMulticastSocketRef.receive(packet); + runnableExecServiceRef.execute(new Runnable() { + @Override + public void run() { + dispatchRequest(multicastSocket, packet); + } + }); + } catch (final SocketException | SocketTimeoutException ste) { + /* ignore so that we can accept connections in approximately a non-blocking fashion */ + } catch (final Exception e) { + logger.warn("Cluster protocol receiver encountered exception: " + e, e); + } + } + } + }).start(); + } + + public boolean isRunning() { + return (executorService != null && executorService.isShutdown() == false); + } + + public void stop() throws IOException { + + if (isRunning() == false) { + return; + } + + // shutdown executor service + try { + if (getShutdownListenerSeconds() <= 0) { + executorService.shutdownNow(); + } else { + executorService.shutdown(); + } + executorService.awaitTermination(getShutdownListenerSeconds(), TimeUnit.SECONDS); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } finally { + if (executorService.isTerminated()) { + logger.info("Multicast Listener has been 
terminated successfully."); + } else { + logger.warn("Multicast Listener has not terminated properly. There exists an uninterruptable thread that will take an indeterminate amount of time to stop."); + } + } + + // shutdown server socket + if (multicastSocket.isClosed() == false) { + multicastSocket.leaveGroup(multicastAddress.getAddress()); + multicastSocket.close(); + } + + } + + public int getShutdownListenerSeconds() { + return shutdownListenerSeconds; + } + + public void setShutdownListenerSeconds(final int shutdownListenerSeconds) { + this.shutdownListenerSeconds = shutdownListenerSeconds; + } + + public int getMaxPacketSizeBytes() { + return maxPacketSizeBytes; + } + + public void setMaxPacketSizeBytes(int maxPacketSizeBytes) { + if (maxPacketSizeBytes <= 0) { + throw new IllegalArgumentException("Max packet size must be greater than zero bytes."); + } + this.maxPacketSizeBytes = maxPacketSizeBytes; + } + + public MulticastConfiguration getConfiguration() { + return configuration; + } + + public InetSocketAddress getMulticastAddress() { + return multicastAddress; + } + + public int getNumThreads() { + return numThreads; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java new file mode 100644 index 0000000..c254c11 --- /dev/null +++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket.multicast; + +import java.net.InetSocketAddress; + +/** + * Defines the interface for discovering services based on name. Services are + * expected to be exposed via socket address and port. + * + * @author unattributed + */ +public interface MulticastServiceDiscovery extends ServiceDiscovery { + + /** + * @return the multicast address + */ + InetSocketAddress getMulticastAddress(); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java new file mode 100644 index 0000000..a3cff9b --- /dev/null +++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket.multicast; + +import java.net.InetSocketAddress; + +/** + * Defines the interface for broadcasting a service via multicast. + * + * @author unattributed + */ +public interface MulticastServicesBroadcaster extends ServicesBroadcaster { + + /** + * @return the multicast address + */ + InetSocketAddress getMulticastAddress(); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java new file mode 100644 index 0000000..dad1173 --- /dev/null +++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
// [patch text that shares this physical line with the enum; kept verbatim]
// + * The ASF licenses this file to You under the Apache License, Version 2.0
// + * (the "License"); you may not use this file except in compliance with
// + * the License. You may obtain a copy of the License at
// + *
// + * http://www.apache.org/licenses/LICENSE-2.0
// + *
// + * Unless required by applicable law or agreed to in writing, software
// + * distributed under the License is distributed on an "AS IS" BASIS,
// + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// + * See the License for the specific language governing permissions and
// + * limitations under the License.
// + */
// +package org.apache.nifi.io.socket.multicast;
// +

/**
 * Named time-to-live values for multicast packets. Each constant carries the
 * integer TTL it stands for.
 *
 * @author unattributed
 */
public enum MulticastTimeToLive {

    SAME_HOST(0),
    SAME_SUBNET(1),
    SAME_SITE(32),
    SAME_REGION(64),
    SAME_CONTINENT(128),
    UNRESTRICTED(255);

    private final int ttl;

    MulticastTimeToLive(final int ttl) {
        this.ttl = ttl;
    }

    /**
     * @return the integer TTL represented by this constant
     */
    public int getTtl() {
        return ttl;
    }

    /**
     * Looks up the constant whose TTL equals the given value.
     * <p>
     * This is a static lookup so it can be used without already holding a
     * constant; calls made through an instance still compile.
     *
     * @param ttl the integer TTL to look up
     * @return the matching constant, or {@code null} if no constant has
     * exactly that TTL
     */
    public static MulticastTimeToLive valueOfByTtl(final int ttl) {
        for (final MulticastTimeToLive value : values()) {
            if (value.getTtl() == ttl) {
                return value;
            }
        }
        return null;
    }

}

// [patch text following this enum on the same physical line; kept verbatim]
// http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
// ----------------------------------------------------------------------
// diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
// new file mode 100644
// index 0000000..8a8b7c0
// --- /dev/null
// +++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastUtils.java
// @@ -0,0 +1,109 @@
// +/*
// + * Licensed to the Apache Software Foundation (ASF) under one or more
// + * contributor license agreements. See the NOTICE file distributed with
// + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket.multicast; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.MulticastSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author unattributed + */ +public final class MulticastUtils { + + private static final Logger logger = new org.apache.nifi.logging.NiFiLog(LoggerFactory.getLogger(MulticastUtils.class)); + + public static MulticastSocket createMulticastSocket(final MulticastConfiguration config) throws IOException { + return createMulticastSocket(0, config); + } + + public static MulticastSocket createMulticastSocket(final int port, final MulticastConfiguration config) throws IOException { + if (config == null) { + throw new IllegalArgumentException("Configuration may not be null."); + } + + final MulticastSocket socket; + if (port <= 0) { + socket = new MulticastSocket(); + } else { + socket = new MulticastSocket(port); + } + socket.setTimeToLive(config.getTtl().getTtl()); + + if (config.getSocketTimeout() != null) { + socket.setSoTimeout(config.getSocketTimeout()); + } + + if (config.getReuseAddress() != null) { + socket.setReuseAddress(config.getReuseAddress()); + } + + if (config.getReceiveBufferSize() != null) { + socket.setReceiveBufferSize(config.getReceiveBufferSize()); + } + + if (config.getSendBufferSize() != null) { + socket.setSendBufferSize(config.getSendBufferSize()); + } + + if 
(config.getTrafficClass() != null) { + socket.setTrafficClass(config.getTrafficClass()); + } + + if (config.getLoopbackMode() != null) { + socket.setLoopbackMode(config.getLoopbackMode()); + } + + return socket; + } + + public static void closeQuietly(final MulticastSocket socket) { + + if (socket == null) { + return; + } + + try { + socket.close(); + } catch (final Exception ex) { + logger.debug("Failed to close multicast socket due to: " + ex, ex); + } + + } + + public static void closeQuietly(final MulticastSocket socket, final InetAddress groupAddress) { + + if (socket == null) { + return; + } + + try { + socket.leaveGroup(groupAddress); + } catch (final Exception ex) { + logger.debug("Failed to leave multicast group due to: " + ex, ex); + } + + try { + socket.close(); + } catch (final Exception ex) { + logger.debug("Failed to close multicast socket due to: " + ex, ex); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java new file mode 100644 index 0000000..173146e --- /dev/null +++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServiceDiscovery.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket.multicast; + +/** + * Defines a generic interface for discovering services. + * + * @author unattributed + */ +public interface ServiceDiscovery { + + /** + * @return the discovered service + */ + DiscoverableService getService(); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java new file mode 100644 index 0000000..86260d8 --- /dev/null +++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/ServicesBroadcaster.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.socket.multicast; + +import java.util.Set; + +/** + * Defines the interface for broadcasting a collection of services for client + * discovery. + * + * @author unattributed + */ +public interface ServicesBroadcaster { + + /** + * @return the delay in milliseconds to wait between successive broadcasts + */ + int getBroadcastDelayMs(); + + /** + * @return the broadcasted services + */ + Set<DiscoverableService> getServices(); + + /** + * Adds the given service to the set of broadcasted services. + * + * @param service a service + * @return true if the service was added to the set; false a service with + * the given service name already exists in the set. + */ + boolean addService(DiscoverableService service); + + /** + * Removes the service with the given service name from the set. 
+ * + * @param serviceName a service name + * @return true if the service was removed; false otherwise + */ + boolean removeService(String serviceName); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java new file mode 100644 index 0000000..b5240c9 --- /dev/null +++ b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/ServerMain.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io.nio.example; + +import java.io.IOException; +import java.util.Calendar; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.io.nio.BufferPool; +import org.apache.nifi.io.nio.ChannelListener; +import org.apache.nifi.io.nio.consumer.StreamConsumer; +import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author none + */ +public final class ServerMain { + + private static final Logger LOGGER = LoggerFactory.getLogger(ServerMain.class); + + public static void main(final String[] args) throws IOException { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); + + final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); + final Map<StreamConsumer, ScheduledFuture<?>> consumerMap = new ConcurrentHashMap<>(); + final BufferPool bufferPool = new BufferPool(10, 5 << 20, false, 40.0); + ChannelListener listener = null; + try { + executor.scheduleWithFixedDelay(bufferPool, 0L, 5L, TimeUnit.SECONDS); + listener = new ChannelListener(5, new ExampleStreamConsumerFactory(executor, consumerMap), bufferPool, 5, TimeUnit.MILLISECONDS); + listener.setChannelReaderSchedulingPeriod(50L, TimeUnit.MILLISECONDS); + listener.addDatagramChannel(null, 20000, 32 << 20); + LOGGER.info("Listening for UDP data on port 20000"); + listener.addServerSocket(null, 20001, 64 << 20); + LOGGER.info("listening for TCP connections on port 20001"); + listener.addServerSocket(null, 20002, 64 << 20); + LOGGER.info("listening for TCP connections on port 20002"); + final Calendar endTime = Calendar.getInstance(); + 
endTime.add(Calendar.MINUTE, 30); + while (true) { + processAllConsumers(consumerMap); + if (endTime.before(Calendar.getInstance())) { + break; // time to shut down + } + } + } finally { + if (listener != null) { + LOGGER.info("Shutting down server...."); + listener.shutdown(1L, TimeUnit.SECONDS); + LOGGER.info("Consumer map size = " + consumerMap.size()); + while (consumerMap.size() > 0) { + processAllConsumers(consumerMap); + } + LOGGER.info("Consumer map size = " + consumerMap.size()); + } + executor.shutdown(); + } + } + + private static void processAllConsumers(final Map<StreamConsumer, ScheduledFuture<?>> consumerMap) { + final Set<StreamConsumer> deadConsumers = new HashSet<>(); + for (final Map.Entry<StreamConsumer, ScheduledFuture<?>> entry : consumerMap.entrySet()) { + if (entry.getKey().isConsumerFinished()) { + entry.getValue().cancel(true); + deadConsumers.add(entry.getKey()); + } + } + for (final StreamConsumer consumer : deadConsumers) { + LOGGER.debug("removing consumer " + consumer); + consumerMap.remove(consumer); + } + } + + public static final class ConsumerRunner implements Runnable { + + private final StreamConsumer consumer; + + public ConsumerRunner(final StreamConsumer consumer) { + this.consumer = consumer; + } + + @Override + public void run() { + if (consumer.isConsumerFinished()) { + return; + } + try { + consumer.process(); + } catch (IOException ex) { + LOGGER.error("", ex); + } + } + } + + public static final class ExampleStreamConsumerFactory implements StreamConsumerFactory { + + final ScheduledExecutorService executor; + final Map<StreamConsumer, ScheduledFuture<?>> consumerMap; + + public ExampleStreamConsumerFactory(final ScheduledExecutorService executor, final Map<StreamConsumer, ScheduledFuture<?>> consumerMap) { + this.executor = executor; + this.consumerMap = consumerMap; + } + + @Override + public StreamConsumer newInstance(final String streamId) { + final StreamConsumer consumer = new UselessStreamConsumer(streamId); + 
final ScheduledFuture<?> future = executor.scheduleWithFixedDelay(new ConsumerRunner(consumer), 0L, 10L, TimeUnit.MILLISECONDS); + consumerMap.put(consumer, future); + LOGGER.info("Added consumer: " + consumer); + return consumer; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java new file mode 100644 index 0000000..b3d214e --- /dev/null +++ b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/TCPClient.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io.nio.example; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author none + */ +public class TCPClient { + + private static final Logger logger = LoggerFactory.getLogger(TCPClient.class); + + public static void main(final String[] args) throws Exception { + final byte[] bytes = TCPClient.makeBytes(); + Thread first = new Thread(new Runnable() { + + @Override + public void run() { + try { + for (int i = 0; i < 10; i++) { + sendData(20001, bytes); + } + } catch (Exception e) { + logger.error("Blew exception", e); + } + } + }); + Thread second = new Thread(new Runnable() { + + @Override + public void run() { + try { + for (int i = 0; i < 10; i++) { + sendData(20002, bytes); + } + } catch (Exception e) { + logger.error("Blew exception", e); + } + } + }); + first.start(); + second.start(); + } + + public static byte[] makeBytes() { + byte[] bytes = new byte[2 << 20]; + return bytes; + } + + private static void sendData(final int port, final byte[] bytes) throws SocketException, IOException, InterruptedException { + long totalBytes; + try (Socket sock = new Socket("localhost", port)) { + sock.setTcpNoDelay(true); + sock.setSoTimeout(2000); + totalBytes = 0L; + logger.info("socket established " + sock + " to port " + port + " now waiting 5 seconds to send anything..."); + Thread.sleep(5000L); + for (int i = 0; i < 1000; i++) { + sock.getOutputStream().write(bytes); + totalBytes += bytes.length; + } sock.getOutputStream().flush(); + } + logger.info("Total bytes sent: " + totalBytes + " to port " + port); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java ---------------------------------------------------------------------- diff --git 
a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java new file mode 100644 index 0000000..90f4c42 --- /dev/null +++ b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UDPClient.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io.nio.example; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author none + */ +public class UDPClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(UDPClient.class); + + public static void main(final String[] args) throws Exception { + final byte[] buffer = UDPClient.makeBytes(); + final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, new InetSocketAddress("localhost", 20000)); + final DatagramSocket socket = new DatagramSocket(); + final long startTime = System.nanoTime(); + for (int i = 0; i < 819200; i++) { // 100 MB + socket.send(packet); + } + final long endTime = System.nanoTime(); + final long durationMillis = (endTime - startTime) / 1000000; + LOGGER.info("Sent all UDP packets without any obvious errors | duration ms= " + durationMillis); + } + + public static byte[] makeBytes() { + byte[] bytes = new byte[128]; + return bytes; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java new file mode 100644 index 0000000..9ec26e9 --- /dev/null +++ b/commons/nifi-socket-utils/src/test/java/org/apache/nifi/io/nio/example/UselessStreamConsumer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io.nio.example; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.nifi.io.nio.consumer.AbstractStreamConsumer; + +/** + * + * @author none + */ +public class UselessStreamConsumer extends AbstractStreamConsumer { + + public UselessStreamConsumer(final String id) { + super(id); + } + + @Override + protected void processBuffer(final ByteBuffer buffer) throws IOException { + } + + @Override + protected void onConsumerDone() { + System.err.println("IN consumer done"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/test/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/test/resources/log4j.xml b/commons/nifi-socket-utils/src/test/resources/log4j.xml new file mode 100644 index 0000000..8e93769 --- /dev/null +++ b/commons/nifi-socket-utils/src/test/resources/log4j.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> + +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + + <!-- Appender for printing formatted log statements to the console. --> + <appender name="console" class="org.apache.log4j.ConsoleAppender"> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p [%t] %40.40c - %m%n"/> + </layout> + </appender> + + <!-- Logger for managing logging statements for nifi --> + <logger name="nifi"> + <level value="debug"/> + </logger> + + <root> + <level value="warn"/> + <appender-ref ref="console"/> + </root> +</log4j:configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/.gitignore ---------------------------------------------------------------------- diff --git a/commons/nifi-stream-utils/.gitignore b/commons/nifi-stream-utils/.gitignore new file mode 100755 index 0000000..ea8c4bf --- /dev/null +++ b/commons/nifi-stream-utils/.gitignore @@ -0,0 +1 @@ +/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/pom.xml ---------------------------------------------------------------------- diff --git a/commons/nifi-stream-utils/pom.xml b/commons/nifi-stream-utils/pom.xml new file mode 100644 index 0000000..0413575 --- /dev/null +++ b/commons/nifi-stream-utils/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
<!-- Maven build descriptor for the nifi-stream-utils module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Parent POM this module is declared against. -->
    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-parent</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <!-- Coordinates of this module; the groupId is not declared here. -->
    <artifactId>nifi-stream-utils</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>NiFi Stream Utils</name>

    <build>
        <plugins>
            <!-- Enforce 1.6 compliance: compile with Java 1.6 source and target levels. -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
/**
 * Thin subclass of {@link java.io.BufferedInputStream} that exposes the same
 * two constructors and adds no behaviour of its own: no methods are
 * overridden, so every read, mark, reset and close call is handled by the JDK
 * class.
 *
 * <p>
 * NOTE(review): this class was originally described as an unsynchronized
 * variant of the JDK stream. Because nothing is overridden here, whatever
 * locking the JDK implementation performs still applies; confirm whether an
 * unsynchronized implementation was intended.
 */
public class BufferedInputStream extends java.io.BufferedInputStream {

    /**
     * Wraps the given stream using the superclass's default buffer size.
     *
     * @param in the underlying input stream
     */
    public BufferedInputStream(InputStream in) {
        super(in);
    }

    /**
     * Wraps the given stream using a buffer of the requested size.
     *
     * @param in the underlying input stream
     * @param size the buffer size, passed unchanged to the superclass
     */
    public BufferedInputStream(InputStream in, int size) {
        super(in, size);
    }
}
/**
 * A buffered output stream modelled on {@link java.io.BufferedOutputStream}
 * whose methods are not declared {@code synchronized}. Instances are therefore
 * not suitable for concurrent writers; in exchange, single-threaded use avoids
 * the cost of acquiring a monitor on every call.
 */
public class BufferedOutputStream extends FilterOutputStream {

    /**
     * The internal buffer where data is stored.
     */
    protected byte[] buf;

    /**
     * The number of valid bytes in the buffer. This value is always in the
     * range {@code 0} through {@code buf.length}; elements {@code buf[0]}
     * through {@code buf[count-1]} contain valid byte data.
     */
    protected int count;

    /**
     * Creates a new buffered output stream that writes to the specified
     * underlying output stream using an 8192-byte buffer.
     *
     * @param out the underlying output stream.
     */
    public BufferedOutputStream(OutputStream out) {
        this(out, 8192);
    }

    /**
     * Creates a new buffered output stream that writes to the specified
     * underlying output stream with the specified buffer size.
     *
     * @param out the underlying output stream.
     * @param size the buffer size.
     * @throws IllegalArgumentException if {@code size <= 0}.
     */
    public BufferedOutputStream(OutputStream out, int size) {
        super(out);
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        buf = new byte[size];
    }

    /**
     * Writes any buffered bytes to the underlying stream and empties the
     * buffer. Does not flush the underlying stream itself.
     */
    private void flushBuffer() throws IOException {
        if (count > 0) {
            out.write(buf, 0, count);
            count = 0;
        }
    }

    /**
     * Writes the specified byte to this buffered output stream.
     *
     * @param b the byte to be written.
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public void write(int b) throws IOException {
        if (count >= buf.length) {
            flushBuffer();
        }
        buf[count++] = (byte) b;
    }

    /**
     * Writes {@code len} bytes from the specified byte array starting at
     * offset {@code off} to this buffered output stream.
     *
     * <p>
     * Ordinarily this method stores bytes from the given array into this
     * stream's buffer, flushing the buffer to the underlying output stream as
     * needed. If the requested length is at least as large as this stream's
     * buffer, however, then this method will flush the buffer and write the
     * bytes directly to the underlying output stream. Thus redundant
     * {@code BufferedOutputStream}s will not copy data unnecessarily.
     *
     * @param b the data.
     * @param off the start offset in the data.
     * @param len the number of bytes to write.
     * @throws IOException if an I/O error occurs.
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (len >= buf.length) {
            /* If the request length is at least the size of the output buffer,
               flush the output buffer and then write the data directly.
               In this way buffered streams will cascade harmlessly. */
            flushBuffer();
            out.write(b, off, len);
            return;
        }
        // Flush only when the data does NOT fit in the remaining space. Data
        // that fits exactly is buffered, so the buffer is handed to the
        // underlying stream full rather than partially filled.
        if (len > buf.length - count) {
            flushBuffer();
        }
        System.arraycopy(b, off, buf, count, len);
        count += len;
    }

    /**
     * Flushes this buffered output stream. This forces any buffered output
     * bytes to be written out to the underlying output stream, which is then
     * flushed as well.
     *
     * @throws IOException if an I/O error occurs.
     * @see java.io.FilterOutputStream#out
     */
    @Override
    public void flush() throws IOException {
        flushBuffer();
        out.flush();
    }
}