lordgamez commented on a change in pull request #1053:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1053#discussion_r615802510



##########
File path: docker/test/integration/environment.py
##########
@@ -1,26 +1,29 @@
-from behave import fixture, use_fixture
 import sys
 sys.path.append('../minifi')
 import logging
+import datetime
 
 from MiNiFi_integration_test_driver import MiNiFi_integration_test
 from minifi import *
 
 def raise_exception(exception):
-    raise exception
+  raise exception
 
-@fixture
-def test_driver_fixture(context):
-    context.test = MiNiFi_integration_test(context)
-    yield context.test
-    logging.info("Integration test teardown...")
-    del context.test
+def integration_test_cleanup(test):
+  logging.info("Integration test cleanup...")
+  del test
 
 def before_scenario(context, scenario):
-    use_fixture(test_driver_fixture, context)
+  logging.info("Integration test setup at 
{time:%H:%M:%S:%f}".format(time=datetime.datetime.now()))
+  context.test = MiNiFi_integration_test(context)
 
 def after_scenario(context, scenario):
-       pass
+  logging.info("Integration test teardown at 
{time:%H:%M:%S:%f}".format(time=datetime.datetime.now()))
+  if context is not None and hasattr(context, "test"):
+    context.test.cleanup() # force invocation
+    del context.test
+  else:
+    raise Exception("Test")

Review comment:
       Can we give a more elaborate message to this exception?

##########
File path: docker/test/integration/MiNiFi_integration_test_driver.py
##########
@@ -90,35 +102,56 @@ def acquire_cluster(self, name):
         return self.clusters.setdefault(name, DockerTestCluster())
 
     def set_up_cluster_network(self):
-        self.docker_network = SingleNodeDockerCluster.create_docker_network()
-        for cluster in self.clusters.values():
-            cluster.set_network(self.docker_network)
+        if self.docker_network is None:
+            logging.info("Setting up new network.")
+            self.docker_network = 
SingleNodeDockerCluster.create_docker_network()
+            for cluster in self.clusters.values():
+                cluster.set_network(self.docker_network)
+        else:
+            logging.info("Network is already set.")
+
+    def wait_for_cluster_startup_finish(self, cluster):
+        startup_success = True
+        logging.info("Engine: %s", cluster.get_engine())
+        if cluster.get_engine() == "minifi-cpp":
+            startup_success = cluster.wait_for_app_logs("Starting Flow 
Controller", 120)
+        elif cluster.get_engine() == "nifi":
+            startup_success = cluster.wait_for_app_logs("Starting Flow 
Controller...", 120)
+        elif cluster.get_engine() == "kafka-broker":
+            startup_success = cluster.wait_for_app_logs("Startup complete.", 
120)
+        elif cluster.get_engine() == "http-proxy":
+            startup_success = cluster.wait_for_app_logs("Accepting HTTP Socket 
connections at", 120)
+        elif cluster.get_engine() == "s3-server":
+            startup_success = cluster.wait_for_app_logs("Started 
S3MockApplication", 120)
+        elif cluster.get_engine() == "azure-storage-server":
+            startup_success = cluster.wait_for_app_logs("Azurite Queue service 
is successfully listening at", 120)
+        if not startup_success:
+            cluster.log_nifi_output()
+        return startup_success
+
+    def start_single_cluster(self, cluster_name):
+        self.set_up_cluster_network()
+        cluster = self.clusters[cluster_name]
+        cluster.deploy_flow()
+        assert self.wait_for_cluster_startup_finish(cluster)
+        time.sleep(10)

Review comment:
       Why do we need an additional 10 second delay here?

##########
File path: docker/test/integration/MiNiFi_integration_test_driver.py
##########
@@ -90,35 +102,56 @@ def acquire_cluster(self, name):
         return self.clusters.setdefault(name, DockerTestCluster())
 
     def set_up_cluster_network(self):
-        self.docker_network = SingleNodeDockerCluster.create_docker_network()
-        for cluster in self.clusters.values():
-            cluster.set_network(self.docker_network)
+        if self.docker_network is None:
+            logging.info("Setting up new network.")
+            self.docker_network = 
SingleNodeDockerCluster.create_docker_network()
+            for cluster in self.clusters.values():
+                cluster.set_network(self.docker_network)
+        else:
+            logging.info("Network is already set.")
+
+    def wait_for_cluster_startup_finish(self, cluster):
+        startup_success = True
+        logging.info("Engine: %s", cluster.get_engine())
+        if cluster.get_engine() == "minifi-cpp":
+            startup_success = cluster.wait_for_app_logs("Starting Flow 
Controller", 120)
+        elif cluster.get_engine() == "nifi":
+            startup_success = cluster.wait_for_app_logs("Starting Flow 
Controller...", 120)
+        elif cluster.get_engine() == "kafka-broker":
+            startup_success = cluster.wait_for_app_logs("Startup complete.", 
120)
+        elif cluster.get_engine() == "http-proxy":
+            startup_success = cluster.wait_for_app_logs("Accepting HTTP Socket 
connections at", 120)
+        elif cluster.get_engine() == "s3-server":
+            startup_success = cluster.wait_for_app_logs("Started 
S3MockApplication", 120)
+        elif cluster.get_engine() == "azure-storage-server":
+            startup_success = cluster.wait_for_app_logs("Azurite Queue service 
is successfully listening at", 120)
+        if not startup_success:
+            cluster.log_nifi_output()
+        return startup_success
+
+    def start_single_cluster(self, cluster_name):
+        self.set_up_cluster_network()
+        cluster = self.clusters[cluster_name]
+        cluster.deploy_flow()
+        assert self.wait_for_cluster_startup_finish(cluster)
+        time.sleep(10)
 
     def start(self):
         logging.info("MiNiFi_integration_test start")
         self.set_up_cluster_network()
         for cluster in self.clusters.values():
-            logging.info("Starting cluster %s with an engine of %s", 
cluster.get_name(), cluster.get_engine())
-            
cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.test_id))
-            cluster.deploy_flow()
-        for cluster_name, cluster in self.clusters.items():
-            startup_success = True
-            logging.info("Engine: %s", cluster.get_engine())
-            if cluster.get_engine() == "minifi-cpp":
-                startup_success = cluster.wait_for_app_logs("Starting Flow 
Controller", 120)
-            elif cluster.get_engine() == "nifi":
-                startup_success = cluster.wait_for_app_logs("Starting Flow 
Controller...", 120)
-            elif cluster.get_engine() == "kafka-broker":
-                startup_success = cluster.wait_for_app_logs("Startup 
complete.", 120)
-            elif cluster.get_engine() == "http-proxy":
-                startup_success = cluster.wait_for_app_logs("Accepting HTTP 
Socket connections at", 120)
-            elif cluster.get_engine() == "s3-server":
-                startup_success = cluster.wait_for_app_logs("Started 
S3MockApplication", 120)
-            elif cluster.get_engine() == "azure-storage-server":
-                startup_success = cluster.wait_for_app_logs("Azurite Queue 
service is successfully listening at", 120)
-            if not startup_success:
-                cluster.log_nifi_output()
-            assert startup_success
+            if len(cluster.containers) == 0:
+                logging.info("Starting cluster %s with an engine of %s", 
cluster.get_name(), cluster.get_engine())
+                
cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.test_id))
+                cluster.deploy_flow()
+            else:
+                logging.info("Container %s is already started with an engine 
of %s", cluster.get_name(), cluster.get_engine())
+        for cluster in self.clusters.values():
+            assert self.wait_for_cluster_startup_finish(cluster)
+        # Seems like some extra time needed for consumers to negotiate with 
the broker
+        for cluster in self.clusters.values():
+            if cluster.get_engine() == "kafka-broker":
+                time.sleep(10)

Review comment:
       Is there perhaps a log message we could check for instead?

##########
File path: docker/test/integration/features/kafka.feature
##########
@@ -57,3 +57,179 @@ Feature: Sending data to using Kafka streaming platform 
using PublishKafka
 
     When both instances start up
     Then a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+
+  Scenario: MiNiFi consumes data from a kafka topic
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "kafka-consumer-flow" flow
+    And the "success" relationship of the ConsumeKafka processor is connected 
to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the 
third-party kafka publisher
+
+    When all instances start up
+    And a message with content "some test message" is published to the 
"ConsumeKafkaTest" topic
+
+    Then at least one flowfile with the content "some test message" is placed 
in the monitored directory in less than 60 seconds
+
+  Scenario Outline: ConsumeKafka parses and uses kafka topics and topic name 
formats
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And the "Topic Names" of the ConsumeKafka processor is set to "<topic 
names>"
+    And the "Topic Name Format" of the ConsumeKafka processor is set to 
"<topic name format>"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "kafka-consumer-flow" flow
+    And the "success" relationship of the ConsumeKafka processor is connected 
to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the 
third-party kafka publisher
+    And the kafka broker "broker" is started
+    And the topic "ConsumeKafkaTest" is initialized on the kafka broker
+
+    When all other processes start up
+    And a message with content "<message 1>" is published to the 
"ConsumeKafkaTest" topic
+    And a message with content "<message 2>" is published to the 
"ConsumeKafkaTest" topic
+
+    Then two flowfiles with the contents "<message 1>" and "<message 2>" are 
placed in the monitored directory in less than 45 seconds
+
+  Examples: Topic names and formats to test
+    | message 1            | message 2           | topic names              | 
topic name format |
+    | Ulysses              | James Joyce         | ConsumeKafkaTest         | 
(not set)         |
+    | The Great Gatsby     | F. Scott Fitzgerald | ConsumeKafkaTest         | 
Names             |
+    | War and Peace        | Lev Tolstoy         | a,b,c,ConsumeKafkaTest,d | 
Names             |
+    | Nineteen Eighty Four | George Orwell       | ConsumeKafkaTest         | 
Patterns          |
+    | Hamlet               | William Shakespeare | Cons[emu]*KafkaTest      | 
Patterns          |

Review comment:
       It's good to have this table format for the inputs. I didn't know about 
this feature before; it improves readability :+1: 

##########
File path: docker/test/integration/minifi/validators/FileOutputValidator.py
##########
@@ -1,8 +1,46 @@
+import logging
+import os
+
+from os import listdir
+from os.path import join
+
 from .OutputValidator import OutputValidator
 
 class FileOutputValidator(OutputValidator):
     def set_output_dir(self, output_dir):
         self.output_dir = output_dir
 
+    @staticmethod
+    def num_files_matching_content_in_dir(dir_path, expected_content):
+      listing = listdir(dir_path)

Review comment:
       The indentation of the new function does not match the old 
code. We should stick to 4 spaces, as that is the PEP 8 standard and will be 
enforced after the flake8 check is integrated.

##########
File path: docker/test/integration/minifi/validators/FileOutputValidator.py
##########
@@ -1,8 +1,46 @@
+import logging
+import os
+
+from os import listdir
+from os.path import join
+
 from .OutputValidator import OutputValidator
 
 class FileOutputValidator(OutputValidator):
     def set_output_dir(self, output_dir):
         self.output_dir = output_dir
 
+    @staticmethod
+    def num_files_matching_content_in_dir(dir_path, expected_content):
+      listing = listdir(dir_path)
+      if listing:
+        files_of_matching_content_found = 0
+        for file_name in listing:
+          full_path = join(dir_path, file_name)
+          if not os.path.isfile(full_path):
+            continue
+          with open(full_path, 'r') as out_file:
+            contents = out_file.read()
+            logging.info("dir %s -- name %s", dir_path, file_name)
+            logging.info("expected content: %s -- actual: %s, match: %r", 
expected_content, contents, expected_content == contents)
+            if expected_content in contents:
+              files_of_matching_content_found += 1
+        return files_of_matching_content_found
+      return 0
+
+    @staticmethod
+    def get_num_files(dir_path):
+      listing = listdir(dir_path)
+      logging.info("Num files in %s: %d", dir_path, len(listing))
+      if listing:

Review comment:
       I would return early here as well.

##########
File path: docker/test/integration/minifi/core/SingleNodeDockerCluster.py
##########
@@ -234,22 +237,32 @@ def deploy_kafka_broker(self):
                     detach=True,
                     name='kafka-broker',
                     network=self.network.name,
-                    ports={'9092/tcp': 9092},
-                    
environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093",
 "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"],
+                    ports={'9092/tcp': 9092, '29092/tcp' : 29092},
+                    # 
environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093",
 "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"],

Review comment:
       This can be removed

##########
File path: docker/test/integration/minifi/validators/FileOutputValidator.py
##########
@@ -1,8 +1,46 @@
+import logging
+import os
+
+from os import listdir
+from os.path import join
+
 from .OutputValidator import OutputValidator
 
 class FileOutputValidator(OutputValidator):
     def set_output_dir(self, output_dir):
         self.output_dir = output_dir
 
+    @staticmethod
+    def num_files_matching_content_in_dir(dir_path, expected_content):
+      listing = listdir(dir_path)
+      if listing:

Review comment:
       I would invert the condition and use `if not listing` here for an early return

##########
File path: docker/test/integration/steps/steps.py
##########
@@ -259,9 +368,127 @@ def step_impl(context, content, file_name, path, seconds):
     time.sleep(seconds)
     context.test.add_test_data(path, content, file_name)
 
+@when("a message with content \"{content}\" is published to the 
\"{topic_name}\" topic")
+def step_impl(context, content, topic_name):
+    p = Producer({"bootstrap.servers": "localhost:29092", "client.id": 
socket.gethostname()})
+    def delivery_report(err, msg):
+        if err is not None:
+            logging.info('Message delivery failed: {}'.format(err))
+        else:
+            logging.info('Message delivered to {} [{}]'.format(msg.topic(), 
msg.partition()))

Review comment:
       Could this be extracted and reused? It seems to be the same callback 
redefined in all functions.

##########
File path: docker/test/integration/steps/steps.py
##########
@@ -259,9 +368,127 @@ def step_impl(context, content, file_name, path, seconds):
     time.sleep(seconds)
     context.test.add_test_data(path, content, file_name)
 
+@when("a message with content \"{content}\" is published to the 
\"{topic_name}\" topic")
+def step_impl(context, content, topic_name):
+    p = Producer({"bootstrap.servers": "localhost:29092", "client.id": 
socket.gethostname()})

Review comment:
       Please rename `p` here and below just for easier searchability.

##########
File path: docker/test/integration/steps/steps.py
##########
@@ -248,9 +330,36 @@ def step_impl(context, cluster_name):
     cluster.set_engine("azure-storage-server")
     cluster.set_flow(None)
 
+@given("the kafka broker \"{cluster_name}\" is started")
+def step_impl(context, cluster_name):
+    context.test.start_single_cluster(cluster_name)
+
+@given("the topic \"{topic_name}\" is initialized on the kafka broker")
+def step_impl(context, topic_name):
+    a = AdminClient({'bootstrap.servers': "localhost:29092"})
+    new_topics = [NewTopic(topic_name, num_partitions=1, replication_factor=1)]
+    fs = a.create_topics(new_topics)
+    # Block until the topic is created
+    for topic, f in fs.items():

Review comment:
       Please rename the `a`, `f` and `fs` variables; it's not obvious what they 
refer to, and such short names are generally hard to search for. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to