This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 34bf8af191c NIFI-15655 Fixed handling of missing port in Cluster
Leader Address
34bf8af191c is described below
commit 34bf8af191c30056465bd7c47f1be964397eafb0
Author: Pierre Villard <[email protected]>
AuthorDate: Sun Mar 1 13:47:40 2026 +0100
NIFI-15655 Fixed handling of missing port in Cluster Leader Address
- Checked for empty Participant identifier from ZooKeeper Leader Election
Manager
- Checked for missing port before parsing Heartbeat Address
- Improved stability of
FlowSynchronizationIT.testUnnecessaryProcessorsAndConnectionsRemoved with
better exception handling
This closes #10946
Signed-off-by: David Handermann <[email protected]>
---
.../leader/zookeeper/CuratorLeaderElectionManager.java | 16 +++++++++++++++-
.../cluster/protocol/AbstractNodeProtocolSender.java | 11 ++++++++++-
.../tests/system/clustering/FlowSynchronizationIT.java | 12 +++++++++---
3 files changed, 34 insertions(+), 5 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-leader-election/src/main/java/org/apache/nifi/framework/cluster/leader/zookeeper/CuratorLeaderElectionManager.java
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-leader-election/src/main/java/org/apache/nifi/framework/cluster/leader/zookeeper/CuratorLeaderElectionManager.java
index c4c488be694..f19af5808e5 100644
---
a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-leader-election/src/main/java/org/apache/nifi/framework/cluster/leader/zookeeper/CuratorLeaderElectionManager.java
+++
b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-zookeeper-bundle/nifi-framework-zookeeper-leader-election/src/main/java/org/apache/nifi/framework/cluster/leader/zookeeper/CuratorLeaderElectionManager.java
@@ -317,7 +317,21 @@ public class CuratorLeaderElectionManager extends
TrackedLeaderElectionManager {
try {
final Participant leader = selector.getLeader();
- return leader == null ? Optional.empty() :
Optional.of(leader.getId());
+
+ final String leaderId;
+ if (leader == null) {
+ leaderId = null;
+ } else {
+ final String participantId = leader.getId();
+ if (participantId == null || participantId.isEmpty()) {
+ logger.info("Leader ID for Role [{}] not found with
external lookup", roleName);
+ leaderId = null;
+ } else {
+ leaderId = participantId;
+ }
+ }
+
+ return Optional.ofNullable(leaderId);
} catch (final KeeperException.NoNodeException nne) {
// If there is no ZNode, then there is no elected leader.
return Optional.empty();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
index 97c2ba90127..bf9127be2e6 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
@@ -126,8 +126,17 @@ public abstract class AbstractNodeProtocolSender
implements NodeProtocolSender {
final CommsTimingDetails timingDetails = new CommsTimingDetails();
final String[] parts = address.split(":");
+ if (parts.length != 2) {
+ throw new ProtocolException("Heartbeat address [%s] missing port
number separator".formatted(address));
+ }
+
final String hostname = parts[0];
- final int port = Integer.parseInt(parts[1]);
+ final int port;
+ try {
+ port = Integer.parseInt(parts[1]);
+ } catch (final NumberFormatException e) {
+ throw new ProtocolException("Heartbeat address [%s] missing port
number".formatted(address), e);
+ }
final ProtocolMessage responseMessage = sendProtocolMessage(msg,
hostname, port, timingDetails);
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index 3bd17a94464..acf3b1c9dad 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -603,10 +603,10 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
getClientUtil().waitForStoppedProcessor(countFlowFiles.getId());
getNifiClient().getConnectionClient().deleteConnection(connection);
getNifiClient().getProcessorClient().deleteProcessor(countFlowFiles);
- // Disable the specific controller services in dependency order before
deletion
getClientUtil().disableControllerService(countA);
getClientUtil().disableControllerService(countB);
getClientUtil().disableControllerService(countC);
+
getClientUtil().waitForControllerServicesDisabled(countC.getParentGroupId(),
countA.getId(), countB.getId(), countC.getId());
getNifiClient().getControllerServicesClient().deleteControllerService(countC);
getNifiClient().getControllerServicesClient().deleteControllerService(countB);
@@ -707,6 +707,9 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
waitForQueueCount(connection.getId(), 0);
+ getClientUtil().stopProcessor(generate);
+ getClientUtil().stopProcessor(terminate);
+
// Reconnect the node to the cluster
switchClientToNode(1);
reconnectNode(2);
@@ -719,11 +722,14 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
final ProcessGroupFlowEntity flow =
getNifiClient().getFlowClient(DO_NOT_REPLICATE).getProcessGroup("root");
final FlowDTO flowDto = flow.getProcessGroupFlow().getFlow();
- if (flowDto.getProcessors().size() != 1) {
+ final int processorCount = flowDto.getProcessors().size();
+ final int connectionCount = flowDto.getConnections().size();
+ if (processorCount != 1 || connectionCount != 0) {
+ logger.info("Waiting for Node 2 to have 1 processor and 0
connections but found {} processors and {} connections", processorCount,
connectionCount);
return false;
}
- return flowDto.getConnections().isEmpty();
+ return true;
});
}