This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c670161cb0 NIFI-12532: This closes #8179. Ensure that when 
CommunicateAction completes (exceptionally or otherwise) that it gets removed 
from the list of all CommunicationActions
c670161cb0 is described below

commit c670161cb09855b28e444a4d32d37427b31e8422
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Thu Dec 21 10:49:15 2023 -0500

    NIFI-12532: This closes #8179. Ensure that when CommunicateAction completes 
(exceptionally or otherwise) that it gets removed from the list of all 
CommunicationActions
    
    Signed-off-by: Joseph Witt <joew...@apache.org>
---
 .../server/ConnectionLoadBalanceServer.java        | 50 +++++++++++++---------
 1 file changed, 30 insertions(+), 20 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
index 666620b955..f5c7c644ca 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
@@ -17,6 +17,17 @@
 
 package org.apache.nifi.controller.queue.clustered.server;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.io.socket.SocketUtils;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.security.util.TlsPlatform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLServerSocket;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
@@ -31,17 +42,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLServerSocket;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.io.socket.SocketUtils;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.security.util.TlsPlatform;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class ConnectionLoadBalanceServer {
     private static final Logger logger = 
LoggerFactory.getLogger(ConnectionLoadBalanceServer.class);
@@ -104,10 +104,12 @@ public class ConnectionLoadBalanceServer {
             acceptConnection.stop();
         }
 
-        final Iterator<CommunicateAction> itr = 
communicationActions.iterator();
-        while (itr.hasNext()) {
-            itr.next().stop();
-            itr.remove();
+        synchronized (communicationActions) { // Must synchronize on 
Synchronized List when using iterator
+            final Iterator<CommunicateAction> itr = 
communicationActions.iterator();
+            while (itr.hasNext()) {
+                itr.next().stop();
+                itr.remove();
+            }
         }
     }
 
@@ -135,8 +137,7 @@ public class ConnectionLoadBalanceServer {
 
         private volatile boolean stopped = false;
 
-        // This should be final but it is not to allow override during 
testing; no production code modifies the value
-        private static int EXCEPTION_THRESHOLD_MILLIS = 10_000;
+        private static final int EXCEPTION_THRESHOLD_MILLIS = 10_000;
         private volatile long tlsErrorLastSeen = -1;
 
         public CommunicateAction(final LoadBalanceProtocol 
loadBalanceProtocol, final Socket socket, final EventReporter eventReporter) 
throws IOException {
@@ -187,6 +188,8 @@ public class ConnectionLoadBalanceServer {
                         logger.error("Failed to communicate over Channel {}", 
channelDescription, e);
                         eventReporter.reportEvent(Severity.ERROR, "Load 
Balanced Connection", "Failed to receive FlowFiles for Load Balancing due to " 
+ e);
                     }
+
+                    return;
                 }
             }
         }
@@ -265,11 +268,18 @@ public class ConnectionLoadBalanceServer {
                     socket.setSoTimeout(connectionTimeoutMillis);
 
                     final CommunicateAction communicateAction = new 
CommunicateAction(loadBalanceProtocol, socket, eventReporter);
-                    final Thread commsThread = new Thread(communicateAction);
+                    communicationActions.add(communicateAction);
+
+                    final Thread commsThread = new Thread(() -> {
+                        try {
+                            communicateAction.run();
+                        } finally {
+                            communicationActions.remove(communicateAction);
+                        }
+                    });
+
                     commsThread.setName("Load-Balance Server Thread-" + 
threadCounter.getAndIncrement());
                     commsThread.start();
-
-                    communicationActions.add(communicateAction);
                 } catch (final Exception e) {
                     logger.error("{} Failed to accept connection from other 
node in cluster", ConnectionLoadBalanceServer.this, e);
                 }

Reply via email to