This is an automated email from the ASF dual-hosted git repository. ijokarumawak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 8e3f420 NIFI-7348 Wait - Removes WAIT_START_TIMESTAMP after expiration 8e3f420 is described below commit 8e3f42051fb3c7da27873de8b6d4507827a7b80d Author: EndzeitBegins <16666115+endzeitbeg...@users.noreply.github.com> AuthorDate: Mon Apr 13 16:19:55 2020 +0200 NIFI-7348 Wait - Removes WAIT_START_TIMESTAMP after expiration This closes #4201. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> --- .../org/apache/nifi/processors/standard/Wait.java | 4 +- .../apache/nifi/processors/standard/TestWait.java | 62 ++++++++++++---------- 2 files changed, 37 insertions(+), 29 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java index 45ffcb2..37f4479 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java @@ -87,7 +87,7 @@ import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJE @WritesAttributes({ @WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the " + "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile. " - + "This attribute is not written when the FlowFile is transferred to failure or success"), + + "This attribute is not written when the FlowFile is transferred to failure, expired or success"), @WritesAttribute(attribute = "wait.counter.<counterName>", description = "If a signal exists when the processor runs, " + "each count value in the signal is copied.") }) @@ -375,7 +375,7 @@ public class Wait extends AbstractProcessor { final Relationship finalRelationship = relationship; final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream() .map(f -> { - if (REL_SUCCESS.equals(finalRelationship)) { + if (REL_SUCCESS.equals(finalRelationship) || REL_EXPIRED.equals(finalRelationship)) { // These flowFiles will be exiting the wait, clear the timer f = clearWaitState(session, f); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java index 7970601..d8fffdc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java @@ -69,8 +69,8 @@ public class TestWait { // no cache key attribute runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); - // timestamp must be present - runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0).assertAttributeExists(Wait.WAIT_START_TIMESTAMP); + MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); + ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set runner.clearTransferState(); } @@ -103,7 +103,7 @@ public class TestWait { runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); - ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); + ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set runner.clearTransferState(); runner.enqueue(ff); @@ -112,6 +112,8 @@ public class TestWait { runner.run(); runner.assertAllFlowFilesTransferred(Wait.REL_EXPIRED, 1); + ff = runner.getFlowFilesForRelationship(Wait.REL_EXPIRED).get(0); + ff.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared runner.clearTransferState(); } @@ -129,7 +131,7 @@ public class TestWait { runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); - ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); + ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set runner.clearTransferState(); runner.enqueue(ff); @@ -145,6 +147,7 @@ public class TestWait { runner.assertAllFlowFilesTransferred(Wait.REL_EXPIRED, 1); ff = runner.getFlowFilesForRelationship(Wait.REL_EXPIRED).get(0); + ff.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared // Even if wait didn't complete, signal attributes should be set ff.assertAttributeEquals("wait.counter.total", "3"); ff.assertAttributeEquals("wait.counter.counter-A", "1"); @@ -161,13 +164,14 @@ public class TestWait { final Map<String, String> props = new HashMap<>(); props.put("releaseSignalAttribute", "1"); - props.put("wait.start.timestamp", "blue bunny"); + props.put(Wait.WAIT_START_TIMESTAMP, "blue bunny"); runner.enqueue(new byte[]{}, props); runner.run(); runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1); - runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); + MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0); + ff.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared runner.clearTransferState(); } @@ -181,8 +185,10 @@ public class TestWait { runner.run(); runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1); - runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total"); - runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); + MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0); + ff.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared + ff.assertAttributeNotExists("wait.counter.total"); + runner.clearTransferState(); } @@ -217,6 +223,8 @@ public class TestWait { runner.run(1, false); runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); + MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); + ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set runner.clearTransferState(); // The signal id should be penalized @@ -264,9 +272,8 @@ public class TestWait { runner.assertTransferCount(Wait.REL_SUCCESS, 1); final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared - // timer cleared - outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // show a new attribute was copied from the cache assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); // show that the original attributes are still there @@ -307,9 +314,8 @@ public class TestWait { runner.assertTransferCount(Wait.REL_SUCCESS, 1); final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared - // timer cleared - outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // show a new attribute was copied from the cache assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); // show that the original attributes are still there @@ -348,7 +354,9 @@ public class TestWait { runner.run(); runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); - waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); + waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set + String initialTimestamp = waitingFlowFile.getAttribute(Wait.WAIT_START_TIMESTAMP); + /* * 2nd iteration. */ @@ -362,7 +370,8 @@ public class TestWait { runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); // Still waiting since total count doesn't reach to 3. waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); - waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); + waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set + waitingFlowFile.assertAttributeEquals(Wait.WAIT_START_TIMESTAMP, initialTimestamp); // timestamp must remain constant /* * 3rd iteration. @@ -374,7 +383,8 @@ public class TestWait { runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); // Still waiting since total count doesn't reach to 3. waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); - waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); + waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set + waitingFlowFile.assertAttributeEquals(Wait.WAIT_START_TIMESTAMP, initialTimestamp); // timestamp must remain constant /* * 4th iteration. @@ -389,9 +399,7 @@ public class TestWait { runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); - - // wait timer cleared - outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); + outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared // show a new attribute was copied from the cache assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); @@ -434,7 +442,8 @@ public class TestWait { runner.run(); runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); - waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); + waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set + String initialTimestamp = waitingFlowFile.getAttribute(Wait.WAIT_START_TIMESTAMP); /* * 2nd iteration. @@ -449,7 +458,8 @@ public class TestWait { runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); // Still waiting since counter-B doesn't reach to 2. waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); - waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); + waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set + waitingFlowFile.assertAttributeEquals(Wait.WAIT_START_TIMESTAMP, initialTimestamp); // timestamp must remain constant /* * 3rd iteration. @@ -464,7 +474,8 @@ public class TestWait { runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); // Still waiting since total count doesn't reach to 3. waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); - waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); + waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set + waitingFlowFile.assertAttributeEquals(Wait.WAIT_START_TIMESTAMP, initialTimestamp); // timestamp must remain constant /* * 4th iteration. @@ -479,9 +490,8 @@ public class TestWait { runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared - // wait timer cleared - outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // show a new attribute was copied from the cache assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); // show that the original attributes are still there @@ -534,8 +544,7 @@ public class TestWait { runner.run(); runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); - // timer cleared - outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); + outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared outputFlowFile.assertAttributeEquals("wait.counter.counter", "2"); // expect counter to be decremented to 0 and releasable count remains 1. @@ -552,8 +561,7 @@ public class TestWait { runner.run(); runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); - // timer cleared - outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); + outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared // All counters are consumed. outputFlowFile.assertAttributeEquals("wait.counter.counter", "0");