This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 5f0f801 NIFI-6879: Added system tests for variables. Fixed bug that
resulted in Variable Registry not being updated if a Processor in a child group
referenced it
5f0f801 is described below
commit 5f0f801e4600d12e05466bd19abbfc9d745c7994
Author: Mark Payne <[email protected]>
AuthorDate: Fri Dec 20 13:58:39 2019 -0500
NIFI-6879: Added system tests for variables. Fixed bug that resulted in
Variable Registry not being updated if a Processor in a child group referenced
it
Signed-off-by: Pierre Villard <[email protected]>
This closes #3949.
---
.../apache/nifi/groups/StandardProcessGroup.java | 22 ++-
.../apache/nifi/tests/system/NiFiClientUtil.java | 139 +++++++++++++++----
.../org/apache/nifi/tests/system/NiFiSystemIT.java | 23 ++++
.../system/variables/ProcessGroupVariablesIT.java | 149 +++++++++++++++++++++
.../src/test/resources/conf/default/bootstrap.conf | 2 +-
.../cli/impl/client/nifi/ConnectionClient.java | 10 ++
.../client/nifi/impl/JerseyConnectionClient.java | 76 +++++++++++
7 files changed, 392 insertions(+), 29 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 97064cf..fc52ad7 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -48,6 +48,7 @@ import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.PropertyConfiguration;
+import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
@@ -3065,9 +3066,24 @@ public final class StandardProcessGroup implements
ProcessGroup {
final boolean overridden =
childRegistry.getVariableMap().containsKey(descriptor);
if (!overridden) {
final Set<ComponentNode> affectedComponents =
childGroup.getComponentsAffectedByVariable(variableName);
- if (!affectedComponents.isEmpty()) {
- throw new IllegalStateException("Cannot update
variable '" + variableName + "' because it is referenced by " +
affectedComponents.size() + " components that are " +
- "currently running.");
+
+ for (final ComponentNode affectedComponent :
affectedComponents) {
+ if (affectedComponent instanceof ProcessorNode) {
+ final ProcessorNode affectedProcessor =
(ProcessorNode) affectedComponent;
+ if (affectedProcessor.isRunning()) {
+ throw new IllegalStateException("Cannot
update variable '" + variableName + "' because it is referenced by " +
affectedComponent + ", which is currently running.");
+ }
+ } else if (affectedComponent instanceof
ControllerServiceNode) {
+ final ControllerServiceNode affectedService =
(ControllerServiceNode) affectedComponent;
+ if (affectedService.isActive()) {
+ throw new IllegalStateException("Cannot
update variable '" + variableName + "' because it is referenced by " +
affectedComponent + ", which is currently active.");
+ }
+ } else if (affectedComponent instanceof
ReportingTaskNode) {
+ final ReportingTaskNode affectedReportingTask
= (ReportingTaskNode) affectedComponent;
+ if (affectedReportingTask.isRunning()) {
+ throw new IllegalStateException("Cannot
update variable '" + variableName + "' because it is referenced by " +
affectedComponent + ", which is currently running.");
+ }
+ }
}
}
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 21f04f3..03e14c6 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -31,7 +31,7 @@ import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.CounterDTO;
import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.ParameterContextDTO;
import org.apache.nifi.web.api.dto.ParameterContextReferenceDTO;
@@ -42,6 +42,8 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.VariableDTO;
+import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
@@ -53,6 +55,8 @@ import
org.apache.nifi.web.api.entity.ControllerServiceRunStatusEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CountersEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity;
+import org.apache.nifi.web.api.entity.FlowFileEntity;
+import org.apache.nifi.web.api.entity.ListingRequestEntity;
import org.apache.nifi.web.api.entity.NodeEntity;
import org.apache.nifi.web.api.entity.ParameterContextEntity;
import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity;
@@ -64,7 +68,12 @@ import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
+import org.apache.nifi.web.api.entity.VariableEntity;
+import org.apache.nifi.web.api.entity.VariableRegistryEntity;
+import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity;
import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
@@ -77,6 +86,8 @@ import java.util.Set;
import java.util.stream.Collectors;
public class NiFiClientUtil {
+ private static final Logger logger =
LoggerFactory.getLogger(NiFiClientUtil.class);
+
private final NiFiClient nifiClient;
private final String nifiVersion;
@@ -89,12 +100,21 @@ public class NiFiClientUtil {
return createProcessor(NiFiSystemIT.TEST_PROCESSORS_PACKAGE + "." +
simpleTypeName, NiFiSystemIT.NIFI_GROUP_ID,
NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
}
- public ProcessorEntity createProcessor(final String type, final String
groupId, final String artifactId, final String version) throws
NiFiClientException, IOException {
+ public ProcessorEntity createProcessor(final String simpleTypeName, final
String groupId) throws NiFiClientException, IOException {
+ return createProcessor(NiFiSystemIT.TEST_PROCESSORS_PACKAGE + "." +
simpleTypeName, groupId, NiFiSystemIT.NIFI_GROUP_ID,
NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
+ }
+
+ public ProcessorEntity createProcessor(final String type, final String
bundleGroupId, final String artifactId, final String version) throws
NiFiClientException, IOException {
+ return createProcessor(type, "root", bundleGroupId, artifactId,
version);
+ }
+
+ public ProcessorEntity createProcessor(final String type, final String
processGroupId, final String bundleGroupId, final String artifactId, final
String version)
+ throws NiFiClientException, IOException {
final ProcessorDTO dto = new ProcessorDTO();
dto.setType(type);
final BundleDTO bundle = new BundleDTO();
- bundle.setGroup(groupId);
+ bundle.setGroup(bundleGroupId);
bundle.setArtifact(artifactId);
bundle.setVersion(version);
dto.setBundle(bundle);
@@ -103,7 +123,7 @@ public class NiFiClientUtil {
entity.setComponent(dto);
entity.setRevision(createNewRevision());
- return nifiClient.getProcessorClient().createProcessor("root", entity);
+ return nifiClient.getProcessorClient().createProcessor(processGroupId,
entity);
}
public ControllerServiceEntity createControllerService(final String
simpleTypeName) throws NiFiClientException, IOException {
@@ -153,7 +173,7 @@ public class NiFiClientUtil {
return entity;
}
- private RevisionDTO createNewRevision() {
+ public RevisionDTO createNewRevision() {
final RevisionDTO revisionDto = new RevisionDTO();
revisionDto.setClientId(getClass().getName());
revisionDto.setVersion(0L);
@@ -277,9 +297,8 @@ public class NiFiClientUtil {
return;
}
- if ("Validating".equals(validationStatus)) {
- Thread.sleep(100L);
- continue;
+ if ("Invalid".equalsIgnoreCase(validationStatus)) {
+ logger.info("Processor with ID {} is currently invalid due to:
{}", processorId, entity.getComponent().getValidationErrors());
}
Thread.sleep(100L);
@@ -378,23 +397,7 @@ public class NiFiClientUtil {
}
for (final ProcessGroupEntity group : rootFlowDTO.getProcessGroups()) {
- waitForProcessorsStopped(group.getComponent());
- }
- }
-
- private void waitForProcessorsStopped(final ProcessGroupDTO group) throws
IOException, NiFiClientException {
- final FlowSnippetDTO groupContents = group.getContents();
- for (final ProcessorDTO processor : groupContents.getProcessors()) {
- try {
- waitForStoppedProcessor(processor.getId());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new NiFiClientException("Interrupted while waiting for
Processor with ID " + processor.getId() + " to stop");
- }
- }
-
- for (final ProcessGroupDTO child : groupContents.getProcessGroups()) {
- waitForProcessorsStopped(child);
+ waitForProcessorsStopped(group.getId());
}
}
@@ -659,4 +662,90 @@ public class NiFiClientUtil {
return nifiClient.getControllerClient().disconnectNode(nodeId,
nodeEntity);
}
+
+ public ListingRequestEntity performQueueListing(final String connectionId)
throws NiFiClientException, IOException {
+ try {
+ ListingRequestEntity listingRequestEntity =
nifiClient.getConnectionClient().listQueue(connectionId);
+ while (listingRequestEntity.getListingRequest().getFinished() !=
Boolean.TRUE) {
+ Thread.sleep(10L);
+
+ listingRequestEntity =
nifiClient.getConnectionClient().getListingRequest(connectionId,
listingRequestEntity.getListingRequest().getId());
+ }
+
+ // Delete the listing. Return the previously obtained listing, not
the one from the deletion, because the listing request that is returned from
deleting the listing does not contain the
+ // FlowFile Summaries.
+
nifiClient.getConnectionClient().deleteListingRequest(connectionId,
listingRequestEntity.getListingRequest().getId());
+ return listingRequestEntity;
+ } catch (final InterruptedException e) {
+ Assert.fail("Failed to obtain connection status");
+ return null;
+ }
+ }
+
+ public FlowFileEntity getQueueFlowFile(final String connectionId, final
int flowFileIndex) throws NiFiClientException, IOException {
+ final ListingRequestEntity listing = performQueueListing(connectionId);
+ final List<FlowFileSummaryDTO> flowFileSummaries =
listing.getListingRequest().getFlowFileSummaries();
+ if (flowFileIndex >= flowFileSummaries.size()) {
+ throw new IllegalArgumentException("Cannot retrieve FlowFile with
index " + flowFileIndex + " because queue only has " + flowFileSummaries.size()
+ " FlowFiles");
+ }
+
+ final FlowFileSummaryDTO flowFileSummary =
flowFileSummaries.get(flowFileIndex);
+ final String uuid = flowFileSummary.getUuid();
+
+ return nifiClient.getConnectionClient().getFlowFile(connectionId,
uuid);
+ }
+
+ public VariableRegistryUpdateRequestEntity updateVariableRegistry(final
ProcessGroupEntity processGroup, final Map<String, String> variables) throws
NiFiClientException, IOException {
+ final Set<VariableEntity> variableEntities = new HashSet<>();
+ for (final Map.Entry<String, String> entry : variables.entrySet()) {
+ final VariableEntity entity = new VariableEntity();
+ variableEntities.add(entity);
+
+ final VariableDTO dto = new VariableDTO();
+ dto.setName(entry.getKey());
+ dto.setValue(entry.getValue());
+ dto.setProcessGroupId(processGroup.getId());
+ entity.setVariable(dto);
+ }
+
+ final VariableRegistryDTO variableRegistryDto = new
VariableRegistryDTO();
+ variableRegistryDto.setProcessGroupId(processGroup.getId());
+ variableRegistryDto.setVariables(variableEntities);
+
+ final VariableRegistryEntity registryEntity = new
VariableRegistryEntity();
+ registryEntity.setProcessGroupRevision(processGroup.getRevision());
+ registryEntity.setVariableRegistry(variableRegistryDto);
+
+ VariableRegistryUpdateRequestEntity updateRequestEntity =
nifiClient.getProcessGroupClient().updateVariableRegistry(processGroup.getId(),
registryEntity);
+ while (!updateRequestEntity.getRequest().isComplete()) {
+ try {
+ Thread.sleep(100L);
+ } catch (final InterruptedException ie) {
+ Assert.fail("Interrupted while waiting for variable registry
to update");
+ return null;
+ }
+
+ updateRequestEntity =
nifiClient.getProcessGroupClient().getVariableRegistryUpdateRequest(processGroup.getId(),
updateRequestEntity.getRequest().getRequestId());
+ }
+
+ if (updateRequestEntity.getRequest().getFailureReason() != null) {
+ Assert.fail("Failed to update Variable Registry due to: " +
updateRequestEntity.getRequest().getFailureReason());
+ }
+
+
nifiClient.getProcessGroupClient().deleteVariableRegistryUpdateRequest(processGroup.getId(),
updateRequestEntity.getRequest().getRequestId());
+ return updateRequestEntity;
+ }
+
+ public ProcessGroupEntity createProcessGroup(final String name, final
String parentGroupId) throws NiFiClientException, IOException {
+ final ProcessGroupDTO component = new ProcessGroupDTO();
+ component.setName(name);
+ component.setParentGroupId(parentGroupId);
+
+ final ProcessGroupEntity childGroupEntity = new ProcessGroupEntity();
+ childGroupEntity.setRevision(createNewRevision());
+ childGroupEntity.setComponent(component);
+
+ final ProcessGroupEntity childGroup =
nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId,
childGroupEntity);
+ return childGroup;
+ }
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 82a739c..23e04ba 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -22,8 +22,10 @@ import
org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient;
import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
@@ -235,4 +237,25 @@ public abstract class NiFiSystemIT {
Thread.sleep(10L);
}
}
+
+ protected void waitForQueueCount(final String connectionId, final int
queueSize) throws InterruptedException {
+ waitFor(() -> {
+ final ConnectionStatusEntity statusEntity =
getConnectionStatus(connectionId);
+ return
statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued()
== queueSize;
+ });
+ }
+
+ private ConnectionStatusEntity getConnectionStatus(final String
connectionId) {
+ try {
+ return
getNifiClient().getFlowClient().getConnectionStatus(connectionId, true);
+ } catch (final Exception e) {
+ Assert.fail("Failed to obtain connection status");
+ return null;
+ }
+ }
+
+ protected int getConnectionQueueSize(final String connectionId) {
+ final ConnectionStatusEntity statusEntity =
getConnectionStatus(connectionId);
+ return
statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued();
+ }
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java
new file mode 100644
index 0000000..908a8a9
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.variables;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.FlowFileEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class ProcessGroupVariablesIT extends NiFiSystemIT {
+
+ @Test
+ public void testChangeVariableWhileProcessorRunning() throws
NiFiClientException, IOException, InterruptedException {
+ // Add variable abc=123 to the root group
+ ProcessGroupEntity rootGroup =
getNifiClient().getProcessGroupClient().getProcessGroup("root");
+ getClientUtil().updateVariableRegistry(rootGroup,
Collections.singletonMap("abc", "123"));
+
+ // Create GenerateFlowFile that uses Variable to create FlowFile
attribute. Connect to CountEvents processor.
+ final ProcessorEntity generateFlowFile =
getClientUtil().createProcessor("GenerateFlowFile");
+ getClientUtil().updateProcessorProperties(generateFlowFile,
Collections.singletonMap("Value", "Hello ${abc}"));
+ getClientUtil().updateProcessorSchedulingPeriod(generateFlowFile, "10
mins");
+
+ final ProcessorEntity countEvents =
getClientUtil().createProcessor("CountEvents");
+ final ConnectionEntity connection =
getClientUtil().createConnection(generateFlowFile, countEvents, "success");
+ getClientUtil().setAutoTerminatedRelationships(countEvents, "success");
+
+ // Wait for processor to be valid
+ getClientUtil().waitForValidProcessor(generateFlowFile.getId());
+
+ // Start Processor and wait for 1 FlowFile to be queued up. The processor is
left running.
+ getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+ waitForQueueCount(connection.getId(), 1);
+
+ final FlowFileEntity flowFile =
getClientUtil().getQueueFlowFile(connection.getId(), 0);
+ assertEquals("Hello 123",
flowFile.getFlowFile().getAttributes().get("Value"));
+
+ getClientUtil().emptyQueue(connection.getId());
+ waitForQueueCount(connection.getId(), 0);
+
+ rootGroup =
getNifiClient().getProcessGroupClient().getProcessGroup("root");
+ getClientUtil().updateVariableRegistry(rootGroup,
Collections.singletonMap("abc", "xyz"));
+ waitForQueueCount(connection.getId(), 1);
+
+ final FlowFileEntity secondFlowFile =
getClientUtil().getQueueFlowFile(connection.getId(), 0);
+ assertEquals("Hello xyz",
secondFlowFile.getFlowFile().getAttributes().get("Value"));
+ }
+
+
+ @Test
+ public void testChangeHigherLevelVariableWhileProcessorRunning() throws
NiFiClientException, IOException, InterruptedException {
+ // Add variable abc=123 to the root group
+ ProcessGroupEntity rootGroup =
getNifiClient().getProcessGroupClient().getProcessGroup("root");
+ getClientUtil().updateVariableRegistry(rootGroup,
Collections.singletonMap("abc", "123"));
+
+ final ProcessGroupEntity childGroup =
getClientUtil().createProcessGroup("child", "root");
+
+ // Create GenerateFlowFile that uses Variable to create FlowFile
attribute. Connect to CountEvents processor.
+ final ProcessorEntity generateFlowFile =
getClientUtil().createProcessor("GenerateFlowFile", childGroup.getId());
+ getClientUtil().updateProcessorProperties(generateFlowFile,
Collections.singletonMap("Value", "Hello ${abc}"));
+ getClientUtil().updateProcessorSchedulingPeriod(generateFlowFile, "10
mins");
+
+ final ProcessorEntity countEvents =
getClientUtil().createProcessor("CountEvents", childGroup.getId());
+ final ConnectionEntity connection =
getClientUtil().createConnection(generateFlowFile, countEvents, "success");
+ getClientUtil().setAutoTerminatedRelationships(countEvents, "success");
+
+ // Wait for processor to be valid
+ getClientUtil().waitForValidProcessor(generateFlowFile.getId());
+
+ // Start Processor and wait for 1 FlowFile to be queued up. The processor is
left running.
+ getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+ waitForQueueCount(connection.getId(), 1);
+
+ final FlowFileEntity flowFile =
getClientUtil().getQueueFlowFile(connection.getId(), 0);
+ assertEquals("Hello 123",
flowFile.getFlowFile().getAttributes().get("Value"));
+
+ getClientUtil().emptyQueue(connection.getId());
+ waitForQueueCount(connection.getId(), 0);
+
+ rootGroup =
getNifiClient().getProcessGroupClient().getProcessGroup("root");
+ getClientUtil().updateVariableRegistry(rootGroup,
Collections.singletonMap("abc", "xyz"));
+ waitForQueueCount(connection.getId(), 1);
+
+ final FlowFileEntity secondFlowFile =
getClientUtil().getQueueFlowFile(connection.getId(), 0);
+ assertEquals("Hello xyz",
secondFlowFile.getFlowFile().getAttributes().get("Value"));
+ }
+
+
+ @Test
+ public void testChangeVariableThatIsOverridden() throws
NiFiClientException, IOException, InterruptedException {
+ // Add variable abc=123 to the root group
+ ProcessGroupEntity rootGroup =
getNifiClient().getProcessGroupClient().getProcessGroup("root");
+ getClientUtil().updateVariableRegistry(rootGroup,
Collections.singletonMap("abc", "123"));
+
+ final ProcessGroupEntity childGroup =
getClientUtil().createProcessGroup("child", "root");
+ getClientUtil().updateVariableRegistry(childGroup,
Collections.singletonMap("abc", "123"));
+
+ // Create GenerateFlowFile that uses Variable to create FlowFile
attribute. Connect to CountEvents processor.
+ final ProcessorEntity generateFlowFile =
getClientUtil().createProcessor("GenerateFlowFile", childGroup.getId());
+ getClientUtil().updateProcessorProperties(generateFlowFile,
Collections.singletonMap("Value", "Hello ${abc}"));
+ getClientUtil().updateProcessorSchedulingPeriod(generateFlowFile, "10
mins");
+
+ final ProcessorEntity countEvents =
getClientUtil().createProcessor("CountEvents", childGroup.getId());
+ final ConnectionEntity connection =
getClientUtil().createConnection(generateFlowFile, countEvents, "success");
+ getClientUtil().setAutoTerminatedRelationships(countEvents, "success");
+
+ // Wait for processor to be valid
+ getClientUtil().waitForValidProcessor(generateFlowFile.getId());
+
+ // Start Processor and wait for 1 FlowFile to be queued up. The processor is
left running.
+ getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+ waitForQueueCount(connection.getId(), 1);
+
+ final FlowFileEntity flowFile =
getClientUtil().getQueueFlowFile(connection.getId(), 0);
+ assertEquals("Hello 123",
flowFile.getFlowFile().getAttributes().get("Value"));
+
+ getClientUtil().emptyQueue(connection.getId());
+ waitForQueueCount(connection.getId(), 0);
+
+ rootGroup =
getNifiClient().getProcessGroupClient().getProcessGroup("root");
+ getClientUtil().updateVariableRegistry(rootGroup,
Collections.singletonMap("abc", "xyz"));
+
+ // Wait a bit and ensure that the queue is still empty.
+ Thread.sleep(2000L);
+
+ assertEquals(0, getConnectionQueueSize(connection.getId()));
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf
index 3dc2c29..a68d11a 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf
@@ -28,6 +28,6 @@ java.arg.3=-Xmx128m
java.arg.14=-Djava.awt.headless=true
-#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002
+java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002
nifi.bootstrap.sensitive.key=
\ No newline at end of file
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
index af5b5e9..a486a65 100644
---
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
@@ -18,6 +18,8 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity;
+import org.apache.nifi.web.api.entity.FlowFileEntity;
+import org.apache.nifi.web.api.entity.ListingRequestEntity;
import java.io.IOException;
@@ -37,4 +39,12 @@ public interface ConnectionClient {
DropRequestEntity getDropRequest(String connectionId, String
dropRequestId) throws NiFiClientException, IOException;
DropRequestEntity deleteDropRequest(String connectionId, String
dropRequestId) throws NiFiClientException, IOException;
+
+ ListingRequestEntity listQueue(String connectionId) throws
NiFiClientException, IOException;
+
+ ListingRequestEntity getListingRequest(String connectionId, String
listingRequestId) throws NiFiClientException, IOException;
+
+ ListingRequestEntity deleteListingRequest(String connectionId, String
listingRequestId) throws NiFiClientException, IOException;
+
+ FlowFileEntity getFlowFile(String connectionId, String flowFileUuid)
throws NiFiClientException, IOException;
}
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
index 51ded4e..f4180fe 100644
---
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
@@ -21,6 +21,8 @@ import
org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity;
+import org.apache.nifi.web.api.entity.FlowFileEntity;
+import org.apache.nifi.web.api.entity.ListingRequestEntity;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
@@ -184,4 +186,78 @@ public class JerseyConnectionClient extends
AbstractJerseyClient implements Conn
});
}
+ @Override
+ public ListingRequestEntity listQueue(final String connectionId) throws
NiFiClientException, IOException {
+ if (connectionId == null) {
+ throw new IllegalArgumentException("Connection ID cannot be null");
+ }
+
+ return executeAction("Error listing queue for Connection", () -> {
+ final WebTarget target = flowFileQueueTarget
+ .path("listing-requests")
+ .resolveTemplate("id", connectionId);
+
+ return getRequestBuilder(target).post(
+ Entity.entity(connectionId, MediaType.TEXT_PLAIN),
+ ListingRequestEntity.class
+ );
+ });
+ }
+
+ @Override
+ public ListingRequestEntity getListingRequest(final String connectionId,
final String listingRequestId) throws NiFiClientException, IOException {
+ if (connectionId == null) {
+ throw new IllegalArgumentException("Connection ID cannot be null");
+ }
+ if (listingRequestId == null) {
+ throw new IllegalArgumentException("Listing Request ID cannot be
null");
+ }
+
+ return executeAction("Error retrieving Listing Request", () -> {
+ final WebTarget target = flowFileQueueTarget
+ .path("listing-requests/{requestId}")
+ .resolveTemplate("id", connectionId)
+ .resolveTemplate("requestId", listingRequestId);
+
+ return getRequestBuilder(target).get(ListingRequestEntity.class);
+ });
+ }
+
+ @Override
+ public ListingRequestEntity deleteListingRequest(final String
connectionId, final String listingRequestId) throws NiFiClientException,
IOException {
+ if (connectionId == null) {
+ throw new IllegalArgumentException("Connection ID cannot be null");
+ }
+ if (listingRequestId == null) {
+ throw new IllegalArgumentException("Listing Request ID cannot be
null");
+ }
+
+ return executeAction("Error retrieving Listing Request", () -> {
+ final WebTarget target = flowFileQueueTarget
+ .path("listing-requests/{requestId}")
+ .resolveTemplate("id", connectionId)
+ .resolveTemplate("requestId", listingRequestId);
+
+ return
getRequestBuilder(target).delete(ListingRequestEntity.class);
+ });
+ }
+
+ @Override
+ public FlowFileEntity getFlowFile(final String connectionId, final String
flowFileUuid) throws NiFiClientException, IOException {
+ if (connectionId == null) {
+ throw new IllegalArgumentException("Connection ID cannot be null");
+ }
+ if (flowFileUuid == null) {
+ throw new IllegalArgumentException("FlowFile UUID cannot be null");
+ }
+
+ return executeAction("Error retrieving FlowFile", () -> {
+ final WebTarget target = flowFileQueueTarget
+ .path("flowfiles/{uuid}")
+ .resolveTemplate("id", connectionId)
+ .resolveTemplate("uuid", flowFileUuid);
+
+ return getRequestBuilder(target).get(FlowFileEntity.class);
+ });
+ }
}