fgerlits commented on a change in pull request #1172: URL: https://github.com/apache/nifi-minifi-cpp/pull/1172#discussion_r712841843
########## File path: docker/test/integration/MiNiFi_integration_test_driver.py ########## @@ -196,3 +196,6 @@ def check_minifi_log_contents(self, line, timeout_seconds=60): def check_query_results(self, postgresql_container_name, query, number_of_rows, timeout_seconds): assert self.cluster.check_query_results(postgresql_container_name, query, number_of_rows, timeout_seconds) + + def check_mosquitto_logs(self, broker_name, log_pattern, timeout_seconds, count=1, use_regex=False): Review comment: This is not really specific to mosquitto; I would rename it to something like "wait_for_container_logs" (and rename "broker_name" to "container_name"). ########## File path: docker/test/integration/features/mqtt.feature ########## @@ -0,0 +1,66 @@ +Feature: Sending data to using MQTT streaming platform using PublishMQTT + In order to send and receive data via MQTT + As a user of MiNiFi + I need to have PublishMQTT and ConsumeMQTT processors + + Background: + Given the content of "/tmp/output" is monitored + + Scenario: A MiNiFi instance transfers data to an MQTT broker + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content "test" is present in "/tmp/input" + And a PublishMQTT processor set up to communicate with an MQTT broker instance + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And the "success" relationship of the PublishMQTT processor is connected to the PutFile + + And an MQTT broker is set up in correspondence with the PublishMQTT + + When both instances start up + Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + And the MQTT broker has 1 log line matching "Received PUBLISH from .*testtopic.*\\(4 bytes\\)" + + Scenario: A MiNiFi instance tries to transfer data to a non-existent MQTT broker + Given a GetFile processor with the "Input 
Directory" property set to "/tmp/input" + And a file with the content "test" is present in "/tmp/input" + And a PublishMQTT processor set up to communicate with an MQTT broker instance + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And the "success" relationship of the PublishMQTT processor is connected to the PutFile + And the "failure" relationship of the PublishMQTT processor is connected to the PutFile + + When the MiNiFi instance starts up + Then no files are placed in the monitored directory in 30 seconds of running time + + Scenario: Verify delivery of message when MQTT broker is unstable + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content "test" is present in "/tmp/input" + And a PublishMQTT processor set up to communicate with an MQTT broker instance + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And the "success" relationship of the PublishMQTT processor is connected to the PutFile + + When the MiNiFi instance starts up + + Then no files are placed in the monitored directory in 30 seconds of running time + And an MQTT broker is deployed in correspondence with the PublishMQTT + And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + And the MQTT broker has 1 log line matching "Received PUBLISH from .*testtopic.*\\(4 bytes\\)" + + Scenario: A MiNiFi instance publishes and consumes data to/from an MQTT broker + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content "test" is present in "/tmp/input" + And a PublishMQTT processor set up to communicate with an MQTT broker instance + And the "success" relationship of the GetFile processor is connected to the 
PublishMQTT + + And a ConsumeMQTT processor set up to communicate with an MQTT broker instance + And a PutFile processor with the "Directory" property set to "/tmp/output" + And "ConsumeMQTT" processor is a start node + And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + + And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + + When both instances start up + Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + And the MQTT broker has 1 log line matching "Received PUBLISH from .*testtopic.*\\(4 bytes\\)" + And the MQTT broker has 1 log line matching "Received SUBSCRIBE from" Review comment: missing newline at the end ########## File path: docker/test/integration/features/mqtt.feature ########## @@ -0,0 +1,66 @@ +Feature: Sending data to using MQTT streaming platform using PublishMQTT + In order to send and receive data via MQTT + As a user of MiNiFi + I need to have PublishMQTT and ConsumeMQTT processors + + Background: + Given the content of "/tmp/output" is monitored + + Scenario: A MiNiFi instance transfers data to an MQTT broker + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content "test" is present in "/tmp/input" + And a PublishMQTT processor set up to communicate with an MQTT broker instance + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And the "success" relationship of the PublishMQTT processor is connected to the PutFile + + And an MQTT broker is set up in correspondence with the PublishMQTT + + When both instances start up + Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + And the MQTT broker has 1 log line matching "Received PUBLISH from .*testtopic.*\\(4 bytes\\)" + + Scenario: A MiNiFi instance tries to 
transfer data to a non-existent MQTT broker Review comment: Is the test file not routed to failure because PublishMQTT will throw in `onSchedule()`? If so, then the name of this scenario could be better, e.g. "If the MQTT broker is not available, the PublishMQTT processor will not start". ########## File path: docker/test/integration/features/mqtt.feature ########## @@ -0,0 +1,66 @@ +Feature: Sending data to using MQTT streaming platform using PublishMQTT + In order to send and receive data via MQTT + As a user of MiNiFi + I need to have PublishMQTT and ConsumeMQTT processors + + Background: + Given the content of "/tmp/output" is monitored + + Scenario: A MiNiFi instance transfers data to an MQTT broker + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content "test" is present in "/tmp/input" + And a PublishMQTT processor set up to communicate with an MQTT broker instance + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And the "success" relationship of the PublishMQTT processor is connected to the PutFile + + And an MQTT broker is set up in correspondence with the PublishMQTT + + When both instances start up + Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + And the MQTT broker has 1 log line matching "Received PUBLISH from .*testtopic.*\\(4 bytes\\)" + + Scenario: A MiNiFi instance tries to transfer data to a non-existent MQTT broker + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content "test" is present in "/tmp/input" + And a PublishMQTT processor set up to communicate with an MQTT broker instance + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And the 
"success" relationship of the PublishMQTT processor is connected to the PutFile + And the "failure" relationship of the PublishMQTT processor is connected to the PutFile + + When the MiNiFi instance starts up + Then no files are placed in the monitored directory in 30 seconds of running time + + Scenario: Verify delivery of message when MQTT broker is unstable + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content "test" is present in "/tmp/input" + And a PublishMQTT processor set up to communicate with an MQTT broker instance + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the GetFile processor is connected to the PublishMQTT + And the "success" relationship of the PublishMQTT processor is connected to the PutFile + + When the MiNiFi instance starts up + + Then no files are placed in the monitored directory in 30 seconds of running time + And an MQTT broker is deployed in correspondence with the PublishMQTT Review comment: moving the blank line may make this test clearer: ```suggestion When the MiNiFi instance starts up Then no files are placed in the monitored directory in 30 seconds of running time And an MQTT broker is deployed in correspondence with the PublishMQTT ``` ########## File path: docker/test/integration/minifi/core/DockerTestCluster.py ########## @@ -54,13 +55,16 @@ def get_app_log(self, container_name): return container.status, None - def wait_for_app_logs(self, container_name, log_entry, timeout_seconds, count=1): + def wait_for_app_logs(self, container_name, log_entry, timeout_seconds, count=1, use_regex=False): wait_start_time = time.perf_counter() while (time.perf_counter() - wait_start_time) < timeout_seconds: logging.info('Waiting for app-logs `%s` in container `%s`', log_entry, container_name) status, logs = self.get_app_log(container_name) - if logs is not None and count <= logs.decode("utf-8").count(log_entry): - 
return True + if logs is not None: + if not use_regex and count <= logs.decode("utf-8").count(log_entry): + return True + elif use_regex and len(re.findall(log_entry, logs.decode("utf-8"))) <= count: Review comment: this should be `>=` (I would also change the non-regex case from `count <= something` to `something >= count` as I think that would read better) ########## File path: docker/test/integration/steps/steps.py ########## @@ -546,3 +554,14 @@ def step_impl(context, query, number_of_rows, timeout_seconds): @then("the Minifi logs contain the following message: \"{log_message}\" in less than {duration}") def step_impl(context, log_message, duration): context.test.check_minifi_log_contents(log_message, timeparse(duration)) + +# MQTT +@then("the MQTT broker has 1 log line matching \"{log_pattern}\"") Review comment: this sounds like the test will only pass if there is exactly 1 matching log line, but in fact we check for at least 1 matching log line ```suggestion @then("the MQTT broker has a log line matching \"{log_pattern}\"") ``` ########## File path: docker/test/integration/minifi/core/ImageStore.py ########## @@ -12,6 +12,7 @@ class ImageStore: def __init__(self): self.client = docker.from_env() self.images = dict() + self.test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on DockerVerify.sh Review comment: This looks brittle, as someone may add another directory to the end of `PYTHONPATH` in the future. It would be better if, e.g., DockerVerify.sh exported a dedicated `TEST_DIRECTORY` environment variable which we could read here. EDIT: I can see this same code is already used in a few other places; I would change those, too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org