This is an automated email from the ASF dual-hosted git repository. ferdei pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new e26aa233d7 NIFI-10458 Add support for the DESCRIBE/MANIFEST Operation e26aa233d7 is described below commit e26aa233d71584071421142021d1d0d33204da1d Author: Csaba Bejan <bejan.cs...@gmail.com> AuthorDate: Thu Sep 8 14:42:16 2022 +0200 NIFI-10458 Add support for the DESCRIBE/MANIFEST Operation Signed-off-by: Ferenc Erdei <erdei.feren...@gmail.com> This closes #6375 --- .../org/apache/nifi/c2/client/C2ClientConfig.java | 12 ++++ .../nifi/c2/client/service/C2HeartbeatFactory.java | 34 ++++++++- .../service/operation/C2OperationService.java | 9 ++- .../DescribeManifestOperationHandler.java | 81 +++++++++++++++++++++ .../c2/client/service/C2HeartbeatFactoryTest.java | 76 ++++++++++++++++++-- .../DescribeManifestOperationHandlerTest.java | 84 ++++++++++++++++++++++ .../org/apache/nifi/c2/protocol/api/AgentInfo.java | 10 +++ .../src/main/resources/conf/bootstrap.conf | 2 + .../java/org/apache/nifi/c2/C2NiFiProperties.java | 1 + .../org/apache/nifi/c2/C2NifiClientService.java | 10 ++- 10 files changed, 311 insertions(+), 8 deletions(-) diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java index 5d7481dd9d..ecf677c399 100644 --- a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java +++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java @@ -25,6 +25,7 @@ public class C2ClientConfig { private final String c2AckUrl; private final String agentClass; private final String agentIdentifier; + private final boolean fullHeartbeat; private final String confDirectory; private final String runtimeManifestIdentifier; private final String runtimeType; @@ -46,6 +47,7 @@ public class C2ClientConfig { this.c2AckUrl = builder.c2AckUrl; this.agentClass = builder.agentClass; this.agentIdentifier = builder.agentIdentifier; + this.fullHeartbeat = builder.fullHeartbeat; this.confDirectory = builder.confDirectory; this.runtimeManifestIdentifier = builder.runtimeManifestIdentifier; this.runtimeType = builder.runtimeType; @@ -78,6 +80,10 @@ public class C2ClientConfig { return agentIdentifier; } + public boolean isFullHeartbeat() { + return fullHeartbeat; + } + public String getConfDirectory() { return confDirectory; } @@ -143,6 +149,7 @@ public class C2ClientConfig { private String c2AckUrl; private String agentClass; private String agentIdentifier; + private boolean fullHeartbeat; private String confDirectory; private String runtimeManifestIdentifier; private String runtimeType; @@ -178,6 +185,11 @@ public class C2ClientConfig { return this; } + public Builder fullHeartbeat(final boolean fullHeartbeat) { + this.fullHeartbeat = fullHeartbeat; + return this; + } + public Builder confDirectory(final String confDirectory) { this.confDirectory = confDirectory; return this; diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java index bac10e2d0b..063017ea5a 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2HeartbeatFactory.java @@ -23,8 +23,12 @@ import java.lang.management.ManagementFactory; import java.lang.management.OperatingSystemMXBean; import java.net.InetAddress; import java.net.NetworkInterface; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Enumeration; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -41,6 +45,7 @@ import org.apache.nifi.c2.protocol.api.FlowInfo; import org.apache.nifi.c2.protocol.api.FlowQueueStatus; import org.apache.nifi.c2.protocol.api.NetworkInfo; import org.apache.nifi.c2.protocol.api.SystemInfo; +import org.apache.nifi.c2.protocol.component.api.Bundle; import org.apache.nifi.c2.protocol.component.api.RuntimeManifest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,7 +97,11 @@ public class C2HeartbeatFactory { agentStatus.setRepositories(repos); agentInfo.setStatus(agentStatus); - agentInfo.setAgentManifest(manifest); + agentInfo.setAgentManifestHash(calculateManifestHash(manifest.getBundles())); + + if (clientConfig.isFullHeartbeat()) { + agentInfo.setAgentManifest(manifest); + } return agentInfo; } @@ -225,4 +234,27 @@ public class C2HeartbeatFactory { return confDirectory; } + + private String calculateManifestHash(List<Bundle> loadedBundles) { + byte[] bytes; + try { + bytes = MessageDigest.getInstance("SHA-512").digest(loadedBundles.stream() + .map(bundle -> bundle.getGroup() + bundle.getArtifact() + bundle.getVersion()) + .sorted() + .collect(Collectors.joining(",")) + .getBytes(StandardCharsets.UTF_8)); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("Unable to set up manifest hash calculation due to not having support for the chosen digest algorithm", e); + } + + return bytesToHex(bytes); + } + + private String bytesToHex(byte[] in) { + final StringBuilder builder = new StringBuilder(); + for (byte b : in) { + builder.append(String.format("%02x", b)); + } + return builder.toString(); + } } diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationService.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationService.java index fadc8ad79b..5ce9c51ef4 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationService.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationService.java @@ -24,9 +24,13 @@ import org.apache.nifi.c2.protocol.api.C2Operation; import org.apache.nifi.c2.protocol.api.C2OperationAck; import org.apache.nifi.c2.protocol.api.OperandType; import org.apache.nifi.c2.protocol.api.OperationType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class C2OperationService { + private static final Logger logger = LoggerFactory.getLogger(C2OperationService.class); + private final Map<OperationType, Map<OperandType, C2OperationHandler>> handlerMap = new HashMap<>(); public C2OperationService(List<C2OperationHandler> handlers) { @@ -37,7 +41,10 @@ public class C2OperationService { public Optional<C2OperationAck> handleOperation(C2Operation operation) { return getHandlerForOperation(operation) - .map(handler -> handler.handle(operation)); + .map(handler -> { + logger.info("Handling {} {} operation", operation.getOperation(), operation.getOperand()); + return handler.handle(operation); + }); } private Optional<C2OperationHandler> getHandlerForOperation(C2Operation operation) { diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandler.java new file mode 100644 index 0000000000..2f55b2510d --- /dev/null +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandler.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.c2.client.service.operation; + +import static org.apache.commons.lang3.StringUtils.EMPTY; +import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST; +import static org.apache.nifi.c2.protocol.api.OperationType.DESCRIBE; + +import java.util.Optional; +import java.util.function.Supplier; +import org.apache.nifi.c2.client.service.C2HeartbeatFactory; +import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; +import org.apache.nifi.c2.protocol.api.AgentInfo; +import org.apache.nifi.c2.protocol.api.C2Heartbeat; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationAck; +import org.apache.nifi.c2.protocol.api.C2OperationState; +import org.apache.nifi.c2.protocol.api.OperandType; +import org.apache.nifi.c2.protocol.api.OperationType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DescribeManifestOperationHandler implements C2OperationHandler { + + private static final Logger logger = LoggerFactory.getLogger(DescribeManifestOperationHandler.class); + + private final C2HeartbeatFactory heartbeatFactory; + private final Supplier<RuntimeInfoWrapper> runtimeInfoSupplier; + + public DescribeManifestOperationHandler(C2HeartbeatFactory heartbeatFactory, Supplier<RuntimeInfoWrapper> runtimeInfoSupplier) { + this.heartbeatFactory = heartbeatFactory; + this.runtimeInfoSupplier = runtimeInfoSupplier; + } + + @Override + public OperationType getOperationType() { + return DESCRIBE; + } + + @Override + public OperandType getOperandType() { + return MANIFEST; + } + + @Override + public C2OperationAck handle(C2Operation operation) { + String opIdentifier = Optional.ofNullable(operation.getIdentifier()) + .orElse(EMPTY); + C2OperationAck operationAck = new C2OperationAck(); + C2OperationState state = new C2OperationState(); + operationAck.setOperationState(state); + operationAck.setOperationId(opIdentifier); + + RuntimeInfoWrapper runtimeInfoWrapper = runtimeInfoSupplier.get(); + C2Heartbeat heartbeat = heartbeatFactory.create(runtimeInfoWrapper); + + AgentInfo agentInfo = heartbeat.getAgentInfo(); + agentInfo.setAgentManifest(runtimeInfoWrapper.getManifest()); + operationAck.setAgentInfo(agentInfo); + operationAck.setDeviceInfo(heartbeat.getDeviceInfo()); + operationAck.setFlowInfo(heartbeat.getFlowInfo()); + + state.setState(C2OperationState.OperationState.FULLY_APPLIED); + + return operationAck; + } +} diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java index 111d750db5..c1c8efb86d 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java @@ -17,12 +17,15 @@ package org.apache.nifi.c2.client.service; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.File; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import org.apache.nifi.c2.client.C2ClientConfig; @@ -30,6 +33,7 @@ import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; import org.apache.nifi.c2.protocol.api.AgentRepositories; import org.apache.nifi.c2.protocol.api.C2Heartbeat; import org.apache.nifi.c2.protocol.api.FlowQueueStatus; +import org.apache.nifi.c2.protocol.component.api.Bundle; import org.apache.nifi.c2.protocol.component.api.RuntimeManifest; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -51,6 +55,9 @@ public class C2HeartbeatFactoryTest { @Mock private FlowIdHolder flowIdHolder; + @Mock + private RuntimeInfoWrapper runtimeInfoWrapper; + @InjectMocks private C2HeartbeatFactory c2HeartbeatFactory; @@ -66,8 +73,9 @@ public class C2HeartbeatFactoryTest { void testCreateHeartbeat() { when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID); when(clientConfig.getAgentClass()).thenReturn(AGENT_CLASS); + when(runtimeInfoWrapper.getManifest()).thenReturn(createManifest()); - C2Heartbeat heartbeat = c2HeartbeatFactory.create(mock(RuntimeInfoWrapper.class)); + C2Heartbeat heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper); assertEquals(FLOW_ID, heartbeat.getFlowId()); assertEquals(AGENT_CLASS, heartbeat.getAgentClass()); @@ -75,16 +83,20 @@ public class C2HeartbeatFactoryTest { @Test void testCreateGeneratesAgentAndDeviceIdIfNotPresent() { - C2Heartbeat heartbeat = c2HeartbeatFactory.create(mock(RuntimeInfoWrapper.class)); + when(runtimeInfoWrapper.getManifest()).thenReturn(createManifest()); + + C2Heartbeat heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper); assertNotNull(heartbeat.getAgentId()); assertNotNull(heartbeat.getDeviceId()); } @Test - void testCreatePopulatesFromRuntimeInfoWrapper() { + void testCreatePopulatesFromRuntimeInfoWrapperForFullHeartbeat() { + when(clientConfig.isFullHeartbeat()).thenReturn(true); + AgentRepositories repos = new AgentRepositories(); - RuntimeManifest manifest = new RuntimeManifest(); + RuntimeManifest manifest = createManifest(); Map<String, FlowQueueStatus> queueStatus = new HashMap<>(); C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus)); @@ -94,10 +106,66 @@ public class C2HeartbeatFactoryTest { assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues()); } + @Test + void testCreatePopulatesFromRuntimeInfoWrapperForLightHeartbeat() { + when(clientConfig.isFullHeartbeat()).thenReturn(false); + + AgentRepositories repos = new AgentRepositories(); + RuntimeManifest manifest = createManifest(); + Map<String, FlowQueueStatus> queueStatus = new HashMap<>(); + + C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus)); + + assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories()); + assertNull(heartbeat.getAgentInfo().getAgentManifest()); + assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues()); + } + @Test void testCreateThrowsExceptionWhenConfDirNotSet() { when(clientConfig.getConfDirectory()).thenReturn(String.class.getSimpleName()); assertThrows(IllegalStateException.class, () -> c2HeartbeatFactory.create(mock(RuntimeInfoWrapper.class))); } + + @Test + void testManifestHashChangesWhenManifestBundleChanges() { + Bundle bundle1 = new Bundle("group1", "artifact1", "version1"); + Bundle bundle2 = new Bundle("group2", "artifact2", "version2"); + RuntimeManifest manifest1 = createManifest(bundle1); + RuntimeManifest manifest2 = createManifest(bundle2); + RuntimeManifest manifest3 = createManifest(bundle1, bundle2); + + when(runtimeInfoWrapper.getManifest()).thenReturn(manifest1); + C2Heartbeat heartbeat1 = c2HeartbeatFactory.create(runtimeInfoWrapper); + String hash1 = heartbeat1.getAgentInfo().getAgentManifestHash(); + assertNotNull(hash1); + + // same manifest should result in the same hash + assertEquals(hash1, c2HeartbeatFactory.create(runtimeInfoWrapper).getAgentInfo().getAgentManifestHash()); + + // different manifest should result in hash change + when(runtimeInfoWrapper.getManifest()).thenReturn(manifest2); + C2Heartbeat heartbeat2 = c2HeartbeatFactory.create(runtimeInfoWrapper); + String hash2 = heartbeat2.getAgentInfo().getAgentManifestHash(); + assertNotEquals(hash2, hash1); + + // different manifest with multiple bundles should result in hash change compared to all previous + when(runtimeInfoWrapper.getManifest()).thenReturn(manifest3); + C2Heartbeat heartbeat3 = c2HeartbeatFactory.create(runtimeInfoWrapper); + String hash3 = heartbeat3.getAgentInfo().getAgentManifestHash(); + assertNotEquals(hash3, hash1); + assertNotEquals(hash3, hash2); + } + + private RuntimeManifest createManifest() { + return createManifest(new Bundle()); + } + + private RuntimeManifest createManifest(Bundle... bundles) { + RuntimeManifest manifest = new RuntimeManifest(); + manifest.setBundles(Arrays.asList(bundles)); + + return manifest; + } } diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java new file mode 100644 index 0000000000..2017cc18a5 --- /dev/null +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/DescribeManifestOperationHandlerTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.c2.client.service.operation; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +import org.apache.nifi.c2.client.service.C2HeartbeatFactory; +import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; +import org.apache.nifi.c2.protocol.api.AgentInfo; +import org.apache.nifi.c2.protocol.api.C2Heartbeat; +import org.apache.nifi.c2.protocol.api.C2Operation; +import org.apache.nifi.c2.protocol.api.C2OperationAck; +import org.apache.nifi.c2.protocol.api.C2OperationState; +import org.apache.nifi.c2.protocol.api.DeviceInfo; +import org.apache.nifi.c2.protocol.api.FlowInfo; +import org.apache.nifi.c2.protocol.api.OperandType; +import org.apache.nifi.c2.protocol.api.OperationType; +import org.apache.nifi.c2.protocol.component.api.RuntimeManifest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class DescribeManifestOperationHandlerTest { + + private static final String OPERATION_ID = "operationId"; + + @Mock + private C2HeartbeatFactory heartbeatFactory; + + @Test + void testDescribeManifestOperationHandlerCreateSuccess() { + DescribeManifestOperationHandler handler = new DescribeManifestOperationHandler(null, null); + + assertEquals(OperationType.DESCRIBE, handler.getOperationType()); + assertEquals(OperandType.MANIFEST, handler.getOperandType()); + } + + @Test + void testDescribeManifestOperationHandlerPopulatesAckSuccessfully() { + RuntimeManifest manifest = new RuntimeManifest(); + manifest.setIdentifier("manifestId"); + RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, manifest, null); + + C2Heartbeat heartbeat = new C2Heartbeat(); + AgentInfo agentInfo = new AgentInfo(); + DeviceInfo deviceInfo = new DeviceInfo(); + FlowInfo flowInfo = new FlowInfo(); + heartbeat.setAgentInfo(agentInfo); + heartbeat.setDeviceInfo(deviceInfo); + heartbeat.setFlowInfo(flowInfo); + + when(heartbeatFactory.create(runtimeInfoWrapper)).thenReturn(heartbeat); + DescribeManifestOperationHandler handler = new DescribeManifestOperationHandler(heartbeatFactory, () -> runtimeInfoWrapper); + + C2Operation operation = new C2Operation(); + operation.setIdentifier(OPERATION_ID); + + C2OperationAck response = handler.handle(operation); + + assertEquals(OPERATION_ID, response.getOperationId()); + assertEquals(C2OperationState.OperationState.FULLY_APPLIED, response.getOperationState().getState()); + assertEquals(agentInfo, response.getAgentInfo()); + assertEquals(manifest, response.getAgentInfo().getAgentManifest()); + assertEquals(deviceInfo, response.getDeviceInfo()); + assertEquals(flowInfo, response.getFlowInfo()); + } +} diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/AgentInfo.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/AgentInfo.java index ac06e265b8..5660b8c2eb 100644 --- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/AgentInfo.java +++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/AgentInfo.java @@ -29,6 +29,7 @@ public class AgentInfo implements Serializable { private String identifier; private String agentClass; + private String agentManifestHash; private RuntimeManifest agentManifest; private AgentStatus status; @@ -55,6 +56,15 @@ public class AgentInfo implements Serializable { this.agentClass = agentClass; } + @ApiModelProperty("The hash code of the manifest definition generated by the agent.") + public String getAgentManifestHash() { + return this.agentManifestHash; + } + + public void setAgentManifestHash(String agentManifestHash) { + this.agentManifestHash = agentManifestHash; + } + @ApiModelProperty("The specification of the agent's capabilities") public RuntimeManifest getAgentManifest() { return agentManifest; diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf index ac05d01286..1233a05f1d 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf @@ -149,6 +149,8 @@ java.arg.14=-Djava.awt.headless=true #c2.runtime.type=minifi-java # Optional. Defaults to a hardware based unique identifier #c2.agent.identifier= +# If set to false heartbeat won't contain the manifest. Defaults to true. +#c2.full.heartbeat=false ## Define TLS security properties for C2 communications #c2.security.truststore.location= #c2.security.truststore.password= diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java index 7983d9967a..40373436df 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java @@ -38,6 +38,7 @@ public class C2NiFiProperties { public static final String C2_CALL_TIMEOUT = C2_PREFIX + "rest.callTimeout"; public static final String C2_AGENT_CLASS_KEY = C2_PREFIX + "agent.class"; public static final String C2_AGENT_IDENTIFIER_KEY = C2_PREFIX + "agent.identifier"; + public static final String C2_FULL_HEARTBEAT_KEY = C2_PREFIX + "full.heartbeat"; public static final String C2_ROOT_CLASS_DEFINITIONS_KEY = C2_PREFIX + "root.class.definitions"; public static final String C2_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.name"; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java index 66dcd9967d..fe227b1f9e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java @@ -30,6 +30,7 @@ import org.apache.nifi.c2.client.service.C2HeartbeatFactory; import org.apache.nifi.c2.client.service.FlowIdHolder; import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper; import org.apache.nifi.c2.client.service.operation.C2OperationService; +import org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler; import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler; import org.apache.nifi.c2.protocol.api.AgentRepositories; import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus; @@ -89,10 +90,14 @@ public class C2NifiClientService { this.heartbeatPeriod = clientConfig.getHeartbeatPeriod(); this.flowController = flowController; C2HttpClient client = new C2HttpClient(clientConfig, new C2JacksonSerializer()); + C2HeartbeatFactory heartbeatFactory = new C2HeartbeatFactory(clientConfig, flowIdHolder); this.c2ClientService = new C2ClientService( client, - new C2HeartbeatFactory(clientConfig, flowIdHolder), - new C2OperationService(Arrays.asList(new UpdateConfigurationOperationHandler(client, flowIdHolder, this::updateFlowContent))) + heartbeatFactory, + new C2OperationService(Arrays.asList( + new UpdateConfigurationOperationHandler(client, flowIdHolder, this::updateFlowContent), + new DescribeManifestOperationHandler(heartbeatFactory, this::generateRuntimeInfo) + )) ); } @@ -100,6 +105,7 @@ public class C2NifiClientService { return new C2ClientConfig.Builder() .agentClass(properties.getProperty(C2NiFiProperties.C2_AGENT_CLASS_KEY, "")) .agentIdentifier(properties.getProperty(C2NiFiProperties.C2_AGENT_IDENTIFIER_KEY)) + .fullHeartbeat(Boolean.parseBoolean(properties.getProperty(C2NiFiProperties.C2_FULL_HEARTBEAT_KEY, "true"))) .heartbeatPeriod(Long.parseLong(properties.getProperty(C2NiFiProperties.C2_AGENT_HEARTBEAT_PERIOD_KEY, String.valueOf(C2NiFiProperties.C2_AGENT_DEFAULT_HEARTBEAT_PERIOD)))) .connectTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CONNECTION_TIMEOUT,