Re: [PR] NIFI-12402 add option to user can avoid an inactive indicator is crea… [nifi]
exceptionfactory closed pull request #8063: NIFI-12402 add option to user can avoid an inactive indicator is crea… URL: https://github.com/apache/nifi/pull/8063 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-12402 add option to user can avoid an inactive indicator is crea… [nifi]
exceptionfactory commented on code in PR #8063:
URL: https://github.com/apache/nifi/pull/8063#discussion_r1454401906

## nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java: ##
@@ -349,6 +408,50 @@ public void testFirstRunNoMessages() throws InterruptedException {
         while(rerun);
     }
 
+    @Test
+    public void testRunNoMessagesWithWaitForActivityTrue() throws InterruptedException {
+        // don't use the TestableProcessor, we want the real timestamp from @OnScheduled
+        final TestRunner runner = TestRunners.newTestRunner(new MonitorActivity());
+        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
+        runner.setProperty(MonitorActivity.WAIT_FOR_ACTIVITY, "true");
+
+        // shouldn't generate inactivity b/c run() will reset the lastSuccessfulTransfer if @OnSchedule & onTrigger
+        // does not get called more than MonitorActivity.THRESHOLD apart
+        runner.run();
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+        Thread.sleep(10000L);
+        runNext(runner);
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testRunNoMessagesWithWaitForActivityFalse() throws InterruptedException {
+        // don't use the TestableProcessor, we want the real timestamp from @OnScheduled
+        final TestRunner runner = TestRunners.newTestRunner(new MonitorActivity());
+        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
+        runner.setProperty(MonitorActivity.WAIT_FOR_ACTIVITY, "false");
+
+        // shouldn't generate inactivity b/c run() will reset the lastSuccessfulTransfer if @OnSchedule & onTrigger
+        // does not get called more than MonitorActivity.THRESHOLD apart
+        runner.run();
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+        Thread.sleep(10000L);
+        runNext(runner);
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
+        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
+        runner.clearTransferState();
+    }
+

Review Comment:
The sleep of 10 seconds is too long for efficient unit testing. The other unit test method provides some basic coverage, so recommend removing these two test methods.

## nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java: ##
@@ -100,6 +100,65 @@ public void testFirstMessage() {
         restoredFlowFile.assertAttributeNotExists("key1");
     }
 
+    @Test
+    public void testFirstMessageWithWaitForActivityTrue() {
+        final TestableProcessor processor = new TestableProcessor(1000);
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
+        runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
+        runner.setProperty(MonitorActivity.WAIT_FOR_ACTIVITY, "true");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        processor.resetLastSuccessfulTransfer();
+
+        runNext(runner);
+        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
+        runner.clearTransferState();
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("key", "value");
+        attributes.put("key1", "value1");
+
+        runner.enqueue(new byte[0], attributes);
+        runNext(runner);
+
+        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
+        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
+
+        MockFlowFile restoredFlowFile = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
+        String flowFileContent = new String(restoredFlowFile.toByteArray());
+        assertTrue(Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", flowFileContent));

Review Comment:
It is best to avoid checking particular messages, as these are not necessarily
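
On the sleep concern raised in this review: one common way to exercise threshold logic without real waiting is to inject the time source and advance it by hand in the test. The following is only a minimal sketch of that idea. `InactivityTimer` and its methods are hypothetical names used for illustration, not part of NiFi or of this PR, and the comparison is an assumption about how a threshold check might be written.

```java
import java.util.function.LongSupplier;

// Hypothetical helper for illustration only; not part of NiFi or of this PR.
final class InactivityTimer {
    private final LongSupplier clock;      // injectable time source, in milliseconds
    private final long thresholdMillis;
    private long lastActivityMillis;

    InactivityTimer(final LongSupplier clock, final long thresholdMillis) {
        this.clock = clock;
        this.thresholdMillis = thresholdMillis;
        this.lastActivityMillis = clock.getAsLong();
    }

    // Remember the current clock reading as the most recent activity.
    void recordActivity() {
        lastActivityMillis = clock.getAsLong();
    }

    // Inactive once at least thresholdMillis have passed since the last activity.
    boolean isInactive() {
        return clock.getAsLong() - lastActivityMillis >= thresholdMillis;
    }

    public static void main(final String[] args) {
        final long[] now = {0L};   // fake clock that the test advances by hand
        final InactivityTimer timer = new InactivityTimer(() -> now[0], 5_000L);
        System.out.println(timer.isInactive());   // prints "false"
        now[0] += 10_000L;                        // "wait" 10 seconds instantly
        System.out.println(timer.isInactive());   // prints "true"
    }
}
```

With a fake clock like this, a test that would otherwise sleep for 10 seconds completes immediately, while production code can pass `System::currentTimeMillis` as the time source.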
Re: [PR] NIFI-12402 add option to user can avoid an inactive indicator is crea… [nifi]
exceptionfactory commented on code in PR #8063:
URL: https://github.com/apache/nifi/pull/8063#discussion_r1452722401

## nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java: ##
@@ -105,6 +105,14 @@ public class MonitorActivity extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .defaultValue("Activity restored at time: ${now():format('yyyy/MM/dd HH:mm:ss')} after being inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes")
             .build();
+    public static final PropertyDescriptor WAIT_FOR_ACTIVITY = new PropertyDescriptor.Builder()
+            .name("Wait For Activity")

Review Comment:
Following the general Title Case convention for property names, recommend adjusting `For` to `for`.
```suggestion
            .name("Wait for Activity")
```

## nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java: ##
@@ -105,6 +105,14 @@ public class MonitorActivity extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .defaultValue("Activity restored at time: ${now():format('yyyy/MM/dd HH:mm:ss')} after being inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes")
             .build();
+    public static final PropertyDescriptor WAIT_FOR_ACTIVITY = new PropertyDescriptor.Builder()
+            .name("Wait For Activity")
+            .description("When the processor gets started or restarted, if set to true, only send an inactive indicator if there had been activity beforehand. "
+                    + "Otherwise send an inactive indicator even if there had not been activity beforehand.")
+            .required(false)

Review Comment:
Since this property has a default value that preserves the existing behavior, this new property can be marked as required.
```suggestion
            .required(true)
```
Re: [PR] NIFI-12402 add option to user can avoid an inactive indicator is crea… [nifi]
dan-s1 commented on PR #8063:
URL: https://github.com/apache/nifi/pull/8063#issuecomment-1865110606

LGTM +1
Re: [PR] NIFI-12402 add option to user can avoid an inactive indicator is crea… [nifi]
dan-s1 commented on code in PR #8063:
URL: https://github.com/apache/nifi/pull/8063#discussion_r1431617697

## nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java: ##
@@ -105,6 +105,14 @@ public class MonitorActivity extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .defaultValue("Activity restored at time: ${now():format('yyyy/MM/dd HH:mm:ss')} after being inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes")
             .build();
+    public static final PropertyDescriptor WAIT_FOR_ACTIVITY = new PropertyDescriptor.Builder()
+            .name("Initial Inactivity Indicator")
+            .description("When the processor gets started or restarted, it is considered there was no activity happened before. "
+                    + "If true, send inactivity indicator only if there was activity.")

Review Comment:
```suggestion
            .description("When the processor gets started or restarted, if set to true, only send an inactive indicator if there had been activity beforehand. Otherwise send an inactive indicator even if there had not been activity beforehand.")
```
Re: [PR] NIFI-12402 add option to user can avoid an inactive indicator is crea… [nifi]
nathluu commented on PR #8063:
URL: https://github.com/apache/nifi/pull/8063#issuecomment-1858787407

Hi @dan-s1, the code has been updated. Could you please review it again? Thank you!
Re: [PR] NIFI-12402 add option to user can avoid an inactive indicator is crea… [nifi]
dan-s1 commented on code in PR #8063:
URL: https://github.com/apache/nifi/pull/8063#discussion_r1428029603

## nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java: ##
@@ -300,7 +312,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         if (isInactive) {
             final boolean continual = context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
-            sendInactiveMarker = !inactive.getAndSet(true) || (continual && (now > lastInactiveMessage.get() + thresholdMillis));
+            if (!initialInactivityIndicator) {
+                sendInactiveMarker = (!inactive.getAndSet(true) || (continual && (now > lastInactiveMessage.get() + thresholdMillis))) && hasSuccessTransfer.get();
+            } else {
+                sendInactiveMarker = !inactive.getAndSet(true) || (continual && (now > lastInactiveMessage.get() + thresholdMillis));
+            }

Review Comment:
`!inactive.getAndSet(true) || (continual && (now > lastInactiveMessage.get()` is common code between lines 316 and 318. Can you please assign this to a variable whose name should describe what these conditions are and then use the variable on lines 316 and 318? Thanks!
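
One way the requested refactoring could look, offered as a minimal sketch rather than the change that was actually made: the shared condition is assigned once to a descriptively named local variable and reused for both cases. The class `InactiveMarkerDecision`, its method names, and the variable name `firstInactivityOrContinualDue` are hypothetical; only the field names and the boolean expression are taken from the diff under review. Updating `lastInactiveMessage` when a marker is sent is an assumption made so that the sketch is self-contained.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the relevant state of MonitorActivity; not the actual NiFi class.
final class InactiveMarkerDecision {
    private final AtomicBoolean inactive = new AtomicBoolean(false);
    private final AtomicLong lastInactiveMessage = new AtomicLong(0L);
    private final AtomicBoolean hasSuccessTransfer = new AtomicBoolean(false);

    void recordSuccessTransfer() {
        hasSuccessTransfer.set(true);
    }

    boolean shouldSendInactiveMarker(final long now, final long thresholdMillis,
                                     final boolean continual, final boolean initialInactivityIndicator) {
        // Shared condition from both branches of the diff, evaluated exactly once
        // so the getAndSet(true) side effect happens in a single place.
        final boolean firstInactivityOrContinualDue = !inactive.getAndSet(true)
                || (continual && (now > lastInactiveMessage.get() + thresholdMillis));

        // When the initial indicator is disabled, additionally require earlier activity.
        final boolean sendInactiveMarker = initialInactivityIndicator
                ? firstInactivityOrContinualDue
                : firstInactivityOrContinualDue && hasSuccessTransfer.get();

        if (sendInactiveMarker) {
            lastInactiveMessage.set(now); // assumption: record when the marker was last sent
        }
        return sendInactiveMarker;
    }

    public static void main(final String[] args) {
        final InactiveMarkerDecision decision = new InactiveMarkerDecision();
        // Initial indicator disabled and nothing transferred yet: the marker is suppressed.
        System.out.println(decision.shouldSendInactiveMarker(1_000L, 100L, false, false)); // prints "false"
    }
}
```

Naming the condition also makes the remaining difference between the two branches, the extra `hasSuccessTransfer.get()` check, easier to see at a glance.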