http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java deleted file mode 100644 index 2ae2c07..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.nio; - -import java.io.IOException; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.UUID; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * @author none - */ -public final class ChannelDispatcher implements Runnable { - - private static final Logger LOGGER = LoggerFactory.getLogger(ChannelDispatcher.class); - private final Selector serverSocketSelector; - private final Selector socketChannelSelector; - private final ScheduledExecutorService executor; - private final BufferPool emptyBuffers; - private final StreamConsumerFactory factory; - private final AtomicLong channelReaderFrequencyMilliseconds = new AtomicLong(DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS); - private final long timeout; - private volatile boolean stop = false; - public static final long DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS = 100L; - - public ChannelDispatcher(final Selector serverSocketSelector, final Selector socketChannelSelector, final ScheduledExecutorService service, - final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit) { - this.serverSocketSelector = serverSocketSelector; - this.socketChannelSelector = socketChannelSelector; - this.executor = service; - this.factory = factory; - emptyBuffers = buffers; - this.timeout = TimeUnit.MILLISECONDS.convert(timeout, unit); - } - - public void setChannelReaderFrequency(final long period, final TimeUnit timeUnit) { - channelReaderFrequencyMilliseconds.set(TimeUnit.MILLISECONDS.convert(period, timeUnit)); - } - - @Override - public void run() { - while (!stop) { - try { - selectServerSocketKeys(); - selectSocketChannelKeys(); - } catch (final Exception ex) { - LOGGER.warn("Key selection failed: {} Normal during shutdown.", new Object[]{ex}); - } - } - } - - /* - * When serverSocketsChannels are registered with the selector, want each invoke of this method to loop through all - * channels' keys. - * - * @throws IOException - */ - private void selectServerSocketKeys() throws IOException { - int numSelected = serverSocketSelector.select(timeout); - if (numSelected == 0) { - return; - } - - // for each registered server socket - see if any connections are waiting to be established - final Iterator<SelectionKey> itr = serverSocketSelector.selectedKeys().iterator(); - while (itr.hasNext()) { - SelectionKey serverSocketkey = itr.next(); - final SelectableChannel channel = serverSocketkey.channel(); - AbstractChannelReader reader = null; - if (serverSocketkey.isValid() && serverSocketkey.isAcceptable()) { - final ServerSocketChannel ssChannel = (ServerSocketChannel) serverSocketkey.channel(); - final SocketChannel sChannel = ssChannel.accept(); - if (sChannel != null) { - sChannel.configureBlocking(false); - final SelectionKey socketChannelKey = sChannel.register(socketChannelSelector, SelectionKey.OP_READ); - final String readerId = sChannel.socket().toString(); - reader = new SocketChannelReader(readerId, socketChannelKey, emptyBuffers, factory); - final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L, - channelReaderFrequencyMilliseconds.get(), TimeUnit.MILLISECONDS); - reader.setScheduledFuture(readerFuture); - socketChannelKey.attach(reader); - } - } - itr.remove(); // do this so that the next select operation returns a positive value; otherwise, it will return 0. - if (reader != null && LOGGER.isDebugEnabled()) { - LOGGER.debug(this + " New Connection established. Server channel: " + channel + " Reader: " + reader); - } - } - } - - /* - * When invoking this method, only want to iterate through the selected keys once. When a key is entered into the selectors - * selected key set, select will return a positive value. The next select will return 0 if nothing has changed. Note that - * the selected key set is not manually changed via a remove operation. - * - * @throws IOException - */ - private void selectSocketChannelKeys() throws IOException { - // once a channel associated with a key in this selector is 'ready', it causes this select to immediately return. - // thus, for each trip through the run() we only get hit with one real timeout...the one in selectServerSocketKeys. - int numSelected = socketChannelSelector.select(timeout); - if (numSelected == 0) { - return; - } - - for (SelectionKey socketChannelKey : socketChannelSelector.selectedKeys()) { - final SelectableChannel channel = socketChannelKey.channel(); - AbstractChannelReader reader = null; - // there are 2 kinds of channels in this selector, both which have their own readers and are executed in their own - // threads. We will get here whenever a new SocketChannel is created due to an incoming connection. However, - // for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only - // way to tell if it's new is the lack of an attachment. - if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) { - reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory); - socketChannelKey.attach(reader); - final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L, channelReaderFrequencyMilliseconds.get(), - TimeUnit.MILLISECONDS); - reader.setScheduledFuture(readerFuture); - } - if (reader != null && LOGGER.isDebugEnabled()) { - LOGGER.debug(this + " New Connection established. Server channel: " + channel + " Reader: " + reader); - } - } - - } - - public void stop() { - stop = true; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java deleted file mode 100644 index b0a1cfb..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.nio; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.StandardSocketOptions; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; - -import org.apache.commons.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class provides the entry point to NIO based socket listeners for NiFi - * processors and services. There are 2 supported types of Listeners, Datagram - * (UDP based transmissions) and ServerSocket (TCP based transmissions). This - * will create the ChannelDispatcher, which is a Runnable and is controlled via - * the ScheduledExecutorService, which is also created by this class. The - * ChannelDispatcher handles connections to the ServerSocketChannels and creates - * the readers associated with the resulting SocketChannels. Additionally, this - * creates and manages two Selectors, one for ServerSocketChannels and another - * for SocketChannels and DatagramChannels. - * - * The threading model for this consists of one thread for the - * ChannelDispatcher, one thread per added SocketChannel reader, one thread per - * added DatagramChannel reader. The ChannelDispatcher is not scheduled with - * fixed delay as the others are. It is throttled by the provided timeout value. - * Within the ChannelDispatcher there are two blocking operations which will - * block for the given timeout each time through the enclosing loop. - * - * All channels are cached in one of the two Selectors via their SelectionKey. - * The serverSocketSelector maintains all the added ServerSocketChannels; the - * socketChannelSelector maintains the all the add DatagramChannels and the - * created SocketChannels. Further, the SelectionKey of the DatagramChannel and - * the SocketChannel is injected with the channel's associated reader. - * - * All ChannelReaders will get throttled by the unavailability of buffers in the - * provided BufferPool. This is designed to create back pressure. - * - * @author none - */ -public final class ChannelListener { - - private static final Logger LOGGER = LoggerFactory.getLogger(ChannelListener.class); - private final ScheduledExecutorService executor; - private final Selector serverSocketSelector; // used to listen for new connections - private final Selector socketChannelSelector; // used to listen on existing connections - private final ChannelDispatcher channelDispatcher; - private final BufferPool bufferPool; - private final int initialBufferPoolSize; - private volatile long channelReaderFrequencyMSecs = 50; - - public ChannelListener(final int threadPoolSize, final StreamConsumerFactory consumerFactory, final BufferPool bufferPool, int timeout, - TimeUnit unit) throws IOException { - this.executor = Executors.newScheduledThreadPool(threadPoolSize + 1); // need to allow for long running ChannelDispatcher thread - this.serverSocketSelector = Selector.open(); - this.socketChannelSelector = Selector.open(); - this.bufferPool = bufferPool; - this.initialBufferPoolSize = bufferPool.size(); - channelDispatcher = new ChannelDispatcher(serverSocketSelector, socketChannelSelector, executor, consumerFactory, bufferPool, - timeout, unit); - executor.schedule(channelDispatcher, 50, TimeUnit.MILLISECONDS); - } - - public void setChannelReaderSchedulingPeriod(final long period, final TimeUnit unit) { - channelReaderFrequencyMSecs = TimeUnit.MILLISECONDS.convert(period, unit); - channelDispatcher.setChannelReaderFrequency(period, unit); - } - - /** - * Adds a server socket channel for listening to connections. - * - * @param nicIPAddress - if null binds to wildcard address - * @param port - port to bind to - * @param receiveBufferSize - size of OS receive buffer to request. If less - * than 0 then will not be set and OS default will win. - * @throws IOException - */ - public void addServerSocket(final InetAddress nicIPAddress, final int port, final int receiveBufferSize) - throws IOException { - final ServerSocketChannel ssChannel = ServerSocketChannel.open(); - ssChannel.configureBlocking(false); - if (receiveBufferSize > 0) { - ssChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); - final int actualReceiveBufSize = ssChannel.getOption(StandardSocketOptions.SO_RCVBUF); - if (actualReceiveBufSize < receiveBufferSize) { - LOGGER.warn(this + " attempted to set TCP Receive Buffer Size to " - + receiveBufferSize + " bytes but could only set to " + actualReceiveBufSize - + "bytes. You may want to consider changing the Operating System's " - + "maximum receive buffer"); - } - } - ssChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - ssChannel.bind(new InetSocketAddress(nicIPAddress, port)); - ssChannel.register(serverSocketSelector, SelectionKey.OP_ACCEPT); - } - - /** - * Binds to listen for data grams on the given local IPAddress/port - * - * @param nicIPAddress - if null will listen on wildcard address, which - * means datagrams will be received on all local network interfaces. - * Otherwise, will bind to the provided IP address associated with some NIC. - * @param port - the port to listen on - * @param receiveBufferSize - the number of bytes to request for a receive - * buffer from OS - * @throws IOException - */ - public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize) - throws IOException { - final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize); - dChannel.register(socketChannelSelector, SelectionKey.OP_READ); - } - - /** - * Binds to listen for data grams on the given local IPAddress/port and - * restricts receipt of datagrams to those from the provided host and port, - * must specify both. This improves performance for datagrams coming from a - * sender that is known a-priori. - * - * @param nicIPAddress - if null will listen on wildcard address, which - * means datagrams will be received on all local network interfaces. - * Otherwise, will bind to the provided IP address associated with some NIC. - * @param port - the port to listen on. This is used to provide a well-known - * destination for a sender. - * @param receiveBufferSize - the number of bytes to request for a receive - * buffer from OS - * @param sendingHost - the hostname, or IP address, of the sender of - * datagrams. Only datagrams from this host will be received. If this is - * null the wildcard ip is used, which means datagrams may be received from - * any network interface on the local host. - * @param sendingPort - the port used by the sender of datagrams. Only - * datagrams from this port will be received. - * @throws IOException - */ - public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize, final String sendingHost, - final Integer sendingPort) throws IOException { - - if (sendingHost == null || sendingPort == null) { - addDatagramChannel(nicIPAddress, port, receiveBufferSize); - return; - } - final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize); - dChannel.connect(new InetSocketAddress(sendingHost, sendingPort)); - dChannel.register(socketChannelSelector, SelectionKey.OP_READ); - } - - private DatagramChannel createAndBindDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize) - throws IOException { - final DatagramChannel dChannel = DatagramChannel.open(); - dChannel.configureBlocking(false); - if (receiveBufferSize > 0) { - dChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); - final int actualReceiveBufSize = dChannel.getOption(StandardSocketOptions.SO_RCVBUF); - if (actualReceiveBufSize < receiveBufferSize) { - LOGGER.warn(this + " attempted to set UDP Receive Buffer Size to " - + receiveBufferSize + " bytes but could only set to " + actualReceiveBufSize - + "bytes. You may want to consider changing the Operating System's " - + "maximum receive buffer"); - } - } - dChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); - dChannel.bind(new InetSocketAddress(nicIPAddress, port)); - return dChannel; - } - - public void shutdown(final long period, final TimeUnit timeUnit) { - channelDispatcher.stop(); - for (SelectionKey selectionKey : socketChannelSelector.keys()) { - final AbstractChannelReader reader = (AbstractChannelReader) selectionKey.attachment(); - selectionKey.cancel(); - if (reader != null) { - while (!reader.isClosed()) { - try { - Thread.sleep(channelReaderFrequencyMSecs); - } catch (InterruptedException e) { - } - } - final ScheduledFuture<?> readerFuture = reader.getScheduledFuture(); - readerFuture.cancel(false); - } - IOUtils.closeQuietly(selectionKey.channel()); // should already be closed via reader, but if reader did not exist... - } - IOUtils.closeQuietly(socketChannelSelector); - - for (SelectionKey selectionKey : serverSocketSelector.keys()) { - selectionKey.cancel(); - IOUtils.closeQuietly(selectionKey.channel()); - } - IOUtils.closeQuietly(serverSocketSelector); - executor.shutdown(); - try { - executor.awaitTermination(period, timeUnit); - } catch (final InterruptedException ex) { - LOGGER.warn("Interrupted while trying to shutdown executor"); - } - final int currentBufferPoolSize = bufferPool.size(); - final String warning = (currentBufferPoolSize != initialBufferPoolSize) ? "Initial buffer count=" + initialBufferPoolSize - + " Current buffer count=" + currentBufferPoolSize - + " Could indicate a buffer leak. Ensure all consumers are executed until they complete." : ""; - LOGGER.info("Channel listener shutdown. " + warning); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java deleted file mode 100644 index 1eb5c7e..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.nio; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectionKey; - -import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; - -/** - * - * @author none - */ -public final class DatagramChannelReader extends AbstractChannelReader { - - public static final int MAX_UDP_PACKET_SIZE = 65507; - - public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) { - super(id, key, empties, consumerFactory); - } - - /** - * Will receive UDP data from channel and won't receive anything unless the - * given buffer has enough space for at least one full max udp packet. - * - * @param key - * @param buffer - * @return - * @throws IOException - */ - @Override - protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException { - final DatagramChannel dChannel = (DatagramChannel) key.channel(); - final int initialBufferPosition = buffer.position(); - while (buffer.remaining() > MAX_UDP_PACKET_SIZE && key.isValid() && key.isReadable()) { - if (dChannel.receive(buffer) == null) { - break; - } - } - return buffer.position() - initialBufferPosition; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java deleted file mode 100644 index db2c102..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.nio; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; - -/** - * - * @author none - */ -public final class SocketChannelReader extends AbstractChannelReader { - - public SocketChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) { - super(id, key, empties, consumerFactory); - } - - /** - * Receives TCP data from the socket channel for the given key. - * - * @param key - * @param buffer - * @return - * @throws IOException - */ - @Override - protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException { - int bytesRead = 0; - final SocketChannel sChannel = (SocketChannel) key.channel(); - while (key.isValid() && key.isReadable()) { - bytesRead = sChannel.read(buffer); - if (bytesRead <= 0) { - break; - } - } - return bytesRead; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java deleted file mode 100644 index fce59c6..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.nio.consumer; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.nifi.io.nio.BufferPool; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; - -/** - * - * @author none - */ -public abstract class AbstractStreamConsumer implements StreamConsumer { - - private final String uniqueId; - private BufferPool bufferPool = null; - private final BlockingQueue<ByteBuffer> filledBuffers = new LinkedBlockingQueue<>(); - private final AtomicBoolean streamEnded = new AtomicBoolean(false); - private final AtomicBoolean consumerEnded = new AtomicBoolean(false); - - public AbstractStreamConsumer(final String id) { - uniqueId = id; - } - - @Override - public final void setReturnBufferQueue(final BufferPool returnQueue) { - bufferPool = returnQueue; - } - - @Override - public final void addFilledBuffer(final ByteBuffer buffer) { - if (isConsumerFinished()) { - buffer.clear(); - bufferPool.returnBuffer(buffer, buffer.remaining()); - } else { - filledBuffers.add(buffer); - } - } - - @Override - public final void process() throws IOException { - if (isConsumerFinished()) { - return; - } - if (streamEnded.get() && filledBuffers.isEmpty()) { - consumerEnded.set(true); - onConsumerDone(); - return; - } - final ByteBuffer buffer = filledBuffers.poll(); - if (buffer != null) { - final int bytesToProcess = buffer.remaining(); - try { - processBuffer(buffer); - } finally { - buffer.clear(); - bufferPool.returnBuffer(buffer, bytesToProcess); - } - } - } - - protected abstract void processBuffer(ByteBuffer buffer) throws IOException; - - @Override - public final void signalEndOfStream() { - streamEnded.set(true); - } - - /** - * Convenience method that is called when the consumer is done processing - * based on being told the signal is end of stream and has processed all - * given buffers. - */ - protected void onConsumerDone() { - } - - @Override - public final boolean isConsumerFinished() { - return consumerEnded.get(); - } - - @Override - public final String getId() { - return uniqueId; - } - - @Override - public final boolean equals(final Object obj) { - if (obj == null) { - return false; - } - if (obj == this) { - return true; - } - if (obj.getClass() != getClass()) { - return false; - } - AbstractStreamConsumer rhs = (AbstractStreamConsumer) obj; - return new EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, rhs.uniqueId).isEquals(); - } - - @Override - public final int hashCode() { - return new HashCodeBuilder(19, 23).append(uniqueId).toHashCode(); - } - - @Override - public final String toString() { - return new ToStringBuilder(this, ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java deleted file mode 100644 index d75b7d7..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.nio.consumer; - -import java.io.IOException; -import java.nio.ByteBuffer; -import org.apache.nifi.io.nio.BufferPool; - -/** - * A StreamConsumer must be thread safe. It may be accessed concurrently by a - * thread providing data to process and another thread that is processing that - * data. - * - * @author none - */ -public interface StreamConsumer { - - /** - * Will be called once just after construction. It provides the queue to - * which processed and emptied and cleared buffers must be returned. For - * each time <code>addFilledBuffer</code> is called there should be an - * associated add to this given queue. If not, buffers will run out and all - * stream processing will halt. READ THIS!!! - * - * @param returnQueue - */ - void setReturnBufferQueue(BufferPool returnQueue); - - /** - * Will be called by the thread that produces byte buffers with available - * data to be processed. If the consumer is finished this should simply - * return the given buffer to the return buffer queue (after it is cleared) - * - * @param buffer - */ - void addFilledBuffer(ByteBuffer buffer); - - /** - * Will be called by the thread that executes the consumption of data. May - * be called many times though once <code>isConsumerFinished</code> returns - * true this method will likely do nothing. - * @throws java.io.IOException - */ - void process() throws IOException; - - /** - * Called once the end of the input stream is detected - */ - void signalEndOfStream(); - - /** - * If true signals the consumer is done consuming data and will not process - * any more buffers. - * - * @return - */ - boolean isConsumerFinished(); - - /** - * Uniquely identifies the consumer - * - * @return - */ - String getId(); - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java deleted file mode 100644 index df298d5..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.nio.consumer; - -/** - * - * @author none - */ -public interface StreamConsumerFactory { - - StreamConsumer newInstance(String streamId); - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java deleted file mode 100644 index 7ed5ad4..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket; - -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.security.KeyManagementException; -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.SecureRandom; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; - -import javax.net.ssl.KeyManager; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; - -import org.apache.nifi.util.NiFiProperties; - -public class SSLContextFactory { - - private final String keystore; - private final char[] keystorePass; - private final String keystoreType; - private final String truststore; - private final char[] truststorePass; - private final String truststoreType; - - private final KeyManager[] keyManagers; - private final TrustManager[] trustManagers; - - public SSLContextFactory(final NiFiProperties properties) throws NoSuchAlgorithmException, CertificateException, FileNotFoundException, IOException, KeyStoreException, UnrecoverableKeyException { - keystore = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE); - keystorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD)); - keystoreType = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE); - - truststore = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE); - truststorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD)); - truststoreType = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE); - - // prepare the keystore - final KeyStore keyStore = KeyStore.getInstance(keystoreType); - keyStore.load(new FileInputStream(keystore), keystorePass); - final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - keyManagerFactory.init(keyStore, keystorePass); - - // prepare the truststore - final KeyStore trustStore = KeyStore.getInstance(truststoreType); - trustStore.load(new FileInputStream(truststore), truststorePass); - final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); - trustManagerFactory.init(trustStore); - - keyManagers = keyManagerFactory.getKeyManagers(); - trustManagers = trustManagerFactory.getTrustManagers(); - } - - private static char[] getPass(final String password) { - return password == null ? null : password.toCharArray(); - } - - /** - * Creates a SSLContext instance using the given information. - * - * - * @return a SSLContext instance - * @throws java.security.KeyStoreException - * @throws java.io.IOException - * @throws java.security.NoSuchAlgorithmException - * @throws java.security.cert.CertificateException - * @throws java.security.UnrecoverableKeyException - * @throws java.security.KeyManagementException - */ - public SSLContext createSslContext() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, - UnrecoverableKeyException, KeyManagementException { - - // initialize the ssl context - final SSLContext sslContext = SSLContext.getInstance("TLS"); - sslContext.init(keyManagers, trustManagers, new SecureRandom()); - sslContext.getDefaultSSLParameters().setNeedClientAuth(true); - - return sslContext; - - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java deleted file mode 100644 index fc279fb..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; - -import javax.net.ssl.SSLContext; - -/** - * @author unattributed - */ -public final class ServerSocketConfiguration { - - private boolean needClientAuth; - private Integer socketTimeout; - private Boolean reuseAddress; - private Integer receiveBufferSize; - private SSLContextFactory sslContextFactory; - - public ServerSocketConfiguration() { - } - - public SSLContext createSSLContext() throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException { - return sslContextFactory == null ? null : sslContextFactory.createSslContext(); - } - - public void setSSLContextFactory(final SSLContextFactory sslContextFactory) { - this.sslContextFactory = sslContextFactory; - } - - public Integer getSocketTimeout() { - return socketTimeout; - } - - public void setSocketTimeout(Integer socketTimeout) { - this.socketTimeout = socketTimeout; - } - - public boolean getNeedClientAuth() { - return needClientAuth; - } - - public void setNeedClientAuth(boolean needClientAuth) { - this.needClientAuth = needClientAuth; - } - - public Boolean getReuseAddress() { - return reuseAddress; - } - - public void setReuseAddress(Boolean reuseAddress) { - this.reuseAddress = reuseAddress; - } - - public Integer getReceiveBufferSize() { - return receiveBufferSize; - } - - public void setReceiveBufferSize(Integer receiveBufferSize) { - this.receiveBufferSize = receiveBufferSize; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java deleted file mode 100644 index c24b540..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; - -import javax.net.ssl.SSLContext; - -/** - * @author unattributed - */ -public final class SocketConfiguration { - - private Integer socketTimeout; - private Integer receiveBufferSize; - private Integer sendBufferSize; - private Boolean reuseAddress; - private Boolean keepAlive; - private Boolean oobInline; - private Boolean tcpNoDelay; - private Integer trafficClass; - private SSLContextFactory sslContextFactory; - - public SSLContext createSSLContext() throws KeyManagementException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyStoreException, CertificateException, FileNotFoundException, IOException { - return sslContextFactory == null ? null : sslContextFactory.createSslContext(); - } - - public void setSSLContextFactory(final SSLContextFactory sslContextFactory) { - this.sslContextFactory = sslContextFactory; - } - - public Integer getSocketTimeout() { - return socketTimeout; - } - - public void setSocketTimeout(Integer socketTimeout) { - this.socketTimeout = socketTimeout; - } - - public Boolean getReuseAddress() { - return reuseAddress; - } - - public void setReuseAddress(Boolean reuseAddress) { - this.reuseAddress = reuseAddress; - } - - public Boolean getKeepAlive() { - return keepAlive; - } - - public void setKeepAlive(Boolean keepAlive) { - this.keepAlive = keepAlive; - } - - public Boolean getOobInline() { - return oobInline; - } - - public void setOobInline(Boolean oobInline) { - this.oobInline = oobInline; - } - - public Integer getReceiveBufferSize() { - return receiveBufferSize; - } - - public void setReceiveBufferSize(Integer receiveBufferSize) { - this.receiveBufferSize = receiveBufferSize; - } - - public Integer getSendBufferSize() { - return sendBufferSize; - } - - public void setSendBufferSize(Integer sendBufferSize) { - this.sendBufferSize = sendBufferSize; - } - - public Boolean getTcpNoDelay() { - return tcpNoDelay; - } - - public void setTcpNoDelay(Boolean tcpNoDelay) { - this.tcpNoDelay = tcpNoDelay; - } - - public Integer getTrafficClass() { - return trafficClass; - } - - public void setTrafficClass(Integer trafficClass) { - this.trafficClass = trafficClass; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java deleted file mode 100644 index e02791a..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket; - -import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.nifi.logging.NiFiLog; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements a listener for TCP/IP messages sent over unicast socket. - * - * @author unattributed - */ -public abstract class SocketListener { - - private static final int DEFAULT_SHUTDOWN_LISTENER_SECONDS = 5; - private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketListener.class)); - private volatile ExecutorService executorService; // volatile to guarantee most current value is visible - private volatile ServerSocket serverSocket; // volatile to guarantee most current value is visible - private final int numThreads; - private final int port; - private final ServerSocketConfiguration configuration; - private final AtomicInteger shutdownListenerSeconds = new AtomicInteger(DEFAULT_SHUTDOWN_LISTENER_SECONDS); - - public SocketListener( - final int numThreads, - final int port, - final ServerSocketConfiguration configuration) { - - if (numThreads <= 0) { - throw new IllegalArgumentException("Number of threads may not be less than or equal to zero."); - } else if (configuration == null) { - throw new IllegalArgumentException("Server socket configuration may not be null."); - } - - this.numThreads = numThreads; - this.port = port; - this.configuration = configuration; - } - - /** - * Implements the action to perform when a new socket request is received. - * This class will close the socket. - * - * @param socket the socket - */ - public abstract void dispatchRequest(final Socket socket); - - public void start() throws IOException { - - if (isRunning()) { - return; - } - - try { - serverSocket = SocketUtils.createServerSocket(port, configuration); - } catch (KeyManagementException | UnrecoverableKeyException | NoSuchAlgorithmException | KeyStoreException | CertificateException e) { - throw new IOException(e); - } - - final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory(); - executorService = Executors.newFixedThreadPool(numThreads, new ThreadFactory() { - private final AtomicLong threadCounter = new AtomicLong(0L); - - @Override - public Thread newThread(final Runnable r) { - final Thread newThread = defaultThreadFactory.newThread(r); - newThread.setName("Process NCM Request-" + threadCounter.incrementAndGet()); - return newThread; - } - }); - - final ExecutorService runnableExecServiceRef = executorService; - final ServerSocket runnableServerSocketRef = serverSocket; - - final Thread t = new Thread(new Runnable() { - @Override - public void run() { - while (runnableExecServiceRef.isShutdown() == false) { - Socket socket = null; - try { - try { - socket = runnableServerSocketRef.accept(); - if (configuration.getSocketTimeout() != null) { - socket.setSoTimeout(configuration.getSocketTimeout()); - } - } catch (final SocketTimeoutException ste) { - // nobody connected to us. Go ahead and call closeQuietly just to make sure we don't leave - // any sockets lingering - SocketUtils.closeQuietly(socket); - continue; - } catch (final SocketException se) { - logger.warn("Failed to communicate with " + (socket == null ? "Unknown Host" : socket.getInetAddress().getHostName()) + " due to " + se, se); - SocketUtils.closeQuietly(socket); - continue; - } catch (final Throwable t) { - logger.warn("Socket Listener encountered exception: " + t, t); - SocketUtils.closeQuietly(socket); - continue; - } - - final Socket finalSocket = socket; - runnableExecServiceRef.execute(new Runnable() { - @Override - public void run() { - try { - dispatchRequest(finalSocket); - } catch (final Throwable t) { - logger.warn("Dispatching socket request encountered exception due to: " + t, t); - } finally { - SocketUtils.closeQuietly(finalSocket); - } - } - }); - } catch (final Throwable t) { - logger.error("Socket Listener encountered exception: " + t, t); - SocketUtils.closeQuietly(socket); - } - } - } - }); - t.setName("Cluster Socket Listener"); - t.start(); - } - - public boolean isRunning() { - return (executorService != null && executorService.isShutdown() == false); - } - - public void stop() throws IOException { - - if (isRunning() == false) { - return; - } - - // shutdown executor service - try { - if (getShutdownListenerSeconds() <= 0) { - executorService.shutdownNow(); - } else { - executorService.shutdown(); - } - executorService.awaitTermination(getShutdownListenerSeconds(), TimeUnit.SECONDS); - } catch (final InterruptedException ex) { - Thread.currentThread().interrupt(); - } finally { - if (executorService.isTerminated()) { - logger.info("Socket Listener has been terminated successfully."); - } else { - logger.warn("Socket Listener has not terminated properly. There exists an uninterruptable thread that will take an indeterminate amount of time to stop."); - } - } - - // shutdown server socket - SocketUtils.closeQuietly(serverSocket); - - } - - public int getShutdownListenerSeconds() { - return shutdownListenerSeconds.get(); - } - - public void setShutdownListenerSeconds(final int shutdownListenerSeconds) { - this.shutdownListenerSeconds.set(shutdownListenerSeconds); - } - - public ServerSocketConfiguration getConfiguration() { - return configuration; - } - - public int getPort() { - if (isRunning()) { - return serverSocket.getLocalPort(); - } else { - return port; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java deleted file mode 100644 index fb6a00c..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketUtils.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLServerSocket; -import javax.net.ssl.SSLSocket; - -import org.apache.nifi.logging.NiFiLog; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author unattributed - */ -public final class SocketUtils { - - private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(SocketUtils.class)); - - public static Socket createSocket(final InetSocketAddress address, final SocketConfiguration config) throws IOException { - if (address == null) { - throw new IllegalArgumentException("Socket address may not be null."); - } else if (config == null) { - throw new IllegalArgumentException("Configuration may not be null."); - } - - final Socket socket; - - final SSLContext sslContext; - try { - sslContext = config.createSSLContext(); - } catch (final Exception e) { - throw new IOException("Could not create SSLContext", e); - } - - if (sslContext == null) { - socket = new Socket(address.getHostName(), address.getPort()); - } else { - socket = sslContext.getSocketFactory().createSocket(address.getHostName(), address.getPort()); - } - - if (config.getSocketTimeout() != null) { - socket.setSoTimeout(config.getSocketTimeout()); - } - - if (config.getReuseAddress() != null) { - socket.setReuseAddress(config.getReuseAddress()); - } - - if (config.getReceiveBufferSize() != null) { - socket.setReceiveBufferSize(config.getReceiveBufferSize()); - } - - if (config.getSendBufferSize() != null) { - socket.setSendBufferSize(config.getSendBufferSize()); - } - - if (config.getTrafficClass() != null) { - socket.setTrafficClass(config.getTrafficClass()); - } - - if (config.getKeepAlive() != null) { - socket.setKeepAlive(config.getKeepAlive()); - } - - if (config.getOobInline() != null) { - socket.setOOBInline(config.getOobInline()); - } - - if (config.getTcpNoDelay() != null) { - socket.setTcpNoDelay(config.getTcpNoDelay()); - } - - return socket; - } - - public static ServerSocket createServerSocket(final int port, final ServerSocketConfiguration config) throws IOException, KeyManagementException, UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, CertificateException { - if (config == null) { - throw new NullPointerException("Configuration may not be null."); - } - - final SSLContext sslContext = config.createSSLContext(); - final ServerSocket serverSocket; - if (sslContext == null) { - serverSocket = new ServerSocket(port); - } else { - serverSocket = sslContext.getServerSocketFactory().createServerSocket(port); - ((SSLServerSocket) serverSocket).setNeedClientAuth(config.getNeedClientAuth()); - } - - if (config.getSocketTimeout() != null) { - serverSocket.setSoTimeout(config.getSocketTimeout()); - } - - if (config.getReuseAddress() != null) { - serverSocket.setReuseAddress(config.getReuseAddress()); - } - - if (config.getReceiveBufferSize() != null) { - serverSocket.setReceiveBufferSize(config.getReceiveBufferSize()); - } - - return serverSocket; - } - - public static void closeQuietly(final Socket socket) { - if (socket == null) { - return; - } - - try { - try { - // can't shudown input/output individually with secure sockets - if ((socket instanceof SSLSocket) == false) { - if (socket.isInputShutdown() == false) { - socket.shutdownInput(); - } - if (socket.isOutputShutdown() == false) { - socket.shutdownOutput(); - } - } - } finally { - if (socket.isClosed() == false) { - socket.close(); - } - } - } catch (final Exception ex) { - logger.debug("Failed to close socket due to: " + ex, ex); - } - } - - public static void closeQuietly(final ServerSocket serverSocket) { - if (serverSocket == null) { - return; - } - - try { - serverSocket.close(); - } catch (final Exception ex) { - logger.debug("Failed to close server socket due to: " + ex, ex); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java deleted file mode 100644 index 7a62813..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableService.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket.multicast; - -import java.net.InetSocketAddress; - -/** - * A service that may be discovered at runtime. A service is defined as having a - * unique case-sensitive service name and a socket address where it is - * available. - * - * @author unattributed - */ -public interface DiscoverableService { - - /** - * The service's name. Two services are considered equal if they have the - * same case sensitive service name. - * - * @return the service's name - */ - String getServiceName(); - - /** - * @return the service's address - */ - InetSocketAddress getServiceAddress(); - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java deleted file mode 100644 index 5f378b9..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/DiscoverableServiceImpl.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket.multicast; - -import java.net.InetSocketAddress; -import org.apache.commons.lang3.StringUtils; - -/** - * A basic implementation of the DiscoverableService interface. To services are - * considered equal if they have the same case-sensitive service name. - * - * @author unattributed - */ -public class DiscoverableServiceImpl implements DiscoverableService { - - private final String serviceName; - - private final InetSocketAddress serviceAddress; - - public DiscoverableServiceImpl(final String serviceName, final InetSocketAddress serviceAddress) { - if (StringUtils.isBlank(serviceName)) { - throw new IllegalArgumentException("Service name may not be null or empty."); - } else if (serviceAddress == null) { - throw new IllegalArgumentException("Service address may not be null."); - } - this.serviceName = serviceName; - this.serviceAddress = serviceAddress; - } - - @Override - public InetSocketAddress getServiceAddress() { - return serviceAddress; - } - - @Override - public String getServiceName() { - return serviceName; - } - - @Override - public String toString() { - return String.format("[Discoverable Service: %s available at %s:%d]", serviceName, serviceAddress.getHostName(), serviceAddress.getPort()); - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (!(obj instanceof DiscoverableService)) { - return false; - } - final DiscoverableService other = (DiscoverableService) obj; - return !((this.serviceName == null) ? (other.getServiceName() != null) : !this.serviceName.equals(other.getServiceName())); - } - - @Override - public int hashCode() { - int hash = 5; - hash = 53 * hash + (this.serviceName != null ? this.serviceName.hashCode() : 0); - return hash; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java deleted file mode 100644 index ea0b72a..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastConfiguration.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket.multicast; - -/** - * @author unattributed - */ -public final class MulticastConfiguration { - - private MulticastTimeToLive ttl = DEFAULT_MULTICAST_TTL; - - private Integer socketTimeout; - - private Integer receiveBufferSize; - - private Integer sendBufferSize; - - private Boolean reuseAddress; - - private Integer trafficClass; - - private Boolean loopbackMode; - - public static final MulticastTimeToLive DEFAULT_MULTICAST_TTL = MulticastTimeToLive.SAME_SUBNET; - - public MulticastTimeToLive getTtl() { - return ttl; - } - - public void setTtl(final MulticastTimeToLive ttl) { - if (ttl == null) { - throw new NullPointerException("Multicast TTL may not be null."); - } - this.ttl = ttl; - } - - public Integer getSocketTimeout() { - return socketTimeout; - } - - public void setSocketTimeout(Integer socketTimeout) { - this.socketTimeout = socketTimeout; - } - - public Boolean getReuseAddress() { - return reuseAddress; - } - - public void setReuseAddress(Boolean reuseAddress) { - this.reuseAddress = reuseAddress; - } - - public Integer getReceiveBufferSize() { - return receiveBufferSize; - } - - public void setReceiveBufferSize(Integer receiveBufferSize) { - this.receiveBufferSize = receiveBufferSize; - } - - public Integer getSendBufferSize() { - return sendBufferSize; - } - - public void setSendBufferSize(Integer sendBufferSize) { - this.sendBufferSize = sendBufferSize; - } - - public Integer getTrafficClass() { - return trafficClass; - } - - public void setTrafficClass(Integer trafficClass) { - this.trafficClass = trafficClass; - } - - public Boolean getLoopbackMode() { - return loopbackMode; - } - - public void setLoopbackMode(Boolean loopbackMode) { - this.loopbackMode = loopbackMode; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java deleted file mode 100644 index e562c25..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastListener.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket.multicast; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.net.MulticastSocket; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements a listener for protocol messages sent over multicast. If a message - * is of type MulticastProtocolMessage, then the underlying protocol message is - * passed to the handler. If the receiving handler produces a message response, - * then the message is wrapped with a MulticastProtocolMessage before being sent - * to the originator. - * - * @author unattributed - */ -public abstract class MulticastListener { - - // constants - private static final int DEFAULT_SHUTDOWN_LISTENER_SECONDS = 5; - private static final int DEFAULT_MAX_PACKET_SIZE_BYTES = 512; - - private static final Logger logger = new org.apache.nifi.logging.NiFiLog(LoggerFactory.getLogger(MulticastListener.class)); - - // immutable members - private final int numThreads; - private final InetSocketAddress multicastAddress; - private final MulticastConfiguration configuration; - - private volatile ExecutorService executorService; // volatile to guarantee most current value is visible - private volatile MulticastSocket multicastSocket; // volatile to guarantee most current value is visible - - private int shutdownListenerSeconds = DEFAULT_SHUTDOWN_LISTENER_SECONDS; - private int maxPacketSizeBytes = DEFAULT_MAX_PACKET_SIZE_BYTES; - - public MulticastListener( - final int numThreads, - final InetSocketAddress multicastAddress, - final MulticastConfiguration configuration) { - - if (numThreads <= 0) { - throw new IllegalArgumentException("Number of threads may not be less than or equal to zero."); - } else if (multicastAddress == null) { - throw new IllegalArgumentException("Multicast address may not be null."); - } else if (multicastAddress.getAddress().isMulticastAddress() == false) { - throw new IllegalArgumentException("Multicast group must be a Class D address."); - } else if (configuration == null) { - throw new IllegalArgumentException("Multicast configuration may not be null."); - } - - this.numThreads = numThreads; - this.multicastAddress = multicastAddress; - this.configuration = configuration; - } - - /** - * Implements the action to perform when a new datagram is received. This - * class must not close the multicast socket. - * - * @param multicastSocket - * @param packet the datagram socket - */ - public abstract void dispatchRequest(final MulticastSocket multicastSocket, final DatagramPacket packet); - - public void start() throws IOException { - - if (isRunning()) { - return; - } - - multicastSocket = MulticastUtils.createMulticastSocket(multicastAddress.getPort(), configuration); - multicastSocket.joinGroup(multicastAddress.getAddress()); - - executorService = Executors.newFixedThreadPool(numThreads); - - final ExecutorService runnableExecServiceRef = executorService; - final MulticastSocket runnableMulticastSocketRef = multicastSocket; - - new Thread(new Runnable() { - @Override - public void run() { - while (runnableExecServiceRef.isShutdown() == false) { - try { - final byte[] buf = new byte[maxPacketSizeBytes]; - final DatagramPacket packet = new DatagramPacket(buf, maxPacketSizeBytes); - runnableMulticastSocketRef.receive(packet); - runnableExecServiceRef.execute(new Runnable() { - @Override - public void run() { - dispatchRequest(multicastSocket, packet); - } - }); - } catch (final SocketException | SocketTimeoutException ste) { - /* ignore so that we can accept connections in approximately a non-blocking fashion */ - } catch (final Exception e) { - logger.warn("Cluster protocol receiver encountered exception: " + e, e); - } - } - } - }).start(); - } - - public boolean isRunning() { - return (executorService != null && executorService.isShutdown() == false); - } - - public void stop() throws IOException { - - if (isRunning() == false) { - return; - } - - // shutdown executor service - try { - if (getShutdownListenerSeconds() <= 0) { - executorService.shutdownNow(); - } else { - executorService.shutdown(); - } - executorService.awaitTermination(getShutdownListenerSeconds(), TimeUnit.SECONDS); - } catch (final InterruptedException ex) { - Thread.currentThread().interrupt(); - } finally { - if (executorService.isTerminated()) { - logger.info("Multicast Listener has been terminated successfully."); - } else { - logger.warn("Multicast Listener has not terminated properly. There exists an uninterruptable thread that will take an indeterminate amount of time to stop."); - } - } - - // shutdown server socket - if (multicastSocket.isClosed() == false) { - multicastSocket.leaveGroup(multicastAddress.getAddress()); - multicastSocket.close(); - } - - } - - public int getShutdownListenerSeconds() { - return shutdownListenerSeconds; - } - - public void setShutdownListenerSeconds(final int shutdownListenerSeconds) { - this.shutdownListenerSeconds = shutdownListenerSeconds; - } - - public int getMaxPacketSizeBytes() { - return maxPacketSizeBytes; - } - - public void setMaxPacketSizeBytes(int maxPacketSizeBytes) { - if (maxPacketSizeBytes <= 0) { - throw new IllegalArgumentException("Max packet size must be greater than zero bytes."); - } - this.maxPacketSizeBytes = maxPacketSizeBytes; - } - - public MulticastConfiguration getConfiguration() { - return configuration; - } - - public InetSocketAddress getMulticastAddress() { - return multicastAddress; - } - - public int getNumThreads() { - return numThreads; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java deleted file mode 100644 index c254c11..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServiceDiscovery.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket.multicast; - -import java.net.InetSocketAddress; - -/** - * Defines the interface for discovering services based on name. Services are - * expected to be exposed via socket address and port. - * - * @author unattributed - */ -public interface MulticastServiceDiscovery extends ServiceDiscovery { - - /** - * @return the multicast address - */ - InetSocketAddress getMulticastAddress(); - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java deleted file mode 100644 index a3cff9b..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastServicesBroadcaster.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket.multicast; - -import java.net.InetSocketAddress; - -/** - * Defines the interface for broadcasting a service via multicast. - * - * @author unattributed - */ -public interface MulticastServicesBroadcaster extends ServicesBroadcaster { - - /** - * @return the multicast address - */ - InetSocketAddress getMulticastAddress(); - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java ---------------------------------------------------------------------- diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java deleted file mode 100644 index dad1173..0000000 --- a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/multicast/MulticastTimeToLive.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.io.socket.multicast; - -/** - * @author unattributed - */ -public enum MulticastTimeToLive { - - SAME_HOST(0), - SAME_SUBNET(1), - SAME_SITE(32), - SAME_REGION(64), - SAME_CONTINENT(128), - UNRESTRICTED(255); - - private final int ttl; - - MulticastTimeToLive(final int ttl) { - this.ttl = ttl; - } - - public int getTtl() { - return ttl; - } - - public MulticastTimeToLive valueOfByTtl(final int ttl) { - for (final MulticastTimeToLive value : values()) { - if (value.getTtl() == ttl) { - return value; - } - } - return null; - } - -}