This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ba2806  NIFI-6569, NIFI-6570: Fixed bug that caused read timeouts not to occur with site-to-site. Fixed bug that caused site-to-site listener not to accept connections if there are no input/output ports on the root group - this used to be a valid check and was done to prevent spawning extra threads and doing extra work if no ports exist but now that we have site-to-site ports outside of the root group it's no longer a reasonable condition to check.
9ba2806 is described below

commit 9ba280680ff8b40b15d460e6f822c2b10d4373c3
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Mon Aug 19 15:29:51 2019 -0400

    NIFI-6569, NIFI-6570: Fixed bug that caused read timeouts not to occur with site-to-site. Fixed bug that caused site-to-site listener not to accept connections if there are no input/output ports on the root group - this used to be a valid check and was done to prevent spawning extra threads and doing extra work if no ports exist but now that we have site-to-site ports outside of the root group it's no longer a reasonable condition to check.
    
    This closes #3658.
    
    Signed-off-by: Koji Kawamura <ijokaruma...@apache.org>
---
 .../nifi/remote/io/socket/SocketChannelInput.java  |  8 +-
 .../remote/io/socket/SocketChannelInputStream.java | 94 ++++++++++++----------
 .../nifi/remote/SocketRemoteSiteListener.java      | 20 +----
 3 files changed, 62 insertions(+), 60 deletions(-)

diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
index 85ae504..5cf2a62 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
@@ -16,13 +16,14 @@
  */
 package org.apache.nifi.remote.io.socket;
 
