fgerlits commented on a change in pull request #1172:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1172#discussion_r712841843



##########
File path: docker/test/integration/MiNiFi_integration_test_driver.py
##########
@@ -196,3 +196,6 @@ def check_minifi_log_contents(self, line, timeout_seconds=60):
 
     def check_query_results(self, postgresql_container_name, query, number_of_rows, timeout_seconds):
         assert self.cluster.check_query_results(postgresql_container_name, query, number_of_rows, timeout_seconds)
+
+    def check_mosquitto_logs(self, broker_name, log_pattern, timeout_seconds, count=1, use_regex=False):

Review comment:
       This is not really specific to mosquitto; I would rename it to something 
like "wait_for_container_logs" (and rename "broker_name" to "container_name").
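
A minimal sketch of what the renamed method might look like. The body of `check_mosquitto_logs` is not shown in the hunk, so delegating to `wait_for_app_logs` is an assumption based on its signature in `DockerTestCluster.py`; the class name `IntegrationTestDriverSketch` is hypothetical.

```python
class IntegrationTestDriverSketch:
    """Hypothetical stand-in for the test driver; `cluster` must offer `wait_for_app_logs`."""

    def __init__(self, cluster):
        self.cluster = cluster

    def wait_for_container_logs(self, container_name, log_pattern, timeout_seconds, count=1, use_regex=False):
        # Assumed delegation: works for any container, not only the MQTT broker.
        assert self.cluster.wait_for_app_logs(container_name, log_pattern, timeout_seconds, count, use_regex)
```

With a generic name, other step definitions could reuse the same method for any container whose logs they need to inspect.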

##########
File path: docker/test/integration/features/mqtt.feature
##########
@@ -0,0 +1,66 @@
+Feature: Sending data to using MQTT streaming platform using PublishMQTT
+  In order to send and receive data via MQTT
+  As a user of MiNiFi
+  I need to have PublishMQTT and ConsumeMQTT processors
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance transfers data to an MQTT broker
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+    And the "success" relationship of the PublishMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has 1 log line matching "Received PUBLISH from .*testtopic.*\\(4 bytes\\)"
+
+  Scenario: A MiNiFi instance tries to transfer data to a non-existent MQTT broker
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+    And the "success" relationship of the PublishMQTT processor is connected to the PutFile
+    And the "failure" relationship of the PublishMQTT processor is connected to the PutFile
+
+    When the MiNiFi instance starts up
+    Then no files are placed in the monitored directory in 30 seconds of running time
+
+  Scenario: Verify delivery of message when MQTT broker is unstable
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+    And the "success" relationship of the PublishMQTT processor is connected to the PutFile
+
+    When the MiNiFi instance starts up
+
+    Then no files are placed in the monitored directory in 30 seconds of running time
+    And an MQTT broker is deployed in correspondence with the PublishMQTT
+    And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has 1 log line matching "Received PUBLISH from .*testtopic.*\\(4 bytes\\)"
+
+  Scenario: A MiNiFi instance publishes and consumes data to/from an MQTT broker
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+
+    And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And "ConsumeMQTT" processor is a start node
+    And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has 1 log line matching "Received PUBLISH from .*testtopic.*\\(4 bytes\\)"
+    And the MQTT broker has 1 log line matching "Received SUBSCRIBE from"

Review comment:
       Missing newline at the end of the file.

##########
File path: docker/test/integration/features/mqtt.feature
##########
@@ -0,0 +1,66 @@
+Feature: Sending data to using MQTT streaming platform using PublishMQTT
+  In order to send and receive data via MQTT
+  As a user of MiNiFi
+  I need to have PublishMQTT and ConsumeMQTT processors
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance transfers data to an MQTT broker
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+    And the "success" relationship of the PublishMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has 1 log line matching "Received PUBLISH from .*testtopic.*\\(4 bytes\\)"
+
+  Scenario: A MiNiFi instance tries to transfer data to a non-existent MQTT broker

Review comment:
       Is the test file not routed to failure because PublishMQTT will throw in 
`onSchedule()`?  If so, then the name of this scenario could be better, e.g. "If 
the MQTT broker is not available, the PublishMQTT processor will not start".

##########
File path: docker/test/integration/features/mqtt.feature
##########
@@ -0,0 +1,66 @@
+Feature: Sending data to using MQTT streaming platform using PublishMQTT
+  In order to send and receive data via MQTT
+  As a user of MiNiFi
+  I need to have PublishMQTT and ConsumeMQTT processors
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance transfers data to an MQTT broker
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+    And the "success" relationship of the PublishMQTT processor is connected to the PutFile
+
+    And an MQTT broker is set up in correspondence with the PublishMQTT
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the MQTT broker has 1 log line matching "Received PUBLISH from .*testtopic.*\\(4 bytes\\)"
+
+  Scenario: A MiNiFi instance tries to transfer data to a non-existent MQTT broker
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+    And the "success" relationship of the PublishMQTT processor is connected to the PutFile
+    And the "failure" relationship of the PublishMQTT processor is connected to the PutFile
+
+    When the MiNiFi instance starts up
+    Then no files are placed in the monitored directory in 30 seconds of running time
+
+  Scenario: Verify delivery of message when MQTT broker is unstable
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishMQTT processor set up to communicate with an MQTT broker instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+    And the "success" relationship of the PublishMQTT processor is connected to the PutFile
+
+    When the MiNiFi instance starts up
+
+    Then no files are placed in the monitored directory in 30 seconds of running time
+    And an MQTT broker is deployed in correspondence with the PublishMQTT

Review comment:
       moving the blank line may make this test clearer:
   ```suggestion
       When the MiNiFi instance starts up
       Then no files are placed in the monitored directory in 30 seconds of running time
   
       And an MQTT broker is deployed in correspondence with the PublishMQTT
   ```

##########
File path: docker/test/integration/minifi/core/DockerTestCluster.py
##########
@@ -54,13 +55,16 @@ def get_app_log(self, container_name):
 
         return container.status, None
 
-    def wait_for_app_logs(self, container_name, log_entry, timeout_seconds, count=1):
+    def wait_for_app_logs(self, container_name, log_entry, timeout_seconds, count=1, use_regex=False):
         wait_start_time = time.perf_counter()
         while (time.perf_counter() - wait_start_time) < timeout_seconds:
             logging.info('Waiting for app-logs `%s` in container `%s`', log_entry, container_name)
             status, logs = self.get_app_log(container_name)
-            if logs is not None and count <= logs.decode("utf-8").count(log_entry):
-                return True
+            if logs is not None:
+                if not use_regex and count <= logs.decode("utf-8").count(log_entry):
+                    return True
+                elif use_regex and len(re.findall(log_entry, logs.decode("utf-8"))) <= count:

Review comment:
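       This should be `>=`: the check should pass once at least `count` matches 
have been found, not while there are at most `count` of them.
   
   (I would also change the non-regex case from `count <= something` to 
`something >= count`, as I think that would read better.)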
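
A minimal sketch of the corrected condition, pulled out into a standalone helper so that both branches read as `matches >= count`. The helper name `log_entry_seen_enough` is hypothetical and not part of the PR; `re.findall` and `str.count` are the same standard-library calls already used in the diff.

```python
import re


def log_entry_seen_enough(logs: bytes, log_entry: str, count: int = 1, use_regex: bool = False) -> bool:
    """Return True once `log_entry` occurs at least `count` times in `logs`."""
    text = logs.decode("utf-8")
    if use_regex:
        # Treat log_entry as a regular expression and count its matches.
        matches = len(re.findall(log_entry, text))
    else:
        # Treat log_entry as a plain substring.
        matches = text.count(log_entry)
    return matches >= count
```

Decoding once and comparing in a single place also avoids the two branches drifting apart again.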

##########
File path: docker/test/integration/steps/steps.py
##########
@@ -546,3 +554,14 @@ def step_impl(context, query, number_of_rows, timeout_seconds):
 @then("the Minifi logs contain the following message: \"{log_message}\" in less than {duration}")
 def step_impl(context, log_message, duration):
     context.test.check_minifi_log_contents(log_message, timeparse(duration))
+
+# MQTT
+@then("the MQTT broker has 1 log line matching \"{log_pattern}\"")

Review comment:
       this sounds like the test will only pass if there is exactly 1 matching 
log line, but in fact we check for at least 1 matching log line
   ```suggestion
   @then("the MQTT broker has a log line matching \"{log_pattern}\"")
   ```

##########
File path: docker/test/integration/minifi/core/ImageStore.py
##########
@@ -12,6 +12,7 @@ class ImageStore:
     def __init__(self):
         self.client = docker.from_env()
         self.images = dict()
+        self.test_dir = os.environ['PYTHONPATH'].split(':')[-1]  # Based on DockerVerify.sh

Review comment:
       This looks brittle, as someone may add another directory to the end of 
`PYTHONPATH` in the future.  It would be better if e.g. DockerVerify.sh exported 
a dedicated `TEST_DIRECTORY` environment variable which we could read here.
   
   EDIT: I can see this same code is already used in a few other places; I 
would change those, too.
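
A minimal sketch of reading such a variable, assuming DockerVerify.sh were changed to export `TEST_DIRECTORY` as proposed above. The helper name `get_test_directory` and the error message are hypothetical.

```python
import os
from typing import Mapping


def get_test_directory(environ: Mapping[str, str] = os.environ) -> str:
    """Read the test directory from a dedicated variable instead of parsing PYTHONPATH."""
    test_dir = environ.get("TEST_DIRECTORY")
    if not test_dir:
        # Fail loudly rather than guessing from the position of a PYTHONPATH entry.
        raise RuntimeError("TEST_DIRECTORY is not set; it is expected to be exported by the test runner script")
    return test_dir
```

A single helper like this could then replace each `os.environ['PYTHONPATH'].split(':')[-1]` occurrence.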




