This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f0f801  NIFI-6879: Added system tests for variables. Fixed bug that resulted in Variable Registry not being updated if a Processor in a child group referenced it
5f0f801 is described below

commit 5f0f801e4600d12e05466bd19abbfc9d745c7994
Author: Mark Payne <[email protected]>
AuthorDate: Fri Dec 20 13:58:39 2019 -0500

    NIFI-6879: Added system tests for variables. Fixed bug that resulted in Variable Registry not being updated if a Processor in a child group referenced it
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #3949.
---
 .../apache/nifi/groups/StandardProcessGroup.java   |  22 ++-
 .../apache/nifi/tests/system/NiFiClientUtil.java   | 139 +++++++++++++++----
 .../org/apache/nifi/tests/system/NiFiSystemIT.java |  23 ++++
 .../system/variables/ProcessGroupVariablesIT.java  | 149 +++++++++++++++++++++
 .../src/test/resources/conf/default/bootstrap.conf |   2 +-
 .../cli/impl/client/nifi/ConnectionClient.java     |  10 ++
 .../client/nifi/impl/JerseyConnectionClient.java   |  76 +++++++++++
 7 files changed, 392 insertions(+), 29 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 97064cf..fc52ad7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -48,6 +48,7 @@ import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.PropertyConfiguration;
+import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.controller.Template;
@@ -3065,9 +3066,24 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                     final boolean overridden = 
childRegistry.getVariableMap().containsKey(descriptor);
                     if (!overridden) {
                         final Set<ComponentNode> affectedComponents = 
childGroup.getComponentsAffectedByVariable(variableName);
-                        if (!affectedComponents.isEmpty()) {
-                            throw new IllegalStateException("Cannot update 
variable '" + variableName + "' because it is referenced by " + 
affectedComponents.size() + " components that are " +
-                                "currently running.");
+
+                        for (final ComponentNode affectedComponent : 
affectedComponents) {
+                            if (affectedComponent instanceof ProcessorNode) {
+                                final ProcessorNode affectedProcessor = 
(ProcessorNode) affectedComponent;
+                                if (affectedProcessor.isRunning()) {
+                                    throw new IllegalStateException("Cannot 
update variable '" + variableName + "' because it is referenced by " + 
affectedComponent + ", which is currently running.");
+                                }
+                            } else if (affectedComponent instanceof 
ControllerServiceNode) {
+                                final ControllerServiceNode affectedService = 
(ControllerServiceNode) affectedComponent;
+                                if (affectedService.isActive()) {
+                                    throw new IllegalStateException("Cannot 
update variable '" + variableName + "' because it is referenced by " + 
affectedComponent + ", which is currently active.");
+                                }
+                            } else if (affectedComponent instanceof 
ReportingTaskNode) {
+                                final ReportingTaskNode affectedReportingTask 
= (ReportingTaskNode) affectedComponent;
+                                if (affectedReportingTask.isRunning()) {
+                                    throw new IllegalStateException("Cannot 
update variable '" + variableName + "' because it is referenced by " + 
affectedComponent + ", which is currently running.");
+                                }
+                            }
                         }
                     }
                 }
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 21f04f3..03e14c6 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -31,7 +31,7 @@ import org.apache.nifi.web.api.dto.ConnectionDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.CounterDTO;
 import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
 import org.apache.nifi.web.api.dto.NodeDTO;
 import org.apache.nifi.web.api.dto.ParameterContextDTO;
 import org.apache.nifi.web.api.dto.ParameterContextReferenceDTO;
@@ -42,6 +42,8 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.VariableDTO;
+import org.apache.nifi.web.api.dto.VariableRegistryDTO;
 import org.apache.nifi.web.api.dto.flow.FlowDTO;
 import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
@@ -53,6 +55,8 @@ import 
org.apache.nifi.web.api.entity.ControllerServiceRunStatusEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
 import org.apache.nifi.web.api.entity.CountersEntity;
 import org.apache.nifi.web.api.entity.DropRequestEntity;
+import org.apache.nifi.web.api.entity.FlowFileEntity;
+import org.apache.nifi.web.api.entity.ListingRequestEntity;
 import org.apache.nifi.web.api.entity.NodeEntity;
 import org.apache.nifi.web.api.entity.ParameterContextEntity;
 import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity;
