This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 99cf87c  NIFI-6736: Create thread on demand to handle incoming request 
from client for load balancing. This allows us to avoid situations where we 
don't have enough threads and we block on the server side, waiting for data, 
when clients are trying to send data in another connection
99cf87c is described below

commit 99cf87c330a2b27757cb188a4e806a46c31ecd1b
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Wed Oct 2 09:35:46 2019 -0400

    NIFI-6736: Create thread on demand to handle incoming request from client 
for load balancing. This allows us to avoid situations where we don't have 
enough threads and we block on the server side, waiting for data, when clients 
are trying to send data in another connection
    
    This closes #3784.
    
    Signed-off-by: Bryan Bende <bbe...@apache.org>
---
 .../controller/queue/SwappablePriorityQueue.java   | 27 +++++++
 .../async/nio/NioAsyncLoadBalanceClient.java       | 45 ++++++-----
 .../clustered/partition/RemoteQueuePartition.java  |  9 ++-
 .../server/ConnectionLoadBalanceServer.java        | 90 ++++++++--------------
 .../clustered/server/LoadBalanceProtocol.java      |  8 +-
 .../server/StandardLoadBalanceProtocol.java        |  7 +-
 6 files changed, 99 insertions(+), 87 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index 8613130..b81bd3f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -410,6 +410,33 @@ public class SwappablePriorityQueue {
         return getFlowFileQueueSize().isEmpty();
     }
 
