This is an automated email from the ASF dual-hosted git repository. ijokarumawak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 9ba2806 NIFI-6569, NIFI-6570: Fixed bug that caused read timeouts not to occur with site-to-site. Fixed bug that caused site-to-site listener not to accept connections if there are no input/output ports on the root group - this used to be a valid check and was done to prevent spawning extra threads and doing extra work if no ports exist but now that we have site-to-site ports outside of the root group it's no longer a reasonable condition to check. 9ba2806 is described below commit 9ba280680ff8b40b15d460e6f822c2b10d4373c3 Author: Mark Payne <marka...@hotmail.com> AuthorDate: Mon Aug 19 15:29:51 2019 -0400 NIFI-6569, NIFI-6570: Fixed bug that caused read timeouts not to occur with site-to-site. Fixed bug that caused site-to-site listener not to accept connections if there are no input/output ports on the root group - this used to be a valid check and was done to prevent spawning extra threads and doing extra work if no ports exist but now that we have site-to-site ports outside of the root group it's no longer a reasonable condition to check. This closes #3658. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> --- .../nifi/remote/io/socket/SocketChannelInput.java | 8 +- .../remote/io/socket/SocketChannelInputStream.java | 94 ++++++++++++---------- .../nifi/remote/SocketRemoteSiteListener.java | 20 +---- 3 files changed, 62 insertions(+), 60 deletions(-) diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java index 85ae504..5cf2a62 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java @@ -16,13 +16,14 @@ */ package org.apache.nifi.remote.io.socket; +import org.apache.nifi.remote.io.InterruptableInputStream; +import org.apache.nifi.remote.protocol.CommunicationsInput; +import org.apache.nifi.stream.io.ByteCountingInputStream; + import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.channels.SocketChannel; -import org.apache.nifi.remote.io.InterruptableInputStream; -import org.apache.nifi.remote.protocol.CommunicationsInput; -import org.apache.nifi.stream.io.ByteCountingInputStream; public class SocketChannelInput implements CommunicationsInput { @@ -62,6 +63,7 @@ public class SocketChannelInput implements CommunicationsInput { public void interrupt() { interruptableIn.interrupt(); + socketIn.interrupt(); } @Override diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java index c0cfa11..21f1683 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java @@ -16,19 +16,24 @@ */ package org.apache.nifi.remote.io.socket; +import org.apache.nifi.remote.exception.TransmissionDisabledException; + import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; import java.nio.channels.SocketChannel; -import java.util.concurrent.TimeUnit; +import java.util.Set; public class SocketChannelInputStream extends InputStream { - private static final long CHANNEL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS); private final SocketChannel channel; private volatile int timeoutMillis = 30000; + private volatile boolean interrupted = false; + private final Selector readSelector; private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1); private Byte bufferedByte = null; @@ -37,6 +42,9 @@ public class SocketChannelInputStream extends InputStream { // this class expects a non-blocking channel socketChannel.configureBlocking(false); this.channel = socketChannel; + + readSelector = Selector.open(); + this.channel.register(readSelector, SelectionKey.OP_READ); } public void setTimeout(final int timeoutMillis) { @@ -68,32 +76,24 @@ public class SocketChannelInputStream extends InputStream { final long maxTime = System.currentTimeMillis() + timeoutMillis; - final boolean blocking = channel.isBlocking(); - - try { - channel.configureBlocking(true); + waitForReady(); - int bytesRead; - do { - bytesRead = channel.read(oneByteBuffer); - if (bytesRead == 0) { - if (System.currentTimeMillis() > maxTime) { - throw new SocketTimeoutException("Timed out reading from socket"); - } + int bytesRead; + do { + bytesRead = channel.read(oneByteBuffer); + if (bytesRead == 0) { + if (System.currentTimeMillis() > maxTime) { + throw new SocketTimeoutException("Timed out reading from socket"); } - } while (bytesRead == 0); - - if (bytesRead == -1) { - return -1; } + } while (bytesRead == 0); - oneByteBuffer.flip(); - return oneByteBuffer.get() & 0xFF; - } finally { - if (!blocking) { - channel.configureBlocking(false); - } + if (bytesRead == -1) { + return -1; } + + oneByteBuffer.flip(); + return oneByteBuffer.get() & 0xFF; } @Override @@ -110,29 +110,35 @@ public class SocketChannelInputStream extends InputStream { return 1; } - final ByteBuffer buffer = ByteBuffer.wrap(b, off, len); + waitForReady(); - final boolean blocking = channel.isBlocking(); - try { - channel.configureBlocking(true); - - final long maxTime = System.currentTimeMillis() + timeoutMillis; - int bytesRead; - do { - bytesRead = channel.read(buffer); - if (bytesRead == 0) { - if (System.currentTimeMillis() > maxTime) { - throw new SocketTimeoutException("Timed out reading from socket"); - } + final ByteBuffer buffer = ByteBuffer.wrap(b, off, len); + final long maxTime = System.currentTimeMillis() + timeoutMillis; + int bytesRead; + do { + bytesRead = channel.read(buffer); + if (bytesRead == 0) { + if (System.currentTimeMillis() > maxTime) { + throw new SocketTimeoutException("Timed out reading from socket"); } - } while (bytesRead == 0); + } + } while (bytesRead == 0); - return bytesRead; - } finally { - if (!blocking) { - channel.configureBlocking(false); + return bytesRead; + } + + private void waitForReady() throws IOException { + int readyCount = readSelector.select(timeoutMillis); + if (readyCount < 1) { + if (interrupted) { + throw new TransmissionDisabledException(); } + + throw new SocketTimeoutException("Timed out reading from socket"); } + + final Set<SelectionKey> selectedKeys = readSelector.selectedKeys(); + selectedKeys.clear(); // clear the selected keys so that the Selector will be able to add them back to the ready set next time they are ready. } @Override @@ -164,6 +170,11 @@ public class SocketChannelInputStream extends InputStream { return false; } + public void interrupt() { + interrupted = true; + readSelector.wakeup(); + } + /** * Closes the underlying socket channel. * @@ -172,5 +183,6 @@ public class SocketChannelInputStream extends InputStream { @Override public void close() throws IOException { channel.close(); + readSelector.close(); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java index f864ab7..6fa86ff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java @@ -88,7 +88,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { @Override public void start() throws IOException { final boolean secure = (sslContext != null); - final List<Thread> threads = new ArrayList<Thread>(); + final List<Thread> threads = new ArrayList<>(); final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(true); @@ -101,17 +101,6 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { @Override public void run() { while (!stopped.get()) { - final ProcessGroup processGroup = rootGroup.get(); - // If nodeInformant is not null, we are in clustered mode, which means that we don't care about - // the processGroup. - if ((nodeInformant == null) && (processGroup == null || (processGroup.getInputPorts().isEmpty() && processGroup.getOutputPorts().isEmpty()))) { - try { - Thread.sleep(2000L); - } catch (final Exception e) { - } - continue; - } - LOG.trace("Accepting Connection..."); Socket acceptedSocket = null; try { @@ -241,8 +230,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { commsSession.setTimeout((int) protocol.getRequestExpiration()); - LOG.info("Successfully negotiated ServerProtocol {} Version {} with {}", new Object[]{ - protocol.getResourceName(), protocol.getVersionNegotiator().getVersion(), peer}); + LOG.info("Successfully negotiated ServerProtocol {} Version {} with {}", protocol.getResourceName(), protocol.getVersionNegotiator().getVersion(), peer); try { while (!protocol.isShutdown()) { @@ -258,7 +246,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { // Give the timeout a bit longer (twice as long) to receive the Request Type, // in order to attempt to receive more data without shutting down the socket if we don't // have to. - LOG.debug("{} Timed out waiting to receive RequestType using {} with {}", new Object[]{this, protocol, peer}); + LOG.debug("{} Timed out waiting to receive RequestType using {} with {}", this, protocol, peer); timeoutCount++; requestType = null; @@ -377,7 +365,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener { public void destroy() { } - private void verifyMagicBytes(final InputStream in, final String peerDescription) throws IOException, HandshakeException { + private void verifyMagicBytes(final InputStream in, final String peerDescription) throws IOException { final byte[] receivedMagicBytes = new byte[CommunicationsSession.MAGIC_BYTES.length]; // expect magic bytes