hunyadi-dev commented on a change in pull request #995:
URL: https://github.com/apache/nifi-minifi-cpp/pull/995#discussion_r572721128



##########
File path: docker/test/integration/MiNiFi_integration_test_driver.py
##########
@@ -0,0 +1,208 @@
+from subprocess import Popen, PIPE, STDOUT
+
+import docker
+import logging
+import os
+import shutil
+import threading
+import time
+import uuid
+
+from pydoc import locate
+
+from minifi.core.InputPort import InputPort
+
+from minifi.core.DockerTestCluster import DockerTestCluster
+from minifi.core.SingleNodeDockerCluster import SingleNodeDockerCluster
+from minifi.core.DockerTestDirectoryBindings import DockerTestDirectoryBindings
+
+from minifi.validators.EmptyFilesOutPutValidator import 
EmptyFilesOutPutValidator
+from minifi.validators.NoFileOutPutValidator import NoFileOutPutValidator
+from minifi.validators.SingleFileOutputValidator import 
SingleFileOutputValidator
+
class MiNiFi_integration_test():
    def __init__(self, context):
        """Set up per-test state: a unique test id, empty cluster and node
        registries, and fresh data directory bindings for this run."""
        logging.info("MiNiFi_integration_test init")
        self.test_id = str(uuid.uuid4())

        self.docker_network = None
        self.file_system_observer = None

        self.clusters = {}
        self.connectable_nodes = []
        # Remote process groups are not connectable
        self.remote_process_groups = []

        self.docker_directory_bindings = DockerTestDirectoryBindings()
        self.docker_directory_bindings.create_new_data_directories(self.test_id)
+
+    def __del__(self):
+        logging.info("MiNiFi_integration_test cleanup")
+
+        # Clean up network, for some reason only this order of events work for 
cleanup
+        if self.docker_network is not None:
+            logging.info('Cleaning up network network: %s', 
self.docker_network.name)
+            while len(self.docker_network.containers) != 0:
+                for container in self.docker_network.containers:
+                    self.docker_network.disconnect(container, force=True)
+                self.docker_network.reload()
+            self.docker_network.remove()
+
+        container_ids = []
+        for cluster in self.clusters.values():
+            for container in cluster.containers.values():
+                container_ids.append(container.id)
+            del cluster
+
+        # Backup for cleaning up containers as the cluster deleter is not 
reliable
+        docker_client = docker.from_env()
+        for container_id in container_ids:    
+            wait_start_time = time.perf_counter()
+            while (time.perf_counter() - wait_start_time) < 35:
+                # There is no clean way to check for container existence
+                try:
+                    container = docker_client.containers.get(container_id)
+                    logging.error("Failure when trying to clean up containers. 
Attempting secondary cleanup.")
+                    container.kill()
+                    time.sleep(5)
+                    container.remove(v=True, force=True)
+                    time.sleep(5)
+                except docker.errors.NotFound:
+                    break
+            try:
+                container = docker_client.containers.get(container_id)
+                logging.error("All attempts to clean up docker containers were 
unsuccessful.")
+            except docker.errors.NotFound:
+                logging.info("Docker container secondary cleanup successful.")
+                pass
+
+        del self.docker_directory_bindings
+
+    def docker_path_to_local_path(self, docker_path):
+        return 
self.docker_directory_bindings.docker_path_to_local_path(self.test_id, 
docker_path)
+
+    def get_test_id(self):
+        return self.test_id
+
+    def acquire_cluster(self, name):
+        return self.clusters.setdefault(name, DockerTestCluster())
+
+    def set_up_cluster_network(self):
+        self.docker_network = SingleNodeDockerCluster.create_docker_network()
+        for cluster in self.clusters.values():
+            cluster.set_network(self.docker_network)
+
+    def start(self):
+        logging.info("MiNiFi_integration_test start")
+        self.set_up_cluster_network()
+        for cluster in self.clusters.values():
+            
cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.test_id))
+            cluster.deploy_flow()
+        for cluster_name, cluster in self.clusters.items():
+            startup_success = True
+            logging.info("Engine: %s", cluster.get_engine())
+            if cluster.get_engine() == "minifi-cpp":
+                logging.info("Engine is minifi-cpp")
+                startup_success = cluster.wait_for_app_logs("Starting Flow 
Controller", 120)
+            elif cluster.get_engine() == "nifi":
+                startup_success = cluster.wait_for_app_logs("Starting Flow 
Controller...", 120)
+            elif cluster.get_engine() == "kafka-broker":
+                startup_success = cluster.wait_for_app_logs("Startup 
complete.", 120)
+            if not startup_success:
+                cluster.log_nifi_output()
+            assert startup_success
+
+    def add_node(self, processor):
+        if processor.get_name() in (elem.get_name() for elem in 
self.connectable_nodes):
+            raise Exception("Trying to register processor with an already 
registered name: \"%s\"" % processor.get_name())
+        self.connectable_nodes.append(processor)
+
+    def get_or_create_node_by_name(self, node_name):
+        node = self.get_node_by_name(node_name) 
+        if node == None:

Review comment:
       Updated.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to