Marc Parisi created NIFI-6781:
---------------------------------

             Summary: ThreadPoolRequestReplicator
                 Key: NIFI-6781
                 URL: https://issues.apache.org/jira/browse/NIFI-6781
             Project: Apache NiFi
          Issue Type: Improvement
          Components: Core Framework
            Reporter: Marc Parisi
            Assignee: Marc Parisi


I've noticed that request replication is also attempted against the local node.
I tested a simple change that removes the local node from the replication
targets; however, I suspect that either this is not a big deal, I've missed
something, or the cluster state does not include the local identifier. All
tests allow for local instances with different ports, which implies that
pruning the local node may be unnecessary or incorrect. I've therefore filed
this as an "Improvement" while I dig further into the code to validate my
change. If anyone has an immediate answer regarding this code, I'm happy to
close this as OBE (overtaken by events).

 
{code:java}
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -217,11 +217,17 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
             }
         }
 
-        final List<NodeIdentifier> nodeIds = stateMap.get(NodeConnectionState.CONNECTED);
+        // get nodes that do not match this node.
+        final List<NodeIdentifier> nodeIds = stateMap.get(NodeConnectionState.CONNECTED).stream().filter(x -> {
+            return clusterCoordinator.getLocalNodeIdentifier() == null || x != clusterCoordinator.getLocalNodeIdentifier();
+        }).collect(Collectors.toList());
+
         if (nodeIds == null || nodeIds.isEmpty()) {
             throw new NoConnectedNodesException();
         }
 
+        logger.debug("Attempting to replicate to {} nodes", nodeIds.size());
{code}
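
For anyone validating the change, here is a minimal, self-contained sketch of the filtering idea from the patch above. It is not the NiFi implementation: `NodeId` is a hypothetical stand-in for `NodeIdentifier` that models only host and port, and `withoutLocal` is an illustrative helper name. The sketch compares nodes with `equals` rather than `!=`, because `!=` on objects compares references in Java and would keep a local node that is represented by a different instance. It also treats a missing CONNECTED list as empty, which is an assumption about the desired behavior.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class LocalNodeFilterSketch {

    /** Hypothetical stand-in for NodeIdentifier; only host and port are modeled. */
    static final class NodeId {
        final String host;
        final int port;

        NodeId(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof NodeId)) {
                return false;
            }
            NodeId other = (NodeId) o;
            return port == other.port && Objects.equals(host, other.host);
        }

        @Override
        public int hashCode() {
            return Objects.hash(host, port);
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /**
     * Returns the connected nodes minus the local node, compared by value.
     * A null list is treated as empty; a null local node disables filtering.
     */
    static List<NodeId> withoutLocal(List<NodeId> connected, NodeId local) {
        if (connected == null) {
            return Collections.emptyList();
        }
        return connected.stream()
                .filter(node -> local == null || !node.equals(local))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<NodeId> connected = Arrays.asList(
                new NodeId("localhost", 8080),
                new NodeId("localhost", 8081));

        // A distinct instance with the same value as the first connected node.
        NodeId local = new NodeId("localhost", 8080);

        System.out.println(withoutLocal(connected, local)); // prints [localhost:8081]
    }
}
```

Two local instances that differ only by port remain distinct under this comparison, which matches the test setups mentioned in the description.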



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
