This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 9709bd6fb7 NIFI-11737: Improved performance of FlowSynchronizationIT 9709bd6fb7 is described below commit 9709bd6fb749e1a1c7bc52076e69d96f5235802a Author: Mark Payne <marka...@hotmail.com> AuthorDate: Fri Jun 16 14:57:42 2023 -0400 NIFI-11737: Improved performance of FlowSynchronizationIT - FlowSynchronizationIT no longer requires isDestroyEnvironmentAfterEachTest to return true This closes #7420 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../system/clustering/FlowSynchronizationIT.java | 46 ++++++++++++++-------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java index aff856f05d..d758529b12 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java @@ -96,16 +96,11 @@ public class FlowSynchronizationIT extends NiFiSystemIT { ); } - @Override - protected boolean isDestroyEnvironmentAfterEachTest() { - return true; - } - @Test public void testParameterUpdateWhileNodeDisconnected() throws NiFiClientException, IOException, InterruptedException { // Add Parameter context with Param1 = 1 - final ParameterContextEntity parameterContextEntity = getClientUtil().createParameterContext("Context1", Collections.singletonMap("Param1", "1")); + final ParameterContextEntity parameterContextEntity = getClientUtil().createParameterContext("testParameterUpdateWhileNodeDisconnected", Collections.singletonMap("Param1", "1")); getClientUtil().setParameterContext("root", parameterContextEntity); // Create a GenerateFlowFile that adds an attribute with name 'attr' and a value that references the parameter @@ -166,7 +161,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT { final ProcessorEntity countEvents3 = getClientUtil().createProcessor("CountEvents"); // Create parameter context with a sensitive parameter and set that on the root group - final ParameterContextEntity paramContext = getClientUtil().createParameterContext("context1", "MyParameter", "Our Secret", true); + final ParameterContextEntity paramContext = getClientUtil().createParameterContext("testSensitivePropertiesInherited", "MyParameter", "Our Secret", true); getClientUtil().setParameterContext("root", paramContext); // Set sensitive property of 1 processor to an explicit value and sensitive property of another to a sensitive parameter. @@ -202,13 +197,14 @@ public class FlowSynchronizationIT extends NiFiSystemIT { @Test public void testComponentsRecreatedOnRejoinCluster() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity topLevel = getClientUtil().createProcessGroup("testComponentsRecreatedOnRejoinCluster", "root"); // Build dataflow with processors at root level and an inner group that contains an input port, output port, and a processor, as well as a Controller Service that the processor will use. - final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); - final ProcessGroupEntity group = getClientUtil().createProcessGroup("Inner Group", "root"); + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile", topLevel.getId()); + final ProcessGroupEntity group = getClientUtil().createProcessGroup("Inner Group", topLevel.getId()); final PortEntity inPort = getClientUtil().createInputPort("In", group.getId()); final PortEntity outPort = getClientUtil().createOutputPort("Out", group.getId()); final ProcessorEntity count = getClientUtil().createProcessor("CountFlowFiles", group.getId()); - final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", topLevel.getId()); getClientUtil().updateProcessorSchedulingPeriod(generate, "60 sec"); final ControllerServiceEntity countService = getClientUtil().createControllerService("StandardCountService", group.getId()); @@ -230,7 +226,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT { reportingTaskProperties.put("Text", "${now():toNumber()}"); getClientUtil().updateReportingTaskProperties(reportingTask, reportingTaskProperties); - final ParameterContextEntity context = getClientUtil().createParameterContext("Context1", "abc", "hello", false); + final ParameterContextEntity context = getClientUtil().createParameterContext("testComponentsRecreatedOnRejoinCluster", "abc", "hello", false); // Disconnect Node 2 disconnectNode(2); @@ -260,7 +256,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT { // is on Node 2. switchClientToNode(2); - final ProcessGroupFlowEntity flow = getNifiClient().getFlowClient(DO_NOT_REPLICATE).getProcessGroup("root"); + final ProcessGroupFlowEntity flow = getNifiClient().getFlowClient(DO_NOT_REPLICATE).getProcessGroup(topLevel.getId()); final FlowDTO flowDto = flow.getProcessGroupFlow().getFlow(); assertEquals(2, flowDto.getConnections().size()); assertEquals(2, flowDto.getProcessors().size()); @@ -395,11 +391,12 @@ public class FlowSynchronizationIT extends NiFiSystemIT { // Wait for node to show as disconnected because it doesn't have the necessary nar waitForNodeState(2, NodeConnectionState.DISCONNECTED); - // We need to either restart Node 2 or remove it from the cluster in order to ensure that we can properly shutdown. - // Reconnecting to the cluster would require restoring the NAR file and restarting, which will take longer than simply removing the - // node from the cluster. So we opt for shutting down the node and removing it from the cluster. + // We need to restore the extensions nar and restart the node so that subsequent tests can succeed + restoreExtensionsNar(node2); node2.stop(); - removeNode(2); + node2.start(); + + waitForAllNodesConnected(); } private void removeNode(final int index) throws NiFiClientException, IOException, InterruptedException { @@ -437,6 +434,17 @@ public class FlowSynchronizationIT extends NiFiSystemIT { waitForAllNodesConnected(); assertTrue(getNifiClient().getProcessorClient().getProcessor(generate.getId()).getComponent().getExtensionMissing()); + + // In order to ensure that subsequent tests are able to operate properly, we need to restore the nar and restart + node1.stop(); + node2.stop(); + + restoreExtensionsNar(node1); + restoreExtensionsNar(node2); + + node1.start(false); + node2.start(true); + waitForAllNodesConnected(); } @@ -511,6 +519,12 @@ public class FlowSynchronizationIT extends NiFiSystemIT { assertTrue(extensionsNar.renameTo(backupFile)); } + private void restoreExtensionsNar(final NiFiInstance nifiInstance) { + final File backupFile = getExtensionsNar(nifiInstance); + final File extensionsNar = new File(backupFile.getParentFile(), backupFile.getName().replace(".backup", "")); + assertTrue(backupFile.renameTo(extensionsNar)); + } + private File getExtensionsNar(final NiFiInstance nifiInstance) { final File libDir = new File(nifiInstance.getInstanceDirectory(), "lib"); final File[] testExtensionsNar = libDir.listFiles(file -> file.getName().startsWith("nifi-system-test-extensions-nar-"));