@@ -64,7 +68,12 @@ import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
+import org.apache.nifi.web.api.entity.VariableEntity;
+import org.apache.nifi.web.api.entity.VariableRegistryEntity;
+import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity;
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -77,6 +86,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 public class NiFiClientUtil {
+    private static final Logger logger = 
LoggerFactory.getLogger(NiFiClientUtil.class);
+
     private final NiFiClient nifiClient;
     private final String nifiVersion;
 
@@ -89,12 +100,21 @@ public class NiFiClientUtil {
         return createProcessor(NiFiSystemIT.TEST_PROCESSORS_PACKAGE + "." + 
simpleTypeName, NiFiSystemIT.NIFI_GROUP_ID, 
NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
     }
 
-    public ProcessorEntity createProcessor(final String type, final String 
groupId, final String artifactId, final String version) throws 
NiFiClientException, IOException {
+    public ProcessorEntity createProcessor(final String simpleTypeName, final 
String groupId) throws NiFiClientException, IOException {
+        return createProcessor(NiFiSystemIT.TEST_PROCESSORS_PACKAGE + "." + 
simpleTypeName, groupId, NiFiSystemIT.NIFI_GROUP_ID, 
NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
+    }
+
+    public ProcessorEntity createProcessor(final String type, final String 
bundleGroupId, final String artifactId, final String version) throws 
NiFiClientException, IOException {
+        return createProcessor(type, "root", bundleGroupId, artifactId, 
version);
+    }
+
+    public ProcessorEntity createProcessor(final String type, final String 
processGroupId, final String bundleGroupId, final String artifactId, final 
String version)
+            throws NiFiClientException, IOException {
         final ProcessorDTO dto = new ProcessorDTO();
         dto.setType(type);
 
         final BundleDTO bundle = new BundleDTO();
-        bundle.setGroup(groupId);
+        bundle.setGroup(bundleGroupId);
         bundle.setArtifact(artifactId);
         bundle.setVersion(version);
         dto.setBundle(bundle);
@@ -103,7 +123,7 @@ public class NiFiClientUtil {
         entity.setComponent(dto);
         entity.setRevision(createNewRevision());
 
-        return nifiClient.getProcessorClient().createProcessor("root", entity);
+        return nifiClient.getProcessorClient().createProcessor(processGroupId, 
entity);
     }
 
     public ControllerServiceEntity createControllerService(final String 
simpleTypeName) throws NiFiClientException, IOException {
@@ -153,7 +173,7 @@ public class NiFiClientUtil {
         return entity;
     }
 
-    private RevisionDTO createNewRevision() {
+    public RevisionDTO createNewRevision() {
         final RevisionDTO revisionDto = new RevisionDTO();
         revisionDto.setClientId(getClass().getName());
         revisionDto.setVersion(0L);
@@ -277,9 +297,8 @@ public class NiFiClientUtil {
                 return;
             }
 
-            if ("Validating".equals(validationStatus)) {
-                Thread.sleep(100L);
-                continue;
+            if ("Invalid".equalsIgnoreCase(validationStatus)) {
+                logger.info("Processor with ID {} is currently invalid due to: 
{}", processorId, entity.getComponent().getValidationErrors());
             }
 
             Thread.sleep(100L);
@@ -378,23 +397,7 @@ public class NiFiClientUtil {
         }
 
         for (final ProcessGroupEntity group : rootFlowDTO.getProcessGroups()) {
-            waitForProcessorsStopped(group.getComponent());
-        }
-    }
-
-    private void waitForProcessorsStopped(final ProcessGroupDTO group) throws 
IOException, NiFiClientException {
-        final FlowSnippetDTO groupContents = group.getContents();
-        for (final ProcessorDTO processor : groupContents.getProcessors()) {
-            try {
-                waitForStoppedProcessor(processor.getId());
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new NiFiClientException("Interrupted while waiting for 
Processor with ID " + processor.getId() + " to stop");
-            }
-        }
-
-        for (final ProcessGroupDTO child : groupContents.getProcessGroups()) {
-            waitForProcessorsStopped(child);
+            waitForProcessorsStopped(group.getId());
         }
     }
 
@@ -659,4 +662,90 @@ public class NiFiClientUtil {
 
         return nifiClient.getControllerClient().disconnectNode(nodeId, 
nodeEntity);
     }
+
+    public ListingRequestEntity performQueueListing(final String connectionId) 
throws NiFiClientException, IOException {
+        try {
+            ListingRequestEntity listingRequestEntity = 
nifiClient.getConnectionClient().listQueue(connectionId);
+            while (listingRequestEntity.getListingRequest().getFinished() != 
Boolean.TRUE) {
+                Thread.sleep(10L);
+
+                listingRequestEntity = 
nifiClient.getConnectionClient().getListingRequest(connectionId, 
listingRequestEntity.getListingRequest().getId());
+            }
+
+            // Delete the listing. Return the previously obtained listing, not 
the one from the deletion, because the listing request that is returned from 
deleting the listing does not contain the
+            // FlowFile Summaries.
+            
nifiClient.getConnectionClient().deleteListingRequest(connectionId, 
listingRequestEntity.getListingRequest().getId());
+            return listingRequestEntity;
+        } catch (final InterruptedException e) {
+            Assert.fail("Failed to obtain connection status");
+            return null;
+        }
+    }
+
+    public FlowFileEntity getQueueFlowFile(final String connectionId, final 
int flowFileIndex) throws NiFiClientException, IOException {
+        final ListingRequestEntity listing = performQueueListing(connectionId);
+        final List<FlowFileSummaryDTO> flowFileSummaries = 
listing.getListingRequest().getFlowFileSummaries();
+        if (flowFileIndex >= flowFileSummaries.size()) {
+            throw new IllegalArgumentException("Cannot retrieve FlowFile with 
index " + flowFileIndex + " because queue only has " + flowFileSummaries.size() 
+ " FlowFiles");
+        }
+
+        final FlowFileSummaryDTO flowFileSummary = 
flowFileSummaries.get(flowFileIndex);
+        final String uuid = flowFileSummary.getUuid();
+
+        return nifiClient.getConnectionClient().getFlowFile(connectionId, 
uuid);
+    }
+
+    public VariableRegistryUpdateRequestEntity updateVariableRegistry(final 
ProcessGroupEntity processGroup, final Map<String, String> variables) throws 
NiFiClientException, IOException {
+        final Set<VariableEntity> variableEntities = new HashSet<>();
+        for (final Map.Entry<String, String> entry : variables.entrySet()) {
+            final VariableEntity entity = new VariableEntity();
+            variableEntities.add(entity);
+
+            final VariableDTO dto = new VariableDTO();
+            dto.setName(entry.getKey());
+            dto.setValue(entry.getValue());
+            dto.setProcessGroupId(processGroup.getId());
+            entity.setVariable(dto);
+        }
+
+        final VariableRegistryDTO variableRegistryDto = new 
VariableRegistryDTO();
+        variableRegistryDto.setProcessGroupId(processGroup.getId());
+        variableRegistryDto.setVariables(variableEntities);
+
+        final VariableRegistryEntity registryEntity = new 
VariableRegistryEntity();
+        registryEntity.setProcessGroupRevision(processGroup.getRevision());
+        registryEntity.setVariableRegistry(variableRegistryDto);
+
+        VariableRegistryUpdateRequestEntity updateRequestEntity = 
nifiClient.getProcessGroupClient().updateVariableRegistry(processGroup.getId(), 
registryEntity);
+        while (!updateRequestEntity.getRequest().isComplete()) {
+            try {
+                Thread.sleep(100L);
+            } catch (final InterruptedException ie) {
+                Assert.fail("Interrupted while waiting for variable registry 
to update");
+                return null;
+            }
+
+            updateRequestEntity = 
nifiClient.getProcessGroupClient().getVariableRegistryUpdateRequest(processGroup.getId(),
 updateRequestEntity.getRequest().getRequestId());
+        }
+
+        if (updateRequestEntity.getRequest().getFailureReason() != null) {
+            Assert.fail("Failed to update Variable Registry due to: " + 
updateRequestEntity.getRequest().getFailureReason());
+        }
+
+        
nifiClient.getProcessGroupClient().deleteVariableRegistryUpdateRequest(processGroup.getId(),
 updateRequestEntity.getRequest().getRequestId());
+        return updateRequestEntity;
+    }
+
+    public ProcessGroupEntity createProcessGroup(final String name, final 
String parentGroupId) throws NiFiClientException, IOException {
+        final ProcessGroupDTO component = new ProcessGroupDTO();
+        component.setName(name);
+        component.setParentGroupId(parentGroupId);
+
+        final ProcessGroupEntity childGroupEntity = new ProcessGroupEntity();
+        childGroupEntity.setRevision(createNewRevision());
+        childGroupEntity.setComponent(component);
+
+        final ProcessGroupEntity childGroup = 
nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, 
childGroupEntity);
+        return childGroup;
+    }
 }
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 82a739c..23e04ba 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -22,8 +22,10 @@ import 
org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient;
 import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TestName;
@@ -235,4 +237,25 @@ public abstract class NiFiSystemIT {
             Thread.sleep(10L);
         }
     }
+
+    protected void waitForQueueCount(final String connectionId, final int 
queueSize) throws InterruptedException {
+        waitFor(() -> {
+            final ConnectionStatusEntity statusEntity = 
getConnectionStatus(connectionId);
+            return 
statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() 
== queueSize;
+        });
+    }
+
+    private ConnectionStatusEntity getConnectionStatus(final String 
connectionId) {
+        try {
+            return 
getNifiClient().getFlowClient().getConnectionStatus(connectionId, true);
+        } catch (final Exception e) {
+            Assert.fail("Failed to obtain connection status");
+            return null;
+        }
+    }
+
+    protected int getConnectionQueueSize(final String connectionId) {
+        final ConnectionStatusEntity statusEntity = 
getConnectionStatus(connectionId);
+        return 
statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued();
+    }
 }
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java
new file mode 100644
index 0000000..908a8a9
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.variables;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.FlowFileEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class ProcessGroupVariablesIT extends NiFiSystemIT {
+
+    @Test
+    public void testChangeVariableWhileProcessorRunning() throws 
NiFiClientException, IOException, InterruptedException {
+        // Add variable abc=123 to the root group
+        ProcessGroupEntity rootGroup = 
getNifiClient().getProcessGroupClient().getProcessGroup("root");
+        getClientUtil().updateVariableRegistry(rootGroup, 
Collections.singletonMap("abc", "123"));
+
+        // Create GenerateFlowFile that uses Variable to create FlowFile 
attribute. Connect to CountEvents processor.
+        final ProcessorEntity generateFlowFile = 
getClientUtil().createProcessor("GenerateFlowFile");
+        getClientUtil().updateProcessorProperties(generateFlowFile, 
Collections.singletonMap("Value", "Hello ${abc}"));
+        getClientUtil().updateProcessorSchedulingPeriod(generateFlowFile, "10 
mins");
+
+        final ProcessorEntity countEvents = 
getClientUtil().createProcessor("CountEvents");
+        final ConnectionEntity connection = 
getClientUtil().createConnection(generateFlowFile, countEvents, "success");
+        getClientUtil().setAutoTerminatedRelationships(countEvents, "success");
+
+        // Wait for processor to be valid
+        getClientUtil().waitForValidProcessor(generateFlowFile.getId());
+
+        // Start Processor, wait for 1 FlowFile to be queued up, then stop 
processor
+        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+        waitForQueueCount(connection.getId(), 1);
+
+        final FlowFileEntity flowFile = 
getClientUtil().getQueueFlowFile(connection.getId(), 0);
+        assertEquals("Hello 123", 
flowFile.getFlowFile().getAttributes().get("Value"));
+
+        getClientUtil().emptyQueue(connection.getId());
+        waitForQueueCount(connection.getId(), 0);
+
+        rootGroup = 
getNifiClient().getProcessGroupClient().getProcessGroup("root");
+        getClientUtil().updateVariableRegistry(rootGroup, 
Collections.singletonMap("abc", "xyz"));
+        waitForQueueCount(connection.getId(), 1);
+
+        final FlowFileEntity secondFlowFile = 
getClientUtil().getQueueFlowFile(connection.getId(), 0);
+        assertEquals("Hello xyz", 
secondFlowFile.getFlowFile().getAttributes().get("Value"));
+    }
+
+
+    @Test
+    public void testChangeHigherLevelVariableWhileProcessorRunning() throws 
NiFiClientException, IOException, InterruptedException {
+        // Add variable abc=123 to the root group
+        ProcessGroupEntity rootGroup = 
getNifiClient().getProcessGroupClient().getProcessGroup("root");
+        getClientUtil().updateVariableRegistry(rootGroup, 
Collections.singletonMap("abc", "123"));
+
+        final ProcessGroupEntity childGroup = 
getClientUtil().createProcessGroup("child", "root");
+
+        // Create GenerateFlowFile that uses Variable to create FlowFile 
attribute. Connect to CountEvents processor.
+        final ProcessorEntity generateFlowFile = 
getClientUtil().createProcessor("GenerateFlowFile", childGroup.getId());
+        getClientUtil().updateProcessorProperties(generateFlowFile, 
Collections.singletonMap("Value", "Hello ${abc}"));
+        getClientUtil().updateProcessorSchedulingPeriod(generateFlowFile, "10 
mins");
+
+        final ProcessorEntity countEvents = 
getClientUtil().createProcessor("CountEvents", childGroup.getId());
+        final ConnectionEntity connection = 
getClientUtil().createConnection(generateFlowFile, countEvents, "success");
+        getClientUtil().setAutoTerminatedRelationships(countEvents, "success");
+
+        // Wait for processor to be valid
+        getClientUtil().waitForValidProcessor(generateFlowFile.getId());
+
+        // Start Processor, wait for 1 FlowFile to be queued up, then stop 
processor
+        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+        waitForQueueCount(connection.getId(), 1);
+
+        final FlowFileEntity flowFile = 
getClientUtil().getQueueFlowFile(connection.getId(), 0);
+        assertEquals("Hello 123", 
flowFile.getFlowFile().getAttributes().get("Value"));
+
+        getClientUtil().emptyQueue(connection.getId());
+        waitForQueueCount(connection.getId(), 0);
+
+        rootGroup = 
getNifiClient().getProcessGroupClient().getProcessGroup("root");
+        getClientUtil().updateVariableRegistry(rootGroup, 
Collections.singletonMap("abc", "xyz"));
+        waitForQueueCount(connection.getId(), 1);
+
+        final FlowFileEntity secondFlowFile = 
getClientUtil().getQueueFlowFile(connection.getId(), 0);
+        assertEquals("Hello xyz", 
secondFlowFile.getFlowFile().getAttributes().get("Value"));
+    }
+
+
+    @Test
+    public void testChangeVariableThatIsOverridden() throws 
NiFiClientException, IOException, InterruptedException {
+        // Add variable abc=123 to the root group
+        ProcessGroupEntity rootGroup = 
getNifiClient().getProcessGroupClient().getProcessGroup("root");
+        getClientUtil().updateVariableRegistry(rootGroup, 
Collections.singletonMap("abc", "123"));
+
+        final ProcessGroupEntity childGroup = 
getClientUtil().createProcessGroup("child", "root");
+        getClientUtil().updateVariableRegistry(childGroup, 
Collections.singletonMap("abc", "123"));
+
+        // Create GenerateFlowFile that uses Variable to create FlowFile 
attribute. Connect to CountEvents processor.
+        final ProcessorEntity generateFlowFile = 
getClientUtil().createProcessor("GenerateFlowFile", childGroup.getId());
+        getClientUtil().updateProcessorProperties(generateFlowFile, 
Collections.singletonMap("Value", "Hello ${abc}"));
+        getClientUtil().updateProcessorSchedulingPeriod(generateFlowFile, "10 
mins");
+
+        final ProcessorEntity countEvents = 
getClientUtil().createProcessor("CountEvents", childGroup.getId());
+        final ConnectionEntity connection = 
getClientUtil().createConnection(generateFlowFile, countEvents, "success");
+        getClientUtil().setAutoTerminatedRelationships(countEvents, "success");
+
+        // Wait for processor to be valid
+        getClientUtil().waitForValidProcessor(generateFlowFile.getId());
+
+        // Start Processor, wait for 1 FlowFile to be queued up, then stop 
processor
+        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+        waitForQueueCount(connection.getId(), 1);
+
+        final FlowFileEntity flowFile = 
getClientUtil().getQueueFlowFile(connection.getId(), 0);
+        assertEquals("Hello 123", 
flowFile.getFlowFile().getAttributes().get("Value"));
+
+        getClientUtil().emptyQueue(connection.getId());
+        waitForQueueCount(connection.getId(), 0);
+
+        rootGroup = 
getNifiClient().getProcessGroupClient().getProcessGroup("root");
+        getClientUtil().updateVariableRegistry(rootGroup, 
Collections.singletonMap("abc", "xyz"));
+
+        // Wait a bit and ensure that the queue is still empty.
+        Thread.sleep(2000L);
+
+        assertEquals(0, getConnectionQueueSize(connection.getId()));
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf
 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf
index 3dc2c29..a68d11a 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf
@@ -28,6 +28,6 @@ java.arg.3=-Xmx128m
 
 java.arg.14=-Djava.awt.headless=true
 
-#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002
+java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002
 
 nifi.bootstrap.sensitive.key=
\ No newline at end of file
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
index af5b5e9..a486a65 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java
@@ -18,6 +18,8 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi;
 
 import org.apache.nifi.web.api.entity.ConnectionEntity;
 import org.apache.nifi.web.api.entity.DropRequestEntity;
+import org.apache.nifi.web.api.entity.FlowFileEntity;
+import org.apache.nifi.web.api.entity.ListingRequestEntity;
 
 import java.io.IOException;
 
@@ -37,4 +39,12 @@ public interface ConnectionClient {
     DropRequestEntity getDropRequest(String connectionId, String 
dropRequestId) throws NiFiClientException, IOException;
 
     DropRequestEntity deleteDropRequest(String connectionId, String 
dropRequestId) throws NiFiClientException, IOException;
+
+    ListingRequestEntity listQueue(String connectionId) throws 
NiFiClientException, IOException;
+
+    ListingRequestEntity getListingRequest(String connectionId, String 
listingRequestId) throws NiFiClientException, IOException;
+
+    ListingRequestEntity deleteListingRequest(String connectionId, String 
listingRequestId) throws NiFiClientException, IOException;
+
+    FlowFileEntity getFlowFile(String connectionId, String flowFileUuid) 
throws NiFiClientException, IOException;
 }
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
index 51ded4e..f4180fe 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java
@@ -21,6 +21,8 @@ import 
org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
 import org.apache.nifi.web.api.entity.DropRequestEntity;
+import org.apache.nifi.web.api.entity.FlowFileEntity;
+import org.apache.nifi.web.api.entity.ListingRequestEntity;
 
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
@@ -184,4 +186,78 @@ public class JerseyConnectionClient extends 
AbstractJerseyClient implements Conn
         });
     }
 
+    @Override
+    public ListingRequestEntity listQueue(final String connectionId) throws 
NiFiClientException, IOException {
+        if (connectionId == null) {
+            throw new IllegalArgumentException("Connection ID cannot be null");
+        }
+
+        return executeAction("Error listing queue for Connection", () -> {
+            final WebTarget target = flowFileQueueTarget
+                .path("listing-requests")
+                .resolveTemplate("id", connectionId);
+
+            return getRequestBuilder(target).post(
+                Entity.entity(connectionId, MediaType.TEXT_PLAIN),
+                ListingRequestEntity.class
+            );
+        });
+    }
+
+    @Override
+    public ListingRequestEntity getListingRequest(final String connectionId, 
final String listingRequestId) throws NiFiClientException, IOException {
+        if (connectionId == null) {
+            throw new IllegalArgumentException("Connection ID cannot be null");
+        }
+        if (listingRequestId == null) {
+            throw new IllegalArgumentException("Listing Request ID cannot be 
null");
+        }
+
+        return executeAction("Error retrieving Listing Request", () -> {
+            final WebTarget target = flowFileQueueTarget
+                .path("listing-requests/{requestId}")
+                .resolveTemplate("id", connectionId)
+                .resolveTemplate("requestId", listingRequestId);
+
+            return getRequestBuilder(target).get(ListingRequestEntity.class);
+        });
+    }
+
+    @Override
+    public ListingRequestEntity deleteListingRequest(final String 
connectionId, final String listingRequestId) throws NiFiClientException, 
IOException {
+        if (connectionId == null) {
+            throw new IllegalArgumentException("Connection ID cannot be null");
+        }
+        if (listingRequestId == null) {
+            throw new IllegalArgumentException("Listing Request ID cannot be 
null");
+        }
+
+        return executeAction("Error retrieving Listing Request", () -> {
+            final WebTarget target = flowFileQueueTarget
+                .path("listing-requests/{requestId}")
+                .resolveTemplate("id", connectionId)
+                .resolveTemplate("requestId", listingRequestId);
+
+            return 
getRequestBuilder(target).delete(ListingRequestEntity.class);
+        });
+    }
+
+    @Override
+    public FlowFileEntity getFlowFile(final String connectionId, final String 
flowFileUuid) throws NiFiClientException, IOException {
+        if (connectionId == null) {
+            throw new IllegalArgumentException("Connection ID cannot be null");
+        }
+        if (flowFileUuid == null) {
+            throw new IllegalArgumentException("FlowFile UUID cannot be null");
+        }
+
+        return executeAction("Error retrieving FlowFile", () -> {
+            final WebTarget target = flowFileQueueTarget
+                .path("flowfiles/{uuid}")
+                .resolveTemplate("id", connectionId)
+                .resolveTemplate("uuid", flowFileUuid);
+
+            return getRequestBuilder(target).get(FlowFileEntity.class);
+        });
+    }
 }

Reply via email to