+import org.apache.nifi.remote.io.InterruptableInputStream;
+import org.apache.nifi.remote.protocol.CommunicationsInput;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.channels.SocketChannel;
-import org.apache.nifi.remote.io.InterruptableInputStream;
-import org.apache.nifi.remote.protocol.CommunicationsInput;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
 
 public class SocketChannelInput implements CommunicationsInput {
 
@@ -62,6 +63,7 @@ public class SocketChannelInput implements CommunicationsInput {
 
     public void interrupt() {
         interruptableIn.interrupt();
+        socketIn.interrupt();
     }
 
     @Override
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
index c0cfa11..21f1683 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
@@ -16,19 +16,24 @@
  */
 package org.apache.nifi.remote.io.socket;
 
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
 
 public class SocketChannelInputStream extends InputStream {
 
-    private static final long CHANNEL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
     private final SocketChannel channel;
     private volatile int timeoutMillis = 30000;
+    private volatile boolean interrupted = false;
+    private final Selector readSelector;
 
     private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
     private Byte bufferedByte = null;
@@ -37,6 +42,9 @@ public class SocketChannelInputStream extends InputStream {
         // this class expects a non-blocking channel
         socketChannel.configureBlocking(false);
         this.channel = socketChannel;
+
+        readSelector = Selector.open();
+        this.channel.register(readSelector, SelectionKey.OP_READ);
     }
 
     public void setTimeout(final int timeoutMillis) {
@@ -68,32 +76,24 @@ public class SocketChannelInputStream extends InputStream {
 
         final long maxTime = System.currentTimeMillis() + timeoutMillis;
 
-        final boolean blocking = channel.isBlocking();
-
-        try {
-            channel.configureBlocking(true);
+        waitForReady();
 
-            int bytesRead;
-            do {
-                bytesRead = channel.read(oneByteBuffer);
-                if (bytesRead == 0) {
-                    if (System.currentTimeMillis() > maxTime) {
-                        throw new SocketTimeoutException("Timed out reading from socket");
-                    }
+        int bytesRead;
+        do {
+            bytesRead = channel.read(oneByteBuffer);
+            if (bytesRead == 0) {
+                if (System.currentTimeMillis() > maxTime) {
+                    throw new SocketTimeoutException("Timed out reading from socket");
                 }
-            } while (bytesRead == 0);
-
-            if (bytesRead == -1) {
-                return -1;
             }
+        } while (bytesRead == 0);
 
-            oneByteBuffer.flip();
-            return oneByteBuffer.get() & 0xFF;
-        } finally {
-            if (!blocking) {
-                channel.configureBlocking(false);
-            }
+        if (bytesRead == -1) {
+            return -1;
         }
+
+        oneByteBuffer.flip();
+        return oneByteBuffer.get() & 0xFF;
     }
 
     @Override
@@ -110,29 +110,35 @@ public class SocketChannelInputStream extends InputStream {
             return 1;
         }
 
-        final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
+        waitForReady();
 
-        final boolean blocking = channel.isBlocking();
-        try {
-            channel.configureBlocking(true);
-
-            final long maxTime = System.currentTimeMillis() + timeoutMillis;
-            int bytesRead;
-            do {
-                bytesRead = channel.read(buffer);
-                if (bytesRead == 0) {
-                    if (System.currentTimeMillis() > maxTime) {
-                        throw new SocketTimeoutException("Timed out reading from socket");
-                    }
+        final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
+        final long maxTime = System.currentTimeMillis() + timeoutMillis;
+        int bytesRead;
+        do {
+            bytesRead = channel.read(buffer);
+            if (bytesRead == 0) {
+                if (System.currentTimeMillis() > maxTime) {
+                    throw new SocketTimeoutException("Timed out reading from socket");
                 }
-            } while (bytesRead == 0);
+            }
+        } while (bytesRead == 0);
 
-            return bytesRead;
-        } finally {
-            if (!blocking) {
-                channel.configureBlocking(false);
+        return bytesRead;
+    }
+
+    private void waitForReady() throws IOException {
+        int readyCount = readSelector.select(timeoutMillis);
+        if (readyCount < 1) {
+            if (interrupted) {
+                throw new TransmissionDisabledException();
             }
+
+            throw new SocketTimeoutException("Timed out reading from socket");
         }
+
+        final Set<SelectionKey> selectedKeys = readSelector.selectedKeys();
+        selectedKeys.clear(); // clear the selected keys so that the Selector will be able to add them back to the ready set next time they are ready.
     }
 
     @Override
@@ -164,6 +170,11 @@ public class SocketChannelInputStream extends InputStream {
         return false;
     }
 
+    public void interrupt() {
+        interrupted = true;
+        readSelector.wakeup();
+    }
+
     /**
      * Closes the underlying socket channel.
      *
@@ -172,5 +183,6 @@ public class SocketChannelInputStream extends InputStream {
     @Override
     public void close() throws IOException {
         channel.close();
+        readSelector.close();
     }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index f864ab7..6fa86ff 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -88,7 +88,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
     @Override
     public void start() throws IOException {
         final boolean secure = (sslContext != null);
-        final List<Thread> threads = new ArrayList<Thread>();
+        final List<Thread> threads = new ArrayList<>();
 
         final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
         serverSocketChannel.configureBlocking(true);
@@ -101,17 +101,6 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
             @Override
             public void run() {
                 while (!stopped.get()) {
-                    final ProcessGroup processGroup = rootGroup.get();
-                    // If nodeInformant is not null, we are in clustered mode, which means that we don't care about
-                    // the processGroup.
-                    if ((nodeInformant == null) && (processGroup == null || (processGroup.getInputPorts().isEmpty() && processGroup.getOutputPorts().isEmpty()))) {
-                        try {
-                            Thread.sleep(2000L);
-                        } catch (final Exception e) {
-                        }
-                        continue;
-                    }
-
                     LOG.trace("Accepting Connection...");
                     Socket acceptedSocket = null;
                     try {
@@ -241,8 +230,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
 
                                 commsSession.setTimeout((int) protocol.getRequestExpiration());
 
-                                LOG.info("Successfully negotiated ServerProtocol {} Version {} with {}", new Object[]{
-                                    protocol.getResourceName(), protocol.getVersionNegotiator().getVersion(), peer});
+                                LOG.info("Successfully negotiated ServerProtocol {} Version {} with {}", protocol.getResourceName(), protocol.getVersionNegotiator().getVersion(), peer);
 
                                 try {
                                     while (!protocol.isShutdown()) {
@@ -258,7 +246,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                                                 // Give the timeout a bit longer (twice as long) to receive the Request Type,
                                                 // in order to attempt to receive more data without shutting down the socket if we don't
                                                 // have to.
-                                                LOG.debug("{} Timed out waiting to receive RequestType using {} with {}", new Object[]{this, protocol, peer});
+                                                LOG.debug("{} Timed out waiting to receive RequestType using {} with {}", this, protocol, peer);
                                                 timeoutCount++;
                                                 requestType = null;
 
@@ -377,7 +365,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
     public void destroy() {
     }
 
-    private void verifyMagicBytes(final InputStream in, final String peerDescription) throws IOException, HandshakeException {
+    private void verifyMagicBytes(final InputStream in, final String peerDescription) throws IOException {
         final byte[] receivedMagicBytes = new byte[CommunicationsSession.MAGIC_BYTES.length];
 
         // expect magic bytes

Reply via email to