lordgamez commented on a change in pull request #1053: URL: https://github.com/apache/nifi-minifi-cpp/pull/1053#discussion_r615802510
########## File path: docker/test/integration/environment.py ########## @@ -1,26 +1,29 @@ -from behave import fixture, use_fixture import sys sys.path.append('../minifi') import logging +import datetime from MiNiFi_integration_test_driver import MiNiFi_integration_test from minifi import * def raise_exception(exception): - raise exception + raise exception -@fixture -def test_driver_fixture(context): - context.test = MiNiFi_integration_test(context) - yield context.test - logging.info("Integration test teardown...") - del context.test +def integration_test_cleanup(test): + logging.info("Integration test cleanup...") + del test def before_scenario(context, scenario): - use_fixture(test_driver_fixture, context) + logging.info("Integration test setup at {time:%H:%M:%S:%f}".format(time=datetime.datetime.now())) + context.test = MiNiFi_integration_test(context) def after_scenario(context, scenario): - pass + logging.info("Integration test teardown at {time:%H:%M:%S:%f}".format(time=datetime.datetime.now())) + if context is not None and hasattr(context, "test"): + context.test.cleanup() # force invocation + del context.test + else: + raise Exception("Test") Review comment: Can we give a more elaborate message to this exception? 
########## File path: docker/test/integration/MiNiFi_integration_test_driver.py ########## @@ -90,35 +102,56 @@ def acquire_cluster(self, name): return self.clusters.setdefault(name, DockerTestCluster()) def set_up_cluster_network(self): - self.docker_network = SingleNodeDockerCluster.create_docker_network() - for cluster in self.clusters.values(): - cluster.set_network(self.docker_network) + if self.docker_network is None: + logging.info("Setting up new network.") + self.docker_network = SingleNodeDockerCluster.create_docker_network() + for cluster in self.clusters.values(): + cluster.set_network(self.docker_network) + else: + logging.info("Network is already set.") + + def wait_for_cluster_startup_finish(self, cluster): + startup_success = True + logging.info("Engine: %s", cluster.get_engine()) + if cluster.get_engine() == "minifi-cpp": + startup_success = cluster.wait_for_app_logs("Starting Flow Controller", 120) + elif cluster.get_engine() == "nifi": + startup_success = cluster.wait_for_app_logs("Starting Flow Controller...", 120) + elif cluster.get_engine() == "kafka-broker": + startup_success = cluster.wait_for_app_logs("Startup complete.", 120) + elif cluster.get_engine() == "http-proxy": + startup_success = cluster.wait_for_app_logs("Accepting HTTP Socket connections at", 120) + elif cluster.get_engine() == "s3-server": + startup_success = cluster.wait_for_app_logs("Started S3MockApplication", 120) + elif cluster.get_engine() == "azure-storage-server": + startup_success = cluster.wait_for_app_logs("Azurite Queue service is successfully listening at", 120) + if not startup_success: + cluster.log_nifi_output() + return startup_success + + def start_single_cluster(self, cluster_name): + self.set_up_cluster_network() + cluster = self.clusters[cluster_name] + cluster.deploy_flow() + assert self.wait_for_cluster_startup_finish(cluster) + time.sleep(10) Review comment: Why do we need an additional 10 second delay here? 
########## File path: docker/test/integration/MiNiFi_integration_test_driver.py ########## @@ -90,35 +102,56 @@ def acquire_cluster(self, name): return self.clusters.setdefault(name, DockerTestCluster()) def set_up_cluster_network(self): - self.docker_network = SingleNodeDockerCluster.create_docker_network() - for cluster in self.clusters.values(): - cluster.set_network(self.docker_network) + if self.docker_network is None: + logging.info("Setting up new network.") + self.docker_network = SingleNodeDockerCluster.create_docker_network() + for cluster in self.clusters.values(): + cluster.set_network(self.docker_network) + else: + logging.info("Network is already set.") + + def wait_for_cluster_startup_finish(self, cluster): + startup_success = True + logging.info("Engine: %s", cluster.get_engine()) + if cluster.get_engine() == "minifi-cpp": + startup_success = cluster.wait_for_app_logs("Starting Flow Controller", 120) + elif cluster.get_engine() == "nifi": + startup_success = cluster.wait_for_app_logs("Starting Flow Controller...", 120) + elif cluster.get_engine() == "kafka-broker": + startup_success = cluster.wait_for_app_logs("Startup complete.", 120) + elif cluster.get_engine() == "http-proxy": + startup_success = cluster.wait_for_app_logs("Accepting HTTP Socket connections at", 120) + elif cluster.get_engine() == "s3-server": + startup_success = cluster.wait_for_app_logs("Started S3MockApplication", 120) + elif cluster.get_engine() == "azure-storage-server": + startup_success = cluster.wait_for_app_logs("Azurite Queue service is successfully listening at", 120) + if not startup_success: + cluster.log_nifi_output() + return startup_success + + def start_single_cluster(self, cluster_name): + self.set_up_cluster_network() + cluster = self.clusters[cluster_name] + cluster.deploy_flow() + assert self.wait_for_cluster_startup_finish(cluster) + time.sleep(10) def start(self): logging.info("MiNiFi_integration_test start") self.set_up_cluster_network() for cluster in 
self.clusters.values(): - logging.info("Starting cluster %s with an engine of %s", cluster.get_name(), cluster.get_engine()) - cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.test_id)) - cluster.deploy_flow() - for cluster_name, cluster in self.clusters.items(): - startup_success = True - logging.info("Engine: %s", cluster.get_engine()) - if cluster.get_engine() == "minifi-cpp": - startup_success = cluster.wait_for_app_logs("Starting Flow Controller", 120) - elif cluster.get_engine() == "nifi": - startup_success = cluster.wait_for_app_logs("Starting Flow Controller...", 120) - elif cluster.get_engine() == "kafka-broker": - startup_success = cluster.wait_for_app_logs("Startup complete.", 120) - elif cluster.get_engine() == "http-proxy": - startup_success = cluster.wait_for_app_logs("Accepting HTTP Socket connections at", 120) - elif cluster.get_engine() == "s3-server": - startup_success = cluster.wait_for_app_logs("Started S3MockApplication", 120) - elif cluster.get_engine() == "azure-storage-server": - startup_success = cluster.wait_for_app_logs("Azurite Queue service is successfully listening at", 120) - if not startup_success: - cluster.log_nifi_output() - assert startup_success + if len(cluster.containers) == 0: + logging.info("Starting cluster %s with an engine of %s", cluster.get_name(), cluster.get_engine()) + cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.test_id)) + cluster.deploy_flow() + else: + logging.info("Container %s is already started with an engine of %s", cluster.get_name(), cluster.get_engine()) + for cluster in self.clusters.values(): + assert self.wait_for_cluster_startup_finish(cluster) + # Seems like some extra time needed for consumers to negotiate with the broker + for cluster in self.clusters.values(): + if cluster.get_engine() == "kafka-broker": + time.sleep(10) Review comment: Is there a log message we could check for instead of sleeping for a fixed 10 seconds?
########## File path: docker/test/integration/features/kafka.feature ########## @@ -57,3 +57,179 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka When both instances start up Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + + Scenario: MiNiFi consumes data from a kafka topic + Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow + And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow + And the "success" relationship of the ConsumeKafka processor is connected to the PutFile + + And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher + + When all instances start up + And a message with content "some test message" is published to the "ConsumeKafkaTest" topic + + Then at least one flowfile with the content "some test message" is placed in the monitored directory in less than 60 seconds + + Scenario Outline: ConsumeKafka parses and uses kafka topics and topic name formats + Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow + And the "Topic Names" of the ConsumeKafka processor is set to "<topic names>" + And the "Topic Name Format" of the ConsumeKafka processor is set to "<topic name format>" + And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow + And the "success" relationship of the ConsumeKafka processor is connected to the PutFile + + And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher + And the kafka broker "broker" is started + And the topic "ConsumeKafkaTest" is initialized on the kafka broker + + When all other processes start up + And a message with content "<message 1>" is published to the "ConsumeKafkaTest" topic + And a message with content "<message 2>" is published to the "ConsumeKafkaTest" topic + + Then two flowfiles with the contents "<message 1>" 
and "<message 2>" are placed in the monitored directory in less than 45 seconds + + Examples: Topic names and formats to test + | message 1 | message 2 | topic names | topic name format | + | Ulysses | James Joyce | ConsumeKafkaTest | (not set) | + | The Great Gatsby | F. Scott Fitzgerald | ConsumeKafkaTest | Names | + | War and Peace | Lev Tolstoy | a,b,c,ConsumeKafkaTest,d | Names | + | Nineteen Eighty Four | George Orwell | ConsumeKafkaTest | Patterns | + | Hamlet | William Shakespeare | Cons[emu]*KafkaTest | Patterns | Review comment: It's good to have this table format for the inputs. I didn't know about this feature before; it improves readability :+1: ########## File path: docker/test/integration/minifi/validators/FileOutputValidator.py ########## @@ -1,8 +1,46 @@ +import logging +import os + +from os import listdir +from os.path import join + from .OutputValidator import OutputValidator class FileOutputValidator(OutputValidator): def set_output_dir(self, output_dir): self.output_dir = output_dir + @staticmethod + def num_files_matching_content_in_dir(dir_path, expected_content): + listing = listdir(dir_path) Review comment: The number of spaces used for indentation in the new function does not match the old code. We should stick to 4 spaces per indentation level, as that is the PEP 8 standard and will be enforced once the flake8 check is integrated.
########## File path: docker/test/integration/minifi/validators/FileOutputValidator.py ########## @@ -1,8 +1,46 @@ +import logging +import os + +from os import listdir +from os.path import join + from .OutputValidator import OutputValidator class FileOutputValidator(OutputValidator): def set_output_dir(self, output_dir): self.output_dir = output_dir + @staticmethod + def num_files_matching_content_in_dir(dir_path, expected_content): + listing = listdir(dir_path) + if listing: + files_of_matching_content_found = 0 + for file_name in listing: + full_path = join(dir_path, file_name) + if not os.path.isfile(full_path): + continue + with open(full_path, 'r') as out_file: + contents = out_file.read() + logging.info("dir %s -- name %s", dir_path, file_name) + logging.info("expected content: %s -- actual: %s, match: %r", expected_content, contents, expected_content == contents) + if expected_content in contents: + files_of_matching_content_found += 1 + return files_of_matching_content_found + return 0 + + @staticmethod + def get_num_files(dir_path): + listing = listdir(dir_path) + logging.info("Num files in %s: %d", dir_path, len(listing)) + if listing: Review comment: I would return early here as well. 
########## File path: docker/test/integration/minifi/core/SingleNodeDockerCluster.py ########## @@ -234,22 +237,32 @@ def deploy_kafka_broker(self): detach=True, name='kafka-broker', network=self.network.name, - ports={'9092/tcp': 9092}, - environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093", "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"], + ports={'9092/tcp': 9092, '29092/tcp' : 29092}, + # environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093", "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"], Review comment: This commented-out line can be removed. ########## File path: docker/test/integration/minifi/validators/FileOutputValidator.py ########## @@ -1,8 +1,46 @@ +import logging +import os + +from os import listdir +from os.path import join + from .OutputValidator import OutputValidator class FileOutputValidator(OutputValidator): def set_output_dir(self, output_dir): self.output_dir = output_dir + @staticmethod + def num_files_matching_content_in_dir(dir_path, expected_content): + listing = listdir(dir_path) + if listing: Review comment: I would invert the condition and use `if not listing` here for an early return. ########## File path: docker/test/integration/steps/steps.py ########## @@ -259,9 +368,127 @@ def step_impl(context, content, file_name, path, seconds): time.sleep(seconds) context.test.add_test_data(path, content, file_name) +@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic") +def step_impl(context, content, topic_name): + p = Producer({"bootstrap.servers": "localhost:29092", "client.id": socket.gethostname()}) + def delivery_report(err, msg): + if err is not None: + logging.info('Message delivery failed: {}'.format(err)) + else: + logging.info('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) Review comment: Could this be extracted and reused? It seems to be the same callback redefined in all functions.
########## File path: docker/test/integration/steps/steps.py ########## @@ -259,9 +368,127 @@ def step_impl(context, content, file_name, path, seconds): time.sleep(seconds) context.test.add_test_data(path, content, file_name) +@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic") +def step_impl(context, content, topic_name): + p = Producer({"bootstrap.servers": "localhost:29092", "client.id": socket.gethostname()}) Review comment: Please rename `p` here and below to a more descriptive name for easier searchability. ########## File path: docker/test/integration/steps/steps.py ########## @@ -248,9 +330,36 @@ def step_impl(context, cluster_name): cluster.set_engine("azure-storage-server") cluster.set_flow(None) +@given("the kafka broker \"{cluster_name}\" is started") +def step_impl(context, cluster_name): + context.test.start_single_cluster(cluster_name) + +@given("the topic \"{topic_name}\" is initialized on the kafka broker") +def step_impl(context, topic_name): + a = AdminClient({'bootstrap.servers': "localhost:29092"}) + new_topics = [NewTopic(topic_name, num_partitions=1, replication_factor=1)] + fs = a.create_topics(new_topics) + # Block until the topic is created + for topic, f in fs.items(): Review comment: Please rename the `a`, `f`, and `fs` variables: it's not obvious what they refer to, and such short names are hard to search for. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org