+    public boolean isFlowFileAvailable() {
+        if (isEmpty()) {
+            return false;
+        }
+
+        readLock.lock();
+        try {
+            // If we have data in the active or swap queue that is penalized, 
then we know that all FlowFiles
+            // are penalized. As a result, we can say that no FlowFile is 
available.
+            FlowFileRecord firstRecord = activeQueue.peek();
+            if (firstRecord == null && !swapQueue.isEmpty()) {
+                firstRecord = swapQueue.get(0);
+            }
+
+            if (firstRecord == null) {
+                // If the queue is not empty, then all data is swapped out. We 
don't actually know whether or not the swapped out data is penalized, so we 
assume
+                // that it is not penalized and is therefore available.
+                return !isEmpty();
+            }
+
+            // We do have a FlowFile that was retrieved from the active or 
swap queue. It is available if it is not penalized.
+            return !firstRecord.isPenalized();
+        } finally {
+            readLock.unlock("isFlowFileAvailable");
+        }
+    }
+
     public boolean isActiveQueueEmpty() {
         final FlowFileQueueSize queueSize = getFlowFileQueueSize();
         return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() 
== 0;
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
index 855db8e..1257a3c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.java
@@ -385,38 +385,37 @@ public class NioAsyncLoadBalanceClient implements 
AsyncLoadBalanceClient {
         return selector != null && channel != null && channel.isConnected();
     }
 
-    private synchronized void establishConnection() throws IOException {
+    private void establishConnection() throws IOException {
         SocketChannel socketChannel = null;
 
         try {
-            selector = Selector.open();
-            socketChannel = createChannel();
+            final PeerChannel peerChannel;
+            synchronized (this) {
+                if (isConnectionEstablished()) {
+                    return;
+                }
 
-            socketChannel.configureBlocking(true);
+                selector = Selector.open();
+                socketChannel = createChannel();
+                socketChannel.configureBlocking(true);
 
-            channel = createPeerChannel(socketChannel, 
nodeIdentifier.toString());
-            channel.performHandshake();
+                peerChannel = createPeerChannel(socketChannel, 
nodeIdentifier.toString());
+                channel = peerChannel;
+            }
 
-            socketChannel.configureBlocking(false);
-            selectionKey = socketChannel.register(selector, 
SelectionKey.OP_WRITE | SelectionKey.OP_READ);
-        } catch (Exception e) {
-            logger.error("Unable to connect to {} for load balancing", 
nodeIdentifier, e);
+            // Perform handshake outside of the synchronized block. We do this 
because if the server-side is not very responsive,
+            // the handshake may take some time. We don't want to block any 
other threads from interacting with this Client in
+            // the meantime, especially web threads that may be calling 
#register or #unregister.
+            peerChannel.performHandshake();
 
-            if (selector != null) {
-                try {
-                    selector.close();
-                } catch (final Exception e1) {
-                    e.addSuppressed(e1);
-                }
+            synchronized (this) {
+                socketChannel.configureBlocking(false);
+                selectionKey = socketChannel.register(selector, 
SelectionKey.OP_WRITE | SelectionKey.OP_READ);
             }
+        } catch (Exception e) {
+            logger.error("Unable to connect to {} for load balancing", 
nodeIdentifier, e);
 
-            if (channel != null) {
-                try {
-                    channel.close();
-                } catch (final Exception e1) {
-                    e.addSuppressed(e1);
-                }
-            }
+            close();
 
             if (socketChannel != null) {
                 try {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
index 106ab26..144a043 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
@@ -56,6 +56,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 
 /**
@@ -214,7 +215,13 @@ public class RemoteQueuePartition implements 
QueuePartition {
             }
         };
 
-        clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, 
priorityQueue::isEmpty, this::getFlowFile,
+        // Consider the queue empty unless a FlowFile is available. This means 
that if the queue has only penalized FlowFiles, it will be considered empty.
+        // This is what we want for the purpose of load balancing the data. 
Otherwise, we would have a situation where we create a connection to the other 
node,
+        // determine that no FlowFile is available to send, and then notify 
the node of this and close the connection. And then this would repeat over and 
over
+        // until the FlowFile is no longer penalized. Instead, we want to 
consider the queue empty until a FlowFile is actually available, and only then 
bother
+        // creating the connection to send data.
+        final BooleanSupplier emptySupplier = () -> 
!priorityQueue.isFlowFileAvailable();
+        clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, 
emptySupplier, this::getFlowFile,
             failureCallback, successCallback, 
flowFileQueue::getLoadBalanceCompression, 
flowFileQueue::isPropagateBackpressureAcrossNodes);
 
         running = true;
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
index 93fc2d7..49094c5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
@@ -17,7 +17,6 @@
 
 package org.apache.nifi.controller.queue.clustered.server;
 
-import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
 import org.slf4j.Logger;
@@ -25,34 +24,33 @@ import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLServerSocket;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class ConnectionLoadBalanceServer {
     private static final Logger logger = 
LoggerFactory.getLogger(ConnectionLoadBalanceServer.class);
+    private static final AtomicLong threadCounter = new AtomicLong(1L);
 
     private final String hostname;
     private final int port;
     private final SSLContext sslContext;
-    private final ExecutorService threadPool;
     private final LoadBalanceProtocol loadBalanceProtocol;
     private final int connectionTimeoutMillis;
-    private final int numThreads;
     private final EventReporter eventReporter;
 
-    private volatile Set<CommunicateAction> communicationActions = 
Collections.emptySet();
-    private final BlockingQueue<Socket> connectionQueue = new 
LinkedBlockingQueue<>();
+    private final List<CommunicateAction> communicationActions = 
Collections.synchronizedList(new ArrayList<>());
 
     private volatile AcceptConnection acceptConnection;
     private volatile ServerSocket serverSocket;
@@ -65,10 +63,7 @@ public class ConnectionLoadBalanceServer {
         this.sslContext = sslContext;
         this.loadBalanceProtocol = loadBalanceProtocol;
         this.connectionTimeoutMillis = connectionTimeoutMillis;
-        this.numThreads = numThreads;
         this.eventReporter = eventReporter;
-
-        threadPool = new FlowEngine(numThreads, "Load Balance Server");
     }
 
     public void start() throws IOException {
@@ -88,15 +83,6 @@ public class ConnectionLoadBalanceServer {
                     "'nifi.cluster.load.balance.port' and 
'nifi.cluster.load.balance.host' properties as well as the 'nifi.security.*' 
properties", e);
         }
 
-        final Set<CommunicateAction> actions = new HashSet<>(numThreads);
-        for (int i=0; i < numThreads; i++) {
-            final CommunicateAction action = new 
CommunicateAction(loadBalanceProtocol);
-            actions.add(action);
-            threadPool.submit(action);
-        }
-
-        this.communicationActions = actions;
-
         acceptConnection = new AcceptConnection(serverSocket);
         final Thread receiveConnectionThread = new Thread(acceptConnection);
         receiveConnectionThread.setName("Receive Queue Load-Balancing 
Connections");
@@ -109,22 +95,15 @@ public class ConnectionLoadBalanceServer {
 
     public void stop() {
         stopped = false;
-        threadPool.shutdown();
 
         if (acceptConnection != null) {
             acceptConnection.stop();
         }
 
-        communicationActions.forEach(CommunicateAction::stop);
-
-        Socket socket;
-        while ((socket = connectionQueue.poll()) != null) {
-            try {
-                socket.close();
-                logger.info("{} Closed connection to {} on Server stop", this, 
socket.getRemoteSocketAddress());
-            } catch (final IOException ioe) {
-                logger.warn("Failed to properly close socket to " + 
socket.getRemoteSocketAddress(), ioe);
-            }
+        final Iterator<CommunicateAction> itr = 
communicationActions.iterator();
+        while (itr.hasNext()) {
+            itr.next().stop();
+            itr.remove();
         }
     }
 
@@ -143,10 +122,18 @@ public class ConnectionLoadBalanceServer {
 
     private class CommunicateAction implements Runnable {
         private final LoadBalanceProtocol loadBalanceProtocol;
+        private final Socket socket;
+        private final InputStream in;
+        private final OutputStream out;
+
         private volatile boolean stopped = false;
 
-        public CommunicateAction(final LoadBalanceProtocol 
loadBalanceProtocol) {
+        public CommunicateAction(final LoadBalanceProtocol 
loadBalanceProtocol, final Socket socket) throws IOException {
             this.loadBalanceProtocol = loadBalanceProtocol;
+            this.socket = socket;
+
+            this.in = new BufferedInputStream(socket.getInputStream());
+            this.out = new BufferedOutputStream(socket.getOutputStream());
         }
 
         public void stop() {
@@ -158,28 +145,15 @@ public class ConnectionLoadBalanceServer {
             String peerDescription = "<Unknown Client>";
 
             while (!stopped) {
-                Socket socket = null;
                 try {
-                    socket = connectionQueue.poll(1, TimeUnit.SECONDS);
-                    if (socket == null) {
-                        continue;
-                    }
-
                     peerDescription = 
socket.getRemoteSocketAddress().toString();
 
-                    if (socket.isClosed()) {
-                        logger.debug("Connection to Peer {} is closed. Will 
not attempt to communicate over this Socket.", peerDescription);
-                        continue;
-                    }
-
                     logger.debug("Receiving FlowFiles from Peer {}", 
peerDescription);
-                    loadBalanceProtocol.receiveFlowFiles(socket);
+                    loadBalanceProtocol.receiveFlowFiles(socket, in, out);
 
-                    if (socket.isConnected()) {
-                        logger.debug("Finished receiving FlowFiles from Peer 
{}. Will recycle connection.", peerDescription);
-                        connectionQueue.offer(socket);
-                    } else {
-                        logger.debug("Finished receiving FlowFiles from Peer 
{}. Socket is no longer connected so will not recycle connection.", 
peerDescription);
+                    if (socket.isClosed()) {
+                        logger.debug("Finished Receiving FlowFiles from Peer 
{}", peerDescription);
+                        break;
                     }
                 } catch (final Exception e) {
                     if (socket != null) {
@@ -194,8 +168,6 @@ public class ConnectionLoadBalanceServer {
                     eventReporter.reportEvent(Severity.ERROR, "Load Balanced 
Connection", "Failed to receive FlowFiles for Load Balancing due to " + e);
                 }
             }
-
-            logger.info("Connection Load Balance Server shutdown. Will no 
longer handle incoming requests.");
         }
     }
 
@@ -230,7 +202,13 @@ public class ConnectionLoadBalanceServer {
                     }
 
                     socket.setSoTimeout(connectionTimeoutMillis);
-                    connectionQueue.offer(socket);
+
+                    final CommunicateAction communicateAction = new 
CommunicateAction(loadBalanceProtocol, socket);
+                    final Thread commsThread = new Thread(communicateAction);
+                    commsThread.setName("Load-Balance Server Thread-" + 
threadCounter.getAndIncrement());
+                    commsThread.start();
+
+                    communicationActions.add(communicateAction);
                 } catch (final Exception e) {
                     logger.error("{} Failed to accept connection from other 
node in cluster", ConnectionLoadBalanceServer.this, e);
                 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java
index 5a74ebc..d5d0778 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java
@@ -18,6 +18,8 @@
 package org.apache.nifi.controller.queue.clustered.server;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.Socket;
 
 public interface LoadBalanceProtocol {
@@ -26,10 +28,14 @@ public interface LoadBalanceProtocol {
      * Receives FlowFiles from the peer attached to the socket
      *
      * @param socket the socket to read from and write to
+     * @param in the InputStream to read from. The Socket's InputStream is 
wrapped with a BufferedInputStream, which is provided
+     * here. If this method were to wrap the InputStream itself, a second call 
to the method may discard some data that was consumed
+     * by the previous call's BufferedInputStream
+     * @param out the OutputStream to write to
      *
      * @throws TransactionAbortedException if the transaction was aborted
      * @throws IOException if unable to communicate with the peer
      */
-    void receiveFlowFiles(Socket socket) throws IOException;
+    void receiveFlowFiles(Socket socket, InputStream in, OutputStream out) 
throws IOException;
 
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
index 2168f3e..d9fdd2a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -46,8 +46,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLSocket;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -112,10 +110,7 @@ public class StandardLoadBalanceProtocol implements 
LoadBalanceProtocol {
 
 
     @Override
-    public void receiveFlowFiles(final Socket socket) throws IOException {
-        final InputStream in = new 
BufferedInputStream(socket.getInputStream());
-        final OutputStream out = new 
BufferedOutputStream(socket.getOutputStream());
-
+    public void receiveFlowFiles(final Socket socket, final InputStream in, 
final OutputStream out) throws IOException {
         String peerDescription = socket.getInetAddress().getHostName();
         if (socket instanceof SSLSocket) {
             logger.debug("Connection received from peer {}", peerDescription);

Reply via email to