This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 34bf8af191c NIFI-15655 Fixed handling of missing port in Cluster Leader Address
34bf8af191c is described below

commit 34bf8af191c30056465bd7c47f1be964397eafb0
Author: Pierre Villard <[email protected]>
AuthorDate: Sun Mar 1 13:47:40 2026 +0100

    NIFI-15655 Fixed handling of missing port in Cluster Leader Address
    
    - Checked for empty Participant identifier from ZooKeeper Leader Election Manager
    - Checked for missing port before parsing Heartbeat Address
    - Improved stability of FlowSynchronizationIT.testUnnecessaryProcessorsAndConnectionsRemoved with better exception handling
    
    This closes #10946
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../leader/zookeeper/CuratorLeaderElectionManager.java   | 16 +++++++++++++++-
 .../cluster/protocol/AbstractNodeProtocolSender.java     | 11 ++++++++++-
 .../tests/system/clustering/FlowSynchronizationIT.java   | 12 +++++++++---
 3 files changed, 34 insertions(+), 5 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-leader-election/src/main/java/org/apache/nifi/framework/cluster/leader/zookeeper/CuratorLeaderElectionManager.java
 
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-leader-election/src/main/java/org/apache/nifi/framework/cluster/leader/zookeeper/CuratorLeaderElectionManager.java
index c4c488be694..f19af5808e5 100644
--- 
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-leader-election/src/main/java/org/apache/nifi/framework/cluster/leader/zookeeper/CuratorLeaderElectionManager.java
+++ 
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-leader-election/src/main/java/org/apache/nifi/framework/cluster/leader/zookeeper/CuratorLeaderElectionManager.java
@@ -317,7 +317,21 @@ public class CuratorLeaderElectionManager extends 
TrackedLeaderElectionManager {
 
             try {
                 final Participant leader = selector.getLeader();
-                return leader == null ? Optional.empty() : 
Optional.of(leader.getId());
+
+                final String leaderId;
+                if (leader == null) {
+                    leaderId = null;
+                } else {
+                    final String participantId = leader.getId();
+                    if (participantId == null || participantId.isEmpty()) {
+                        logger.info("Leader ID for Role [{}] not found with 
external lookup", roleName);
+                        leaderId = null;
+                    } else {
+                        leaderId = participantId;
+                    }
+                }
+
+                return Optional.ofNullable(leaderId);
             } catch (final KeeperException.NoNodeException nne) {
                 // If there is no ZNode, then there is no elected leader.
                 return Optional.empty();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
index 97c2ba90127..bf9127be2e6 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
@@ -126,8 +126,17 @@ public abstract class AbstractNodeProtocolSender 
implements NodeProtocolSender {
         final CommsTimingDetails timingDetails = new CommsTimingDetails();
 
         final String[] parts = address.split(":");
+        if (parts.length != 2) {
+            throw new ProtocolException("Heartbeat address [%s] missing port 
number separator".formatted(address));
+        }
+
         final String hostname = parts[0];
-        final int port = Integer.parseInt(parts[1]);
+        final int port;
+        try {
+            port = Integer.parseInt(parts[1]);
+        } catch (final NumberFormatException e) {
+            throw new ProtocolException("Heartbeat address [%s] missing port 
number".formatted(address), e);
+        }
 
         final ProtocolMessage responseMessage = sendProtocolMessage(msg, 
hostname, port, timingDetails);
 
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index 3bd17a94464..acf3b1c9dad 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -603,10 +603,10 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         getClientUtil().waitForStoppedProcessor(countFlowFiles.getId());
         getNifiClient().getConnectionClient().deleteConnection(connection);
         getNifiClient().getProcessorClient().deleteProcessor(countFlowFiles);
-        // Disable the specific controller services in dependency order before 
deletion
         getClientUtil().disableControllerService(countA);
         getClientUtil().disableControllerService(countB);
         getClientUtil().disableControllerService(countC);
+        
getClientUtil().waitForControllerServicesDisabled(countC.getParentGroupId(), 
countA.getId(), countB.getId(), countC.getId());
         
getNifiClient().getControllerServicesClient().deleteControllerService(countC);
         
getNifiClient().getControllerServicesClient().deleteControllerService(countB);
 
@@ -707,6 +707,9 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
 
         waitForQueueCount(connection.getId(), 0);
 
+        getClientUtil().stopProcessor(generate);
+        getClientUtil().stopProcessor(terminate);
+
         // Reconnect the node to the cluster
         switchClientToNode(1);
         reconnectNode(2);
@@ -719,11 +722,14 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
             final ProcessGroupFlowEntity flow = 
getNifiClient().getFlowClient(DO_NOT_REPLICATE).getProcessGroup("root");
             final FlowDTO flowDto = flow.getProcessGroupFlow().getFlow();
 
-            if (flowDto.getProcessors().size() != 1) {
+            final int processorCount = flowDto.getProcessors().size();
+            final int connectionCount = flowDto.getConnections().size();
+            if (processorCount != 1 || connectionCount != 0) {
+                logger.info("Waiting for Node 2 to have 1 processor and 0 
connections but found {} processors and {} connections", processorCount, 
connectionCount);
                 return false;
             }
 
-            return flowDto.getConnections().isEmpty();
+            return true;
         });
     }
 

Reply via email to