This is an automated email from the ASF dual-hosted git repository. ferdei pushed a commit to branch NIFI-13450 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 1a6a94b4144b590bbced42db3d660dad96ceaa92 Author: Ferenc Erdei <erdei.feren...@gmail.com> AuthorDate: Wed Jun 26 11:01:50 2024 +0200 NIFI-13450 Regenerate MiNiFi Agent Manifest between Heartbeats --- .../org/apache/nifi/c2/client/service/C2HeartbeatManager.java | 10 ++++++---- .../apache/nifi/c2/client/service/C2HeartbeatManagerTest.java | 9 +++++++++ .../java/org/apache/nifi/minifi/c2/C2NifiClientService.java | 2 +- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatManager.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatManager.java index a3729255ce..6840a84981 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatManager.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatManager.java @@ -22,6 +22,8 @@ import static java.util.function.Predicate.not; import java.util.List; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + import org.apache.nifi.c2.client.api.C2Client; import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; import org.apache.nifi.c2.protocol.api.C2Heartbeat; @@ -36,15 +38,15 @@ public class C2HeartbeatManager implements Runnable { private final C2Client client; private final C2HeartbeatFactory c2HeartbeatFactory; private final ReentrantLock heartbeatLock; - private final RuntimeInfoWrapper runtimeInfoWrapper; + private final Supplier<RuntimeInfoWrapper> runtimeInfoSupplier; private final C2OperationManager c2OperationManager; - public C2HeartbeatManager(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, ReentrantLock heartbeatLock, RuntimeInfoWrapper runtimeInfoWrapper, + public C2HeartbeatManager(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, ReentrantLock heartbeatLock, Supplier<RuntimeInfoWrapper> runtimeInfoSupplier, C2OperationManager c2OperationManager) { this.client = client; this.c2HeartbeatFactory = c2HeartbeatFactory; this.heartbeatLock = heartbeatLock; - this.runtimeInfoWrapper = runtimeInfoWrapper; + this.runtimeInfoSupplier = runtimeInfoSupplier; this.c2OperationManager = c2OperationManager; } @@ -56,7 +58,7 @@ public class C2HeartbeatManager implements Runnable { } try { LOGGER.debug("Heartbeat lock is acquired, sending heartbeat"); - C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper); + C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoSupplier.get()); client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse); } catch (Exception e) { LOGGER.error("Failed to send/process heartbeat", e); diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatManagerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatManagerTest.java index 2de1483487..46b0dc9f4e 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatManagerTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatManagerTest.java @@ -29,6 +29,8 @@ import static org.mockito.Mockito.when; import java.util.List; import java.util.Optional; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + import org.apache.nifi.c2.client.api.C2Client; import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; import org.apache.nifi.c2.protocol.api.C2Heartbeat; @@ -52,6 +54,9 @@ public class C2HeartbeatManagerTest { @Mock private ReentrantLock mockHeartbeatLock; + @Mock + private Supplier<RuntimeInfoWrapper> mockRuntimeInfoWrapperSupplier; + @Mock private RuntimeInfoWrapper mockRuntimeInfoWrapper; @@ -75,6 +80,7 @@ public class C2HeartbeatManagerTest { @Test void shouldSendHeartbeatAndProcessEmptyResponse() { + when(mockRuntimeInfoWrapperSupplier.get()).thenReturn(mockRuntimeInfoWrapper); when(mockHeartbeatLock.tryLock()).thenReturn(true); C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class); when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat); @@ -91,6 +97,7 @@ public class C2HeartbeatManagerTest { @Test void shouldSendHeartbeatAndProcessResponseWithNoOperation() { + when(mockRuntimeInfoWrapperSupplier.get()).thenReturn(mockRuntimeInfoWrapper); when(mockHeartbeatLock.tryLock()).thenReturn(true); C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class); when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat); @@ -108,6 +115,7 @@ public class C2HeartbeatManagerTest { @Test void shouldSendHeartbeatAndProcessResponseWithMultipleOperation() { + when(mockRuntimeInfoWrapperSupplier.get()).thenReturn(mockRuntimeInfoWrapper); when(mockHeartbeatLock.tryLock()).thenReturn(true); C2Heartbeat mockC2Heartbeat = mock(C2Heartbeat.class); when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenReturn(mockC2Heartbeat); @@ -128,6 +136,7 @@ public class C2HeartbeatManagerTest { @Test void shouldReleaseHeartbeatLockWhenExceptionOccurs() { + when(mockRuntimeInfoWrapperSupplier.get()).thenReturn(mockRuntimeInfoWrapper); when(mockHeartbeatLock.tryLock()).thenReturn(true); when(mockC2HeartbeatFactory.create(mockRuntimeInfoWrapper)).thenThrow(new RuntimeException()); diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java index 9b351e82f3..3528218520 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java @@ -174,7 +174,7 @@ public class C2NifiClientService { this.c2OperationManager = new C2OperationManager( client, c2OperationHandlerProvider, heartbeatLock, operationQueueDAO, c2OperationRestartHandler); this.c2HeartbeatManager = new C2HeartbeatManager( - client, heartbeatFactory, heartbeatLock, generateRuntimeInfo(), c2OperationManager); + client, heartbeatFactory, heartbeatLock, this::generateRuntimeInfo, c2OperationManager); } private C2ClientConfig generateClientConfig(NiFiProperties properties) {