This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
     new ce3d1f8  MINIFICPP-1445 - Refactor docker integration test frame

ce3d1f8 is described below

commit ce3d1f8bdc2d2236f471ee960ab2ee05c1fe927f
Author:     Adam Hunyadi <hunyadi....@gmail.com>
AuthorDate: Mon Jan 4 17:18:10 2021 +0100

    MINIFICPP-1445 - Refactor docker integration test frame
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #968
---
 .gitignore                                         |    1 +
 docker/test/integration/minifi/__init__.py         | 1105 +-------------------
 docker/test/integration/minifi/core/Cluster.py     |   22 +
 docker/test/integration/minifi/core/Connectable.py |   77 ++
 .../integration/minifi/core/ControllerService.py   |   17 +
 .../__init__.py => core/DockerTestCluster.py}      |  178 +---
 docker/test/integration/minifi/core/InputPort.py   |    7 +
 .../integration/minifi/core/OutputEventHandler.py  |   24 +
 docker/test/integration/minifi/core/Processor.py   |   45 +
 .../integration/minifi/core/RemoteProcessGroup.py  |   13 +
 .../integration/minifi/core/SSLContextService.py   |   16 +
 .../minifi/core/SingleNodeDockerCluster.py         |  319 ++++++
 docker/test/integration/minifi/core/__init__.py    |    0
 .../Minifi_flow_yaml_serializer.py                 |  111 ++
 .../flow_serialization/Nifi_flow_xml_serializer.py |  324 ++++++
 .../minifi/flow_serialization/__init__.py          |    0
 .../minifi/processors/DeleteS3Object.py            |   22 +
 .../minifi/processors/GenerateFlowFile.py          |    8 +
 .../test/integration/minifi/processors/GetFile.py  |    8 +
 .../integration/minifi/processors/InvokeHTTP.py    |   30 +
 .../integration/minifi/processors/ListenHTTP.py    |   14 +
 .../integration/minifi/processors/LogAttribute.py  |    7 +
 .../integration/minifi/processors/PublishKafka.py  |   10 +
 .../minifi/processors/PublishKafkaSSL.py           |   16 +
 .../test/integration/minifi/processors/PutFile.py  |   14 +
 .../integration/minifi/processors/PutS3Object.py   |   20 +
 .../test/integration/minifi/processors/__init__.py |    0
 .../minifi/validators/EmptyFilesOutPutValidator.py |   27 +
 .../minifi/validators/FileOutputValidator.py       |    8 +
 .../minifi/validators/NoFileOutPutValidator.py     |   25 +
 .../minifi/validators/OutputValidator.py           |   12 +
 .../minifi/validators/SegfaultValidator.py         |    8 +
 .../minifi/validators/SingleFileOutputValidator.py |   44 +
 .../test/integration/minifi/validators/__init__.py |    0
 docker/test/integration/test_filesystem_ops.py     |    2 -
 docker/test/integration/test_filter_zero_file.py   |    1 -
 docker/test/integration/test_hash_content.py       |    2 -
 docker/test/integration/test_http.py               |    2 -
 docker/test/integration/test_rdkafka.py            |    2 -
 docker/test/integration/test_s2s.py                |    2 -
 docker/test/integration/test_s3.py                 |    2 -
 docker/test/integration/test_zero_file.py          |    1 -
 docker/test/test_https.py                          |    2 -
 43 files changed, 1298 insertions(+), 1250 deletions(-)

diff --git a/.gitignore b/.gitignore
index 46f1e22..cc40bfd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -57,6 +57,7 @@ libminifi/src/agent/agent_version.cpp
 docs/generated
 thirdparty/apache-rat/apache-rat*
 /compile_commands.json
+__pycache__/
 
 # Ignore source files that have been placed in the docker directory during build
 docker/minificppsource

diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index b64d541..fead6a1 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -30,1076 +30,39 @@ from copy import copy
 import time
 from collections import OrderedDict
 
-class Cluster(object):
-    """
-    Base Cluster class. This is intended to be a generic interface
-    to different types of clusters. Clusters could be Kubernetes clusters,
-    Docker swarms, or cloud compute/container services.
-    """
-
+from .core.Connectable import Connectable
+from .core.Cluster import Cluster
+from .core.ControllerService import ControllerService
+from .core.InputPort import InputPort
+from .core.Processor import Processor
+from .core.RemoteProcessGroup import RemoteProcessGroup
+from .core.SingleNodeDockerCluster import SingleNodeDockerCluster
+from .core.SSLContextService import SSLContextService
+from .core.DockerTestCluster import DockerTestCluster
+from .core.OutputEventHandler import OutputEventHandler
+
+from .flow_serialization.Minifi_flow_yaml_serializer import Minifi_flow_yaml_serializer
+from .flow_serialization.Nifi_flow_xml_serializer import Nifi_flow_xml_serializer
+
+from .processors.GenerateFlowFile import GenerateFlowFile
+from .processors.GetFile import GetFile
+from .processors.InvokeHTTP import InvokeHTTP
+from .processors.ListenHTTP import ListenHTTP
+from .processors.LogAttribute import LogAttribute
+from .processors.PublishKafka import PublishKafka
+from .processors.PublishKafkaSSL import PublishKafkaSSL
+from .processors.PutFile import PutFile
+from .processors.PutS3Object import PutS3Object
+from .processors.DeleteS3Object import DeleteS3Object
+
+from .validators.OutputValidator import OutputValidator
+from .validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator
+from .validators.SegfaultValidator import SegfaultValidator
+from .validators.NoFileOutPutValidator import NoFileOutPutValidator
+from .validators.SingleFileOutputValidator import SingleFileOutputValidator
+from .validators.FileOutputValidator import FileOutputValidator
+
+logging.basicConfig(level=logging.DEBUG)
-
-    def deploy_flow(self, flow, name=None, vols=None):
-        """
-        Deploys a flow to the cluster.
-        """
-
-    def __enter__(self):
-        """
-        Allocate ephemeral cluster resources.
-        """
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        """
-        Clean up ephemeral cluster resources.
-        """
-
-
-class SingleNodeDockerCluster(Cluster):
-    """
-    A "cluster" which consists of a single docker node. Useful for
-    testing or use-cases which do not span multiple compute nodes.
-    """
-
-    def __init__(self):
-        self.minifi_version = os.environ['MINIFI_VERSION']
-        self.nifi_version = '1.7.0'
-        self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version
-        self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version
-        self.network = None
-        self.containers = OrderedDict()
-        self.images = []
-        self.tmp_files = []
-
-        # Get docker client
-        self.client = docker.from_env()
-
-    def deploy_flow(self,
-                    flow,
-                    name=None,
-                    vols=None,
-                    engine='minifi-cpp'):
-        """
-        Compiles the flow to a valid config file and overlays it into a new image.
- """ - - if vols is None: - vols = {} - - logging.info('Deploying %s flow...%s', engine,name) - - if name is None: - name = engine + '-' + str(uuid.uuid4()) - logging.info('Flow name was not provided; using generated name \'%s\'', name) - - # Create network if necessary - if self.network is None: - net_name = 'nifi-' + str(uuid.uuid4()) - logging.info('Creating network: %s', net_name) - self.network = self.client.networks.create(net_name) - - if engine == 'nifi': - self.deploy_nifi_flow(flow, name, vols) - elif engine == 'minifi-cpp': - self.deploy_minifi_cpp_flow(flow, name, vols) - elif engine == 'kafka-broker': - self.deploy_kafka_broker(name) - elif engine == 'http-proxy': - self.deploy_http_proxy() - elif engine == 's3-server': - self.deploy_s3_server() - else: - raise Exception('invalid flow engine: \'%s\'' % engine) - - def deploy_minifi_cpp_flow(self, flow, name, vols): - - # Build configured image - dockerfile = dedent("""FROM {base_image} - USER root - ADD config.yml {minifi_root}/conf/config.yml - RUN chown minificpp:minificpp {minifi_root}/conf/config.yml - USER minificpp - """.format(name=name,hostname=name, - base_image='apacheminificpp:' + self.minifi_version, - minifi_root=self.minifi_root)) - - test_flow_yaml = minifi_flow_yaml(flow) - logging.info('Using generated flow config yml:\n%s', test_flow_yaml) - - conf_file_buffer = BytesIO() - - try: - conf_file_buffer.write(test_flow_yaml.encode('utf-8')) - conf_file_len = conf_file_buffer.tell() - conf_file_buffer.seek(0) - - context_files = [ - { - 'name': 'config.yml', - 'size': conf_file_len, - 'file_obj': conf_file_buffer - } - ] - - configured_image = self.build_image(dockerfile, context_files) - - finally: - conf_file_buffer.close() - - logging.info('Creating and running docker container for flow...') - - container = self.client.containers.run( - configured_image[0], - detach=True, - name=name, - network=self.network.name, - volumes=vols) - - logging.info('Started container \'%s\'', container.name) - - self.containers[container.name] = container - - def deploy_nifi_flow(self, flow, name, vols): - dockerfile = dedent(r"""FROM {base_image} - USER root - ADD flow.xml.gz {nifi_root}/conf/flow.xml.gz - RUN chown nifi:nifi {nifi_root}/conf/flow.xml.gz - RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties - RUN sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\1=5000/' {nifi_root}/conf/nifi.properties - USER nifi - """.format(name=name, - base_image='apache/nifi:' + self.nifi_version, - nifi_root=self.nifi_root)) - - test_flow_xml = nifi_flow_xml(flow, self.nifi_version) - logging.info('Using generated flow config xml:\n%s', test_flow_xml) - - conf_file_buffer = BytesIO() - - try: - with gzip.GzipFile(mode='wb', fileobj=conf_file_buffer) as conf_gz_file_buffer: - conf_gz_file_buffer.write(test_flow_xml.encode()) - conf_file_len = conf_file_buffer.tell() - conf_file_buffer.seek(0) - - context_files = [ - { - 'name': 'flow.xml.gz', - 'size': conf_file_len, - 'file_obj': conf_file_buffer - } - ] - - configured_image = self.build_image(dockerfile, context_files) - - finally: - conf_file_buffer.close() - - logging.info('Creating and running docker container for flow...') - - container = self.client.containers.run( - configured_image[0], - detach=True, - name=name, - hostname=name, - network=self.network.name, - volumes=vols) - - logging.info('Started container \'%s\'', container.name) - - self.containers[container.name] = container - - def deploy_kafka_broker(self, name): - 
logging.info('Creating and running docker containers for kafka broker...') - zookeeper = self.client.containers.run( - self.client.images.pull("wurstmeister/zookeeper:latest"), - detach=True, - name='zookeeper', - network=self.network.name, - ports={'2181/tcp': 2181}, - ) - self.containers[zookeeper.name] = zookeeper - - test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on DockerVerify.sh - broker_image = self.build_image_by_path(test_dir + "/resources/kafka_broker", 'minifi-kafka') - broker = self.client.containers.run( - broker_image[0], - detach=True, - name='kafka-broker', - network=self.network.name, - ports={'9092/tcp': 9092}, - environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093", "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"], - ) - self.containers[broker.name] = broker - - dockerfile = dedent("""FROM {base_image} - USER root - CMD $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka-broker:9092 --topic test > heaven_signal.txt - """.format(base_image='wurstmeister/kafka:2.12-2.5.0')) - configured_image = self.build_image(dockerfile, []) - consumer = self.client.containers.run( - configured_image[0], - detach=True, - name='kafka-consumer', - network=self.network.name, - ) - self.containers[consumer.name] = consumer - - def deploy_http_proxy(self): - logging.info('Creating and running http-proxy docker container...') - dockerfile = dedent("""FROM {base_image} - RUN apt update && apt install -y apache2-utils - RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username} {proxy_password} - RUN echo 'auth_param basic program /usr/lib/squid3/basic_ncsa_auth /etc/squid/.squid_users' > /etc/squid/squid.conf && \ - echo 'auth_param basic realm proxy' >> /etc/squid/squid.conf && \ - echo 'acl authenticated proxy_auth REQUIRED' >> /etc/squid/squid.conf && \ - echo 'http_access allow authenticated' >> /etc/squid/squid.conf && \ - echo 'http_port {proxy_port}' >> /etc/squid/squid.conf - ENTRYPOINT ["/sbin/entrypoint.sh"] - """.format(base_image='sameersbn/squid:3.5.27-2', proxy_username='admin', proxy_password='test101', proxy_port='3128')) - configured_image = self.build_image(dockerfile, []) - consumer = self.client.containers.run( - configured_image[0], - detach=True, - name='http-proxy', - network=self.network.name, - ports={'3128/tcp': 3128}, - ) - self.containers[consumer.name] = consumer - - def deploy_s3_server(self): - consumer = self.client.containers.run( - "adobe/s3mock:2.1.28", - detach=True, - name='s3-server', - network=self.network.name, - ports={'9090/tcp': 9090, '9191/tcp': 9191}, - environment=["initialBuckets=test_bucket"], - ) - self.containers[consumer.name] = consumer - - def build_image(self, dockerfile, context_files): - conf_dockerfile_buffer = BytesIO() - docker_context_buffer = BytesIO() - - try: - # Overlay conf onto base nifi image - conf_dockerfile_buffer.write(dockerfile.encode()) - conf_dockerfile_buffer.seek(0) - - with tarfile.open(mode='w', fileobj=docker_context_buffer) as docker_context: - dockerfile_info = tarfile.TarInfo('Dockerfile') - dockerfile_info.size = conf_dockerfile_buffer.getbuffer().nbytes - docker_context.addfile(dockerfile_info, - fileobj=conf_dockerfile_buffer) - - for context_file in context_files: - file_info = tarfile.TarInfo(context_file['name']) - file_info.size = context_file['size'] - docker_context.addfile(file_info, - fileobj=context_file['file_obj']) - docker_context_buffer.seek(0) - - logging.info('Creating configured image...') - configured_image = 
self.client.images.build(fileobj=docker_context_buffer, - custom_context=True, - rm=True, - forcerm=True) - logging.info('Created image with id: %s', configured_image[0].id) - self.images.append(configured_image) - - finally: - conf_dockerfile_buffer.close() - docker_context_buffer.close() - - return configured_image - - def build_image_by_path(self, dir, name=None): - try: - logging.info('Creating configured image...') - configured_image = self.client.images.build(path=dir, - tag=name, - rm=True, - forcerm=True) - logging.info('Created image with id: %s', configured_image[0].id) - self.images.append(configured_image) - return configured_image - except Exception as e: - logging.info(e) - raise - - def __enter__(self): - """ - Allocate ephemeral cluster resources. - """ - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """ - Clean up ephemeral cluster resources - """ - - # Clean up containers - for container in self.containers.values(): - logging.info('Cleaning up container: %s', container.name) - container.remove(v=True, force=True) - - # Clean up images - for image in reversed(self.images): - logging.info('Cleaning up image: %s', image[0].id) - self.client.images.remove(image[0].id, force=True) - - # Clean up network - if self.network is not None: - logging.info('Cleaning up network network: %s', self.network.name) - self.network.remove() - - # Clean up tmp files - for tmp_file in self.tmp_files: - os.remove(tmp_file) - - -class Connectable(object): - def __init__(self, - name=None, - auto_terminate=None): - - self.uuid = uuid.uuid4() - - if name is None: - self.name = str(self.uuid) - else: - self.name = name - - if auto_terminate is None: - self.auto_terminate = [] - else: - self.auto_terminate = auto_terminate - - self.connections = {} - self.out_proc = self - - self.drop_empty_flowfiles = False - - def connect(self, connections): - for rel in connections: - - # Ensure that rel is not auto-terminated - if rel in self.auto_terminate: - del self.auto_terminate[self.auto_terminate.index(rel)] - - # Add to set of output connections for this rel - if rel not in self.connections: - self.connections[rel] = [] - self.connections[rel].append(connections[rel]) - - return self - - def __rshift__(self, other): - """ - Right shift operator to support flow DSL, for example: - - GetFile('/input') >> LogAttribute() >> PutFile('/output') - - """ - - connected = copy(self) - connected.connections = copy(self.connections) - - if self.out_proc is self: - connected.out_proc = connected - else: - connected.out_proc = copy(connected.out_proc) - - if isinstance(other, tuple): - if isinstance(other[0], tuple): - for rel_tuple in other: - rel = {rel_tuple[0]: rel_tuple[1]} - connected.out_proc.connect(rel) - else: - rel = {other[0]: other[1]} - connected.out_proc.connect(rel) - else: - connected.out_proc.connect({'success': other}) - connected.out_proc = other - - return connected - - def __invert__(self): - """ - Invert operation to set empty file filtering on incoming connections - GetFile('/input') >> ~LogAttribute() - """ - self.drop_empty_flowfiles = True - - return self - - -class Processor(Connectable): - def __init__(self, - clazz, - properties=None, - schedule=None, - name=None, - controller_services=None, - auto_terminate=None): - - super(Processor, self).__init__(name=name, - auto_terminate=auto_terminate) - - if controller_services is None: - controller_services = [] - - if schedule is None: - schedule = {} - - if properties is None: - properties = {} - - if name is None: - pass - - 
self.clazz = clazz - self.properties = properties - self.controller_services = controller_services - - self.schedule = { - 'scheduling strategy': 'TIMER_DRIVEN', - 'scheduling period': '1 sec', - 'penalization period': '30 sec', - 'yield period': '1 sec', - 'run duration nanos': 0 - } - self.schedule.update(schedule) - - def nifi_property_key(self, key): - """ - Returns the Apache NiFi-equivalent property key for the given key. This is often, but not always, the same as - the internal key. - """ - return key - - -class InvokeHTTP(Processor): - def __init__(self, url, - method='GET', - proxy_host='', - proxy_port='', - proxy_username='', - proxy_password='', - ssl_context_service=None, - schedule={'scheduling strategy': 'EVENT_DRIVEN'}): - properties = {'Remote URL': url, - 'HTTP Method': method, - 'Proxy Host': proxy_host, - 'Proxy Port': proxy_port, - 'invokehttp-proxy-username': proxy_username, - 'invokehttp-proxy-password': proxy_password} - - controller_services = [] - - if ssl_context_service is not None: - properties['SSL Context Service'] = ssl_context_service.name - controller_services.append(ssl_context_service) - - super(InvokeHTTP, self).__init__('InvokeHTTP', - properties=properties, - controller_services=controller_services, - auto_terminate=['success', - 'response', - 'retry', - 'failure', - 'no retry'], - schedule=schedule) - - -class ListenHTTP(Processor): - def __init__(self, port, cert=None, schedule=None): - properties = {'Listening Port': port} - - if cert is not None: - properties['SSL Certificate'] = cert - properties['SSL Verify Peer'] = 'no' - - super(ListenHTTP, self).__init__('ListenHTTP', - properties=properties, - auto_terminate=['success'], - schedule=schedule) - - -class LogAttribute(Processor): - def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}): - super(LogAttribute, self).__init__('LogAttribute', - auto_terminate=['success'], - schedule=schedule) - - -class GetFile(Processor): - def __init__(self, input_dir, schedule={'scheduling period': '2 sec'}): - super(GetFile, self).__init__('GetFile', - properties={'Input Directory': input_dir, 'Keep Source File': 'true'}, - schedule=schedule, - auto_terminate=['success']) - - -class GenerateFlowFile(Processor): - def __init__(self, file_size, schedule={'scheduling period': '0 sec'}): - super(GenerateFlowFile, self).__init__('GenerateFlowFile', - properties={'File Size': file_size}, - schedule=schedule, - auto_terminate=['success']) - - -class PutFile(Processor): - def __init__(self, output_dir, schedule={'scheduling strategy': 'EVENT_DRIVEN'}): - super(PutFile, self).__init__('PutFile', - properties={'Directory': output_dir, 'Directory Permissions': '777', 'Permissions': '777'}, - auto_terminate=['success', 'failure'], - schedule=schedule) - - def nifi_property_key(self, key): - if key == 'Directory Permissions': - return None - else: - return key - - -class PublishKafka(Processor): - def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}): - super(PublishKafka, self).__init__('PublishKafka', - properties={'Client Name': 'nghiaxlee', 'Known Brokers': 'kafka-broker:9092', 'Topic Name': 'test', - 'Batch Size': '10', 'Compress Codec': 'none', 'Delivery Guarantee': '1', - 'Request Timeout': '10 sec', 'Message Timeout': '12 sec'}, - auto_terminate=['success'], - schedule=schedule) - - -class PublishKafkaSSL(Processor): - def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}): - super(PublishKafkaSSL, self).__init__('PublishKafka', - properties={'Client Name': 'LMN', 'Known 
Brokers': 'kafka-broker:9093', - 'Topic Name': 'test', 'Batch Size': '10', - 'Compress Codec': 'none', 'Delivery Guarantee': '1', - 'Request Timeout': '10 sec', 'Message Timeout': '12 sec', - 'Security CA': '/tmp/resources/certs/ca-cert', - 'Security Cert': '/tmp/resources/certs/client_LMN_client.pem', - 'Security Pass Phrase': 'abcdefgh', - 'Security Private Key': '/tmp/resources/certs/client_LMN_client.key', - 'Security Protocol': 'ssl'}, - auto_terminate=['success'], - schedule=schedule) - -class PutS3Object(Processor): - def __init__(self, - proxy_host='', - proxy_port='', - proxy_username='', - proxy_password=''): - super(PutS3Object, self).__init__('PutS3Object', - properties={ - 'Object Key': 'test_object_key', - 'Bucket': 'test_bucket', - 'Access Key': 'test_access_key', - 'Secret Key': 'test_secret', - 'Endpoint Override URL': "http://s3-server:9090", - 'Proxy Host': proxy_host, - 'Proxy Port': proxy_port, - 'Proxy Username': proxy_username, - 'Proxy Password': proxy_password, - }, - auto_terminate=['success']) - -class DeleteS3Object(Processor): - def __init__(self, - proxy_host='', - proxy_port='', - proxy_username='', - proxy_password=''): - super(DeleteS3Object, self).__init__('DeleteS3Object', - properties={ - 'Object Key': 'test_object_key', - 'Bucket': 'test_bucket', - 'Access Key': 'test_access_key', - 'Secret Key': 'test_secret', - 'Endpoint Override URL': "http://s3-server:9090", - 'Proxy Host': proxy_host, - 'Proxy Port': proxy_port, - 'Proxy Username': proxy_username, - 'Proxy Password': proxy_password, - }, - auto_terminate=['success']) - -class InputPort(Connectable): - def __init__(self, name=None, remote_process_group=None): - super(InputPort, self).__init__(name=name) - - self.remote_process_group = remote_process_group - - -class RemoteProcessGroup(object): - def __init__(self, url, - name=None): - - self.uuid = uuid.uuid4() - - if name is None: - self.name = str(self.uuid) - else: - self.name = name - - self.url = url - - -class ControllerService(object): - def __init__(self, name=None, properties=None): - - self.id = str(uuid.uuid4()) - - if name is None: - self.name = str(uuid.uuid4()) - logging.info('Controller service name was not provided; using generated name \'%s\'', self.name) - else: - self.name = name - - if properties is None: - properties = {} - - self.properties = properties - - -class SSLContextService(ControllerService): - def __init__(self, name=None, cert=None, key=None, ca_cert=None): - super(SSLContextService, self).__init__(name=name) - - self.service_class = 'SSLContextService' - - if cert is not None: - self.properties['Client Certificate'] = cert - - if key is not None: - self.properties['Private Key'] = key - - if ca_cert is not None: - self.properties['CA Certificate'] = ca_cert - - -def minifi_flow_yaml(connectable, root=None, visited=None): - if visited is None: - visited = [] - - if root is None: - res = { - 'Flow Controller': { - 'name': 'MiNiFi Flow' - }, - 'Processors': [], - 'Connections': [], - 'Remote Processing Groups': [], - 'Controller Services': [] - } - else: - res = root - - visited.append(connectable) - - if hasattr(connectable, 'name'): - connectable_name = connectable.name - else: - connectable_name = str(connectable.uuid) - - if isinstance(connectable, InputPort): - group = connectable.remote_process_group - res_group = None - - for res_group_candidate in res['Remote Processing Groups']: - assert isinstance(res_group_candidate, dict) - if res_group_candidate['id'] == str(group.uuid): - res_group = res_group_candidate - 
- if res_group is None: - res_group = { - 'name': group.name, - 'id': str(group.uuid), - 'url': group.url, - 'timeout': '30 sec', - 'yield period': '3 sec', - 'Input Ports': [] - } - - res['Remote Processing Groups'].append(res_group) - - res_group['Input Ports'].append({ - 'id': str(connectable.uuid), - 'name': connectable.name, - 'max concurrent tasks': 1, - 'Properties': {} - }) - - if isinstance(connectable, Processor): - res['Processors'].append({ - 'name': connectable_name, - 'id': str(connectable.uuid), - 'class': 'org.apache.nifi.processors.standard.' + connectable.clazz, - 'scheduling strategy': connectable.schedule['scheduling strategy'], - 'scheduling period': connectable.schedule['scheduling period'], - 'penalization period': connectable.schedule['penalization period'], - 'yield period': connectable.schedule['yield period'], - 'run duration nanos': connectable.schedule['run duration nanos'], - 'Properties': connectable.properties, - 'auto-terminated relationships list': connectable.auto_terminate - }) - - for svc in connectable.controller_services: - if svc in visited: - continue - - visited.append(svc) - res['Controller Services'].append({ - 'name': svc.name, - 'id': svc.id, - 'class': svc.service_class, - 'Properties': svc.properties - }) - - for conn_name in connectable.connections: - conn_procs = connectable.connections[conn_name] - - if isinstance(conn_procs, list): - for proc in conn_procs: - res['Connections'].append({ - 'name': str(uuid.uuid4()), - 'source id': str(connectable.uuid), - 'source relationship name': conn_name, - 'destination id': str(proc.uuid), - 'drop empty': ("true" if proc.drop_empty_flowfiles else "false") - }) - if proc not in visited: - minifi_flow_yaml(proc, res, visited) - else: - res['Connections'].append({ - 'name': str(uuid.uuid4()), - 'source id': str(connectable.uuid), - 'source relationship name': conn_name, - 'destination id': str(conn_procs.uuid) - }) - if conn_procs not in visited: - minifi_flow_yaml(conn_procs, res, visited) - - if root is None: - return yaml.dump(res, default_flow_style=False) - - -def nifi_flow_xml(connectable, nifi_version=None, root=None, visited=None): - if visited is None: - visited = [] - - position = Element('position') - position.set('x', '0.0') - position.set('y', '0.0') - - comment = Element('comment') - styles = Element('styles') - bend_points = Element('bendPoints') - label_index = Element('labelIndex') - label_index.text = '1' - z_index = Element('zIndex') - z_index.text = '0' - - if root is None: - res = Element('flowController') - max_timer_driven_thread_count = Element('maxTimerDrivenThreadCount') - max_timer_driven_thread_count.text = '10' - res.append(max_timer_driven_thread_count) - max_event_driven_thread_count = Element('maxEventDrivenThreadCount') - max_event_driven_thread_count.text = '5' - res.append(max_event_driven_thread_count) - root_group = Element('rootGroup') - root_group_id = Element('id') - root_group_id_text = str(uuid.uuid4()) - root_group_id.text = root_group_id_text - root_group.append(root_group_id) - root_group_name = Element('name') - root_group_name.text = root_group_id_text - root_group.append(root_group_name) - res.append(root_group) - root_group.append(position) - root_group.append(comment) - res.append(Element('controllerServices')) - res.append(Element('reportingTasks')) - res.set('encoding-version', '1.2') - else: - res = root - - visited.append(connectable) - - if hasattr(connectable, 'name'): - connectable_name_text = connectable.name - else: - connectable_name_text = 
str(connectable.uuid) - - if isinstance(connectable, InputPort): - input_port = Element('inputPort') - - input_port_id = Element('id') - input_port_id.text = str(connectable.uuid) - input_port.append(input_port_id) - - input_port_name = Element('name') - input_port_name.text = connectable_name_text - input_port.append(input_port_name) - - input_port.append(position) - input_port.append(comment) - - input_port_scheduled_state = Element('scheduledState') - input_port_scheduled_state.text = 'RUNNING' - input_port.append(input_port_scheduled_state) - - input_port_max_concurrent_tasks = Element('maxConcurrentTasks') - input_port_max_concurrent_tasks.text = '1' - input_port.append(input_port_max_concurrent_tasks) - next( res.iterfind('rootGroup') ).append(input_port) - - if isinstance(connectable, Processor): - conn_destination = Element('processor') - - proc_id = Element('id') - proc_id.text = str(connectable.uuid) - conn_destination.append(proc_id) - - proc_name = Element('name') - proc_name.text = connectable_name_text - conn_destination.append(proc_name) - - conn_destination.append(position) - conn_destination.append(styles) - conn_destination.append(comment) - - proc_class = Element('class') - proc_class.text = 'org.apache.nifi.processors.standard.' + connectable.clazz - conn_destination.append(proc_class) - - proc_bundle = Element('bundle') - proc_bundle_group = Element('group') - proc_bundle_group.text = 'org.apache.nifi' - proc_bundle.append(proc_bundle_group) - proc_bundle_artifact = Element('artifact') - proc_bundle_artifact.text = 'nifi-standard-nar' - proc_bundle.append(proc_bundle_artifact) - proc_bundle_version = Element('version') - proc_bundle_version.text = nifi_version - proc_bundle.append(proc_bundle_version) - conn_destination.append(proc_bundle) - - proc_max_concurrent_tasks = Element('maxConcurrentTasks') - proc_max_concurrent_tasks.text = '1' - conn_destination.append(proc_max_concurrent_tasks) - - proc_scheduling_period = Element('schedulingPeriod') - proc_scheduling_period.text = connectable.schedule['scheduling period'] - conn_destination.append(proc_scheduling_period) - - proc_penalization_period = Element('penalizationPeriod') - proc_penalization_period.text = connectable.schedule['penalization period'] - conn_destination.append(proc_penalization_period) - - proc_yield_period = Element('yieldPeriod') - proc_yield_period.text = connectable.schedule['yield period'] - conn_destination.append(proc_yield_period) - - proc_bulletin_level = Element('bulletinLevel') - proc_bulletin_level.text = 'WARN' - conn_destination.append(proc_bulletin_level) - - proc_loss_tolerant = Element('lossTolerant') - proc_loss_tolerant.text = 'false' - conn_destination.append(proc_loss_tolerant) - - proc_scheduled_state = Element('scheduledState') - proc_scheduled_state.text = 'RUNNING' - conn_destination.append(proc_scheduled_state) - - proc_scheduling_strategy = Element('schedulingStrategy') - proc_scheduling_strategy.text = connectable.schedule['scheduling strategy'] - conn_destination.append(proc_scheduling_strategy) - - proc_execution_node = Element('executionNode') - proc_execution_node.text = 'ALL' - conn_destination.append(proc_execution_node) - - proc_run_duration_nanos = Element('runDurationNanos') - proc_run_duration_nanos.text = str(connectable.schedule['run duration nanos']) - conn_destination.append(proc_run_duration_nanos) - - for property_key, property_value in connectable.properties.items(): - proc_property = Element('property') - proc_property_name = Element('name') - 
proc_property_name.text = connectable.nifi_property_key(property_key) - if not proc_property_name.text: - continue - proc_property.append(proc_property_name) - proc_property_value = Element('value') - proc_property_value.text = property_value - proc_property.append(proc_property_value) - conn_destination.append(proc_property) - - for auto_terminate_rel in connectable.auto_terminate: - proc_auto_terminated_relationship = Element('autoTerminatedRelationship') - proc_auto_terminated_relationship.text = auto_terminate_rel - conn_destination.append(proc_auto_terminated_relationship) - next( res.iterfind('rootGroup') ).append(conn_destination) - """ res.iterfind('rootGroup').next().append(conn_destination) """ - - for svc in connectable.controller_services: - if svc in visited: - continue - - visited.append(svc) - controller_service = Element('controllerService') - - controller_service_id = Element('id') - controller_service_id.text = str(svc.id) - controller_service.append(controller_service_id) - - controller_service_name = Element('name') - controller_service_name.text = svc.name - controller_service.append(controller_service_name) - - controller_service.append(comment) - - controller_service_class = Element('class') - controller_service_class.text = svc.service_class, - controller_service.append(controller_service_class) - - controller_service_bundle = Element('bundle') - controller_service_bundle_group = Element('group') - controller_service_bundle_group.text = svc.group - controller_service_bundle.append(controller_service_bundle_group) - controller_service_bundle_artifact = Element('artifact') - controller_service_bundle_artifact.text = svc.artifact - controller_service_bundle.append(controller_service_bundle_artifact) - controller_service_bundle_version = Element('version') - controller_service_bundle_version.text = nifi_version - controller_service_bundle.append(controller_service_bundle_version) - controller_service.append(controller_service_bundle) - - controller_enabled = Element('enabled') - controller_enabled.text = 'true', - controller_service.append(controller_enabled) - - for property_key, property_value in svc.properties: - controller_service_property = Element('property') - controller_service_property_name = Element('name') - controller_service_property_name.text = property_key - controller_service_property.append(controller_service_property_name) - controller_service_property_value = Element('value') - controller_service_property_value.text = property_value - controller_service_property.append(controller_service_property_value) - controller_service.append(controller_service_property) - next( res.iterfind('rootGroup') ).append(controller_service) - """ res.iterfind('rootGroup').next().append(controller_service)""" - - for conn_name in connectable.connections: - conn_destinations = connectable.connections[conn_name] - - if isinstance(conn_destinations, list): - for conn_destination in conn_destinations: - connection = nifi_flow_xml_connection(res, - bend_points, - conn_name, - connectable, - label_index, - conn_destination, - z_index) - next( res.iterfind('rootGroup') ).append(connection) - """ res.iterfind('rootGroup').next().append(connection) """ - - if conn_destination not in visited: - nifi_flow_xml(conn_destination, nifi_version, res, visited) - else: - connection = nifi_flow_xml_connection(res, - bend_points, - conn_name, - connectable, - label_index, - conn_destinations, - z_index) - next( res.iterfind('rootGroup') ).append(connection) - """ 
res.iterfind('rootGroup').next().append(connection) """ - - if conn_destinations not in visited: - nifi_flow_xml(conn_destinations, nifi_version, res, visited) - - if root is None: - return ('<?xml version="1.0" encoding="UTF-8" standalone="no"?>' - + "\n" - + elementTree.tostring(res, encoding='utf-8').decode('utf-8')) - - -def nifi_flow_xml_connection(res, bend_points, conn_name, connectable, label_index, destination, z_index): - connection = Element('connection') - - connection_id = Element('id') - connection_id.text = str(uuid.uuid4()) - connection.append(connection_id) - - connection_name = Element('name') - connection.append(connection_name) - - connection.append(bend_points) - connection.append(label_index) - connection.append(z_index) - - connection_source_id = Element('sourceId') - connection_source_id.text = str(connectable.uuid) - connection.append(connection_source_id) - - connection_source_group_id = Element('sourceGroupId') - connection_source_group_id.text = next( res.iterfind('rootGroup/id') ).text - """connection_source_group_id.text = res.iterfind('rootGroup/id').next().text""" - connection.append(connection_source_group_id) - - connection_source_type = Element('sourceType') - if isinstance(connectable, Processor): - connection_source_type.text = 'PROCESSOR' - elif isinstance(connectable, InputPort): - connection_source_type.text = 'INPUT_PORT' - else: - raise Exception('Unexpected source type: %s' % type(connectable)) - connection.append(connection_source_type) - - connection_destination_id = Element('destinationId') - connection_destination_id.text = str(destination.uuid) - connection.append(connection_destination_id) - - connection_destination_group_id = Element('destinationGroupId') - connection_destination_group_id.text = next(res.iterfind('rootGroup/id')).text - """ connection_destination_group_id.text = res.iterfind('rootGroup/id').next().text """ - connection.append(connection_destination_group_id) - - connection_destination_type = Element('destinationType') - if isinstance(destination, Processor): - connection_destination_type.text = 'PROCESSOR' - elif isinstance(destination, InputPort): - connection_destination_type.text = 'INPUT_PORT' - else: - raise Exception('Unexpected destination type: %s' % type(destination)) - connection.append(connection_destination_type) - - connection_relationship = Element('relationship') - if not isinstance(connectable, InputPort): - connection_relationship.text = conn_name - connection.append(connection_relationship) - - connection_max_work_queue_size = Element('maxWorkQueueSize') - connection_max_work_queue_size.text = '10000' - connection.append(connection_max_work_queue_size) - - connection_max_work_queue_data_size = Element('maxWorkQueueDataSize') - connection_max_work_queue_data_size.text = '1 GB' - connection.append(connection_max_work_queue_data_size) - - connection_flow_file_expiration = Element('flowFileExpiration') - connection_flow_file_expiration.text = '0 sec' - connection.append(connection_flow_file_expiration) - - return connection diff --git a/docker/test/integration/minifi/core/Cluster.py b/docker/test/integration/minifi/core/Cluster.py new file mode 100644 index 0000000..0818f37 --- /dev/null +++ b/docker/test/integration/minifi/core/Cluster.py @@ -0,0 +1,22 @@ +class Cluster(object): + """ + Base Cluster class. This is intended to be a generic interface + to different types of clusters. Clusters could be Kubernetes clusters, + Docker swarms, or cloud compute/container services. 
+ """ + + def deploy_flow(self, flow, name=None, vols=None): + """ + Deploys a flow to the cluster. + """ + + def __enter__(self): + """ + Allocate ephemeral cluster resources. + """ + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + Clean up ephemeral cluster resources. + """ diff --git a/docker/test/integration/minifi/core/Connectable.py b/docker/test/integration/minifi/core/Connectable.py new file mode 100644 index 0000000..c3472a7 --- /dev/null +++ b/docker/test/integration/minifi/core/Connectable.py @@ -0,0 +1,77 @@ +import uuid +from copy import copy + +class Connectable(object): + def __init__(self, + name=None, + auto_terminate=None): + + self.uuid = uuid.uuid4() + + if name is None: + self.name = str(self.uuid) + else: + self.name = name + + if auto_terminate is None: + self.auto_terminate = [] + else: + self.auto_terminate = auto_terminate + + self.connections = {} + self.out_proc = self + + self.drop_empty_flowfiles = False + + def connect(self, connections): + for rel in connections: + + # Ensure that rel is not auto-terminated + if rel in self.auto_terminate: + del self.auto_terminate[self.auto_terminate.index(rel)] + + # Add to set of output connections for this rel + if rel not in self.connections: + self.connections[rel] = [] + self.connections[rel].append(connections[rel]) + + return self + + def __rshift__(self, other): + """ + Right shift operator to support flow DSL, for example: + + GetFile('/input') >> LogAttribute() >> PutFile('/output') + + """ + + connected = copy(self) + connected.connections = copy(self.connections) + + if self.out_proc is self: + connected.out_proc = connected + else: + connected.out_proc = copy(connected.out_proc) + + if isinstance(other, tuple): + if isinstance(other[0], tuple): + for rel_tuple in other: + rel = {rel_tuple[0]: rel_tuple[1]} + connected.out_proc.connect(rel) + else: + rel = {other[0]: other[1]} + connected.out_proc.connect(rel) + else: + connected.out_proc.connect({'success': other}) + connected.out_proc = other + + return connected + + def __invert__(self): + """ + Invert operation to set empty file filtering on incoming connections + GetFile('/input') >> ~LogAttribute() + """ + self.drop_empty_flowfiles = True + + return self diff --git a/docker/test/integration/minifi/core/ControllerService.py b/docker/test/integration/minifi/core/ControllerService.py new file mode 100644 index 0000000..d8b4e17 --- /dev/null +++ b/docker/test/integration/minifi/core/ControllerService.py @@ -0,0 +1,17 @@ +import uuid + +class ControllerService(object): + def __init__(self, name=None, properties=None): + + self.id = str(uuid.uuid4()) + + if name is None: + self.name = str(uuid.uuid4()) + logging.info('Controller service name was not provided; using generated name \'%s\'', self.name) + else: + self.name = name + + if properties is None: + properties = {} + + self.properties = properties diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/core/DockerTestCluster.py similarity index 66% rename from docker/test/integration/minifi/test/__init__.py rename to docker/test/integration/minifi/core/DockerTestCluster.py index bf64472..1742f83 100644 --- a/docker/test/integration/minifi/test/__init__.py +++ b/docker/test/integration/minifi/core/DockerTestCluster.py @@ -1,45 +1,20 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the \"License\"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an \"AS IS\" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - +import json import logging +import os import shutil -import uuid -import tarfile import subprocess import sys import time -import subprocess -import json -from io import BytesIO -from threading import Event +import uuid -import os -from os import listdir from os.path import join +from threading import Event from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer -from minifi import SingleNodeDockerCluster - -logging.basicConfig(level=logging.DEBUG) - -def put_file_contents(contents, file_abs_path): - logging.info('Writing %d bytes of content to file: %s', len(contents), file_abs_path) - with open(file_abs_path, 'wb') as test_input_file: - test_input_file.write(contents) - +from .OutputEventHandler import OutputEventHandler +from .SingleNodeDockerCluster import SingleNodeDockerCluster +from ..validators.FileOutputValidator import FileOutputValidator class DockerTestCluster(SingleNodeDockerCluster): def __init__(self, output_validator): @@ -134,7 +109,7 @@ class DockerTestCluster(SingleNodeDockerCluster): self.test_data = contents file_name = str(uuid.uuid4()) file_abs_path = join(self.tmp_test_input_dir, file_name) - put_file_contents(contents.encode('utf-8'), file_abs_path) + self.put_file_contents(contents.encode('utf-8'), file_abs_path) def put_test_resource(self, file_name, contents): """ @@ -143,7 +118,7 @@ class DockerTestCluster(SingleNodeDockerCluster): """ file_abs_path = join(self.tmp_test_resources_dir, file_name) - put_file_contents(contents, file_abs_path) + self.put_file_contents(contents, file_abs_path) def restart_observer_if_needed(self): if self.observer.is_alive(): @@ -238,6 +213,12 @@ class DockerTestCluster(SingleNodeDockerCluster): check_count += 1 time.sleep(1) return False + + def put_file_contents(self, contents, file_abs_path): + logging.info('Writing %d bytes of content to file: %s', len(contents), file_abs_path) + with open(file_abs_path, 'wb') as test_input_file: + test_input_file.write(contents) + def __exit__(self, exc_type, exc_val, exc_tb): """ @@ -252,132 +233,3 @@ class DockerTestCluster(SingleNodeDockerCluster): shutil.rmtree(self.tmp_test_resources_dir) super(DockerTestCluster, self).__exit__(exc_type, exc_val, exc_tb) - - -class OutputEventHandler(FileSystemEventHandler): - def __init__(self, validator, done_event): - self.validator = validator - self.done_event = done_event - - def on_created(self, event): - logging.info('Output file created: ' + event.src_path) - self.check(event) - - def on_modified(self, event): - logging.info('Output file modified: ' + event.src_path) - self.check(event) - - def check(self, event): - if self.validator.validate(): - logging.info('Output file is valid') - self.done_event.set() - else: - logging.info('Output file is invalid') - - -class OutputValidator(object): - """ - Base output validator class. Validators must implement - method validate, which returns a boolean. 
- """ - - def validate(self): - """ - Return True if output is valid; False otherwise. - """ - raise NotImplementedError("validate function needs to be implemented for validators") - - - -class FileOutputValidator(OutputValidator): - def set_output_dir(self, output_dir): - self.output_dir = output_dir - - def validate(self, dir=''): - pass - -class SingleFileOutputValidator(FileOutputValidator): - """ - Validates the content of a single file in the given directory. - """ - - def __init__(self, expected_content, subdir=''): - self.valid = False - self.expected_content = expected_content - self.subdir = subdir - - def validate(self): - self.valid = False - full_dir = os.path.join(self.output_dir, self.subdir) - logging.info("Output folder: %s", full_dir) - - if not os.path.isdir(full_dir): - return self.valid - - listing = listdir(full_dir) - if listing: - for l in listing: - logging.info("name:: %s", l) - out_file_name = listing[0] - full_path = join(full_dir, out_file_name) - if not os.path.isfile(full_path): - return self.valid - - with open(full_path, 'r') as out_file: - contents = out_file.read() - logging.info("dir %s -- name %s", full_dir, out_file_name) - logging.info("expected %s -- content %s", self.expected_content, contents) - - if self.expected_content in contents: - self.valid = True - - return self.valid - - -class EmptyFilesOutPutValidator(FileOutputValidator): - """ - Validates if all the files in the target directory are empty and at least one exists - """ - def __init__(self): - self.valid = False - - def validate(self, dir=''): - - if self.valid: - return True - - full_dir = self.output_dir + dir - logging.info("Output folder: %s", full_dir) - listing = listdir(full_dir) - if listing: - self.valid = all(os.path.getsize(os.path.join(full_dir,x)) == 0 for x in listing) - - return self.valid - -class NoFileOutPutValidator(FileOutputValidator): - """ - Validates if no flowfiles were transferred - """ - def __init__(self): - self.valid = False - - def validate(self, dir=''): - - if self.valid: - return True - - full_dir = self.output_dir + dir - logging.info("Output folder: %s", full_dir) - listing = listdir(full_dir) - - self.valid = not bool(listing) - - return self.valid - - -class SegfaultValidator(OutputValidator): - """ - Validate that a file was received. 
- """ - def validate(self): - return True diff --git a/docker/test/integration/minifi/core/InputPort.py b/docker/test/integration/minifi/core/InputPort.py new file mode 100644 index 0000000..1f00b7f --- /dev/null +++ b/docker/test/integration/minifi/core/InputPort.py @@ -0,0 +1,7 @@ +from .Connectable import Connectable + +class InputPort(Connectable): + def __init__(self, name=None, remote_process_group=None): + super(InputPort, self).__init__(name=name) + + self.remote_process_group = remote_process_group diff --git a/docker/test/integration/minifi/core/OutputEventHandler.py b/docker/test/integration/minifi/core/OutputEventHandler.py new file mode 100644 index 0000000..3d4c984 --- /dev/null +++ b/docker/test/integration/minifi/core/OutputEventHandler.py @@ -0,0 +1,24 @@ +import logging + +from watchdog.events import FileSystemEventHandler + +class OutputEventHandler(FileSystemEventHandler): + def __init__(self, validator, done_event): + self.validator = validator + self.done_event = done_event + + def on_created(self, event): + logging.info('Output file created: ' + event.src_path) + self.check(event) + + def on_modified(self, event): + logging.info('Output file modified: ' + event.src_path) + self.check(event) + + def check(self, event): + if self.validator.validate(): + logging.info('Output file is valid') + self.done_event.set() + else: + logging.info('Output file is invalid') + diff --git a/docker/test/integration/minifi/core/Processor.py b/docker/test/integration/minifi/core/Processor.py new file mode 100644 index 0000000..e7e41e5 --- /dev/null +++ b/docker/test/integration/minifi/core/Processor.py @@ -0,0 +1,45 @@ +from .Connectable import Connectable + +class Processor(Connectable): + def __init__(self, + clazz, + properties=None, + schedule=None, + name=None, + controller_services=None, + auto_terminate=None): + + super(Processor, self).__init__(name=name, + auto_terminate=auto_terminate) + + if controller_services is None: + controller_services = [] + + if schedule is None: + schedule = {} + + if properties is None: + properties = {} + + if name is None: + pass + + self.clazz = clazz + self.properties = properties + self.controller_services = controller_services + + self.schedule = { + 'scheduling strategy': 'TIMER_DRIVEN', + 'scheduling period': '1 sec', + 'penalization period': '30 sec', + 'yield period': '1 sec', + 'run duration nanos': 0 + } + self.schedule.update(schedule) + + def nifi_property_key(self, key): + """ + Returns the Apache NiFi-equivalent property key for the given key. This is often, but not always, the same as + the internal key. 
+ """ + return key diff --git a/docker/test/integration/minifi/core/RemoteProcessGroup.py b/docker/test/integration/minifi/core/RemoteProcessGroup.py new file mode 100644 index 0000000..6901fb6 --- /dev/null +++ b/docker/test/integration/minifi/core/RemoteProcessGroup.py @@ -0,0 +1,13 @@ +import uuid + +class RemoteProcessGroup(object): + def __init__(self, url, + name=None): + self.uuid = uuid.uuid4() + + if name is None: + self.name = str(self.uuid) + else: + self.name = name + + self.url = url diff --git a/docker/test/integration/minifi/core/SSLContextService.py b/docker/test/integration/minifi/core/SSLContextService.py new file mode 100644 index 0000000..5866508 --- /dev/null +++ b/docker/test/integration/minifi/core/SSLContextService.py @@ -0,0 +1,16 @@ +from .ControllerService import ControllerService + +class SSLContextService(ControllerService): + def __init__(self, name=None, cert=None, key=None, ca_cert=None): + super(SSLContextService, self).__init__(name=name) + + self.service_class = 'SSLContextService' + + if cert is not None: + self.properties['Client Certificate'] = cert + + if key is not None: + self.properties['Private Key'] = key + + if ca_cert is not None: + self.properties['CA Certificate'] = ca_cert diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py new file mode 100644 index 0000000..f48c288 --- /dev/null +++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py @@ -0,0 +1,319 @@ +import gzip +import docker +import logging +import os +import tarfile +import uuid + +from collections import OrderedDict +from io import BytesIO +from textwrap import dedent + +from .Cluster import Cluster +from ..flow_serialization.Minifi_flow_yaml_serializer import Minifi_flow_yaml_serializer +from ..flow_serialization.Nifi_flow_xml_serializer import Nifi_flow_xml_serializer + +class SingleNodeDockerCluster(Cluster): + """ + A "cluster" which consists of a single docker node. Useful for + testing or use-cases which do not span multiple compute nodes. + """ + + def __init__(self): + self.minifi_version = os.environ['MINIFI_VERSION'] + self.nifi_version = '1.7.0' + self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version + self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version + self.network = None + self.containers = OrderedDict() + self.images = [] + self.tmp_files = [] + + # Get docker client + self.client = docker.from_env() + + def deploy_flow(self, + flow, + name=None, + vols=None, + engine='minifi-cpp'): + """ + Compiles the flow to a valid config file and overlays it into a new image. 
+        """
+
+        if vols is None:
+            vols = {}
+
+        if name is None:
+            name = engine + '-' + str(uuid.uuid4())
+            logging.info('Flow name was not provided; using generated name \'%s\'', name)
+
+        logging.info('Deploying %s flow \'%s\'...', engine, name)
+
+        # Create network if necessary
+        if self.network is None:
+            net_name = 'nifi-' + str(uuid.uuid4())
+            logging.info('Creating network: %s', net_name)
+            self.network = self.client.networks.create(net_name)
+
+        if engine == 'nifi':
+            self.deploy_nifi_flow(flow, name, vols)
+        elif engine == 'minifi-cpp':
+            self.deploy_minifi_cpp_flow(flow, name, vols)
+        elif engine == 'kafka-broker':
+            self.deploy_kafka_broker(name)
+        elif engine == 'http-proxy':
+            self.deploy_http_proxy()
+        elif engine == 's3-server':
+            self.deploy_s3_server()
+        else:
+            raise Exception('invalid flow engine: \'%s\'' % engine)
+
+    def deploy_minifi_cpp_flow(self, flow, name, vols):
+
+        # Build configured image
+        dockerfile = dedent("""FROM {base_image}
+                USER root
+                ADD config.yml {minifi_root}/conf/config.yml
+                RUN chown minificpp:minificpp {minifi_root}/conf/config.yml
+                USER minificpp
+                """.format(
+                    base_image='apacheminificpp:' + self.minifi_version,
+                    minifi_root=self.minifi_root))
+
+        serializer = Minifi_flow_yaml_serializer()
+        test_flow_yaml = serializer.serialize(flow)
+        logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
+
+        conf_file_buffer = BytesIO()
+
+        try:
+            conf_file_buffer.write(test_flow_yaml.encode('utf-8'))
+            conf_file_len = conf_file_buffer.tell()
+            conf_file_buffer.seek(0)
+
+            context_files = [
+                {
+                    'name': 'config.yml',
+                    'size': conf_file_len,
+                    'file_obj': conf_file_buffer
+                }
+            ]
+
+            configured_image = self.build_image(dockerfile, context_files)
+
+        finally:
+            conf_file_buffer.close()
+
+        logging.info('Creating and running docker container for flow...')
+
+        container = self.client.containers.run(
+            configured_image[0],
+            detach=True,
+            name=name,
+            network=self.network.name,
+            volumes=vols)
+
+        logging.info('Started container \'%s\'', container.name)
+
+        self.containers[container.name] = container
+
+    def deploy_nifi_flow(self, flow, name, vols):
+        dockerfile = dedent(r"""FROM {base_image}
+                USER root
+                ADD flow.xml.gz {nifi_root}/conf/flow.xml.gz
+                RUN chown nifi:nifi {nifi_root}/conf/flow.xml.gz
+                RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties
+                RUN sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\1=5000/' {nifi_root}/conf/nifi.properties
+                USER nifi
+                """.format(name=name,
+                           base_image='apache/nifi:' + self.nifi_version,
+                           nifi_root=self.nifi_root))
+
+        serializer = Nifi_flow_xml_serializer()
+        test_flow_xml = serializer.serialize(flow, self.nifi_version)
+        logging.info('Using generated flow config xml:\n%s', test_flow_xml)
+
+        conf_file_buffer = BytesIO()
+
+        try:
+            with gzip.GzipFile(mode='wb', fileobj=conf_file_buffer) as conf_gz_file_buffer:
+                conf_gz_file_buffer.write(test_flow_xml.encode())
+            conf_file_len = conf_file_buffer.tell()
+            conf_file_buffer.seek(0)
+
+            context_files = [
+                {
+                    'name': 'flow.xml.gz',
+                    'size': conf_file_len,
+                    'file_obj': conf_file_buffer
+                }
+            ]
+
+            configured_image = self.build_image(dockerfile, context_files)
+
+        finally:
+            conf_file_buffer.close()
+
+        logging.info('Creating and running docker container for flow...')
+
+        container = self.client.containers.run(
+            configured_image[0],
+            detach=True,
+            name=name,
+            hostname=name,
+            network=self.network.name,
+            volumes=vols)
+
+        logging.info('Started container \'%s\'', container.name)
+
+        self.containers[container.name] = container
+
+    def deploy_kafka_broker(self, name):
+        logging.info('Creating and running docker containers for kafka broker...')
+        zookeeper = self.client.containers.run(
+            self.client.images.pull("wurstmeister/zookeeper:latest"),
+            detach=True,
+            name='zookeeper',
+            network=self.network.name,
+            ports={'2181/tcp': 2181},
+        )
+        self.containers[zookeeper.name] = zookeeper
+
+        test_dir = os.environ['PYTHONPATH'].split(':')[-1]  # Based on DockerVerify.sh
+        broker_image = self.build_image_by_path(test_dir + "/resources/kafka_broker", 'minifi-kafka')
+        broker = self.client.containers.run(
+            broker_image[0],
+            detach=True,
+            name='kafka-broker',
+            network=self.network.name,
+            ports={'9092/tcp': 9092},
+            environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093", "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"],
+        )
+        self.containers[broker.name] = broker
+
+        dockerfile = dedent("""FROM {base_image}
+                USER root
+                CMD $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka-broker:9092 --topic test > heaven_signal.txt
+                """.format(base_image='wurstmeister/kafka:2.12-2.5.0'))
+        configured_image = self.build_image(dockerfile, [])
+        consumer = self.client.containers.run(
+            configured_image[0],
+            detach=True,
+            name='kafka-consumer',
+            network=self.network.name,
+        )
+        self.containers[consumer.name] = consumer
+
+    def deploy_http_proxy(self):
+        logging.info('Creating and running http-proxy docker container...')
+        dockerfile = dedent("""FROM {base_image}
+                RUN apt update && apt install -y apache2-utils
+                RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username} {proxy_password}
+                RUN echo 'auth_param basic program /usr/lib/squid3/basic_ncsa_auth /etc/squid/.squid_users' > /etc/squid/squid.conf && \
+                    echo 'auth_param basic realm proxy' >> /etc/squid/squid.conf && \
+                    echo 'acl authenticated proxy_auth REQUIRED' >> /etc/squid/squid.conf && \
+                    echo 'http_access allow authenticated' >> /etc/squid/squid.conf && \
+                    echo 'http_port {proxy_port}' >> /etc/squid/squid.conf
+                ENTRYPOINT ["/sbin/entrypoint.sh"]
+                """.format(base_image='sameersbn/squid:3.5.27-2', proxy_username='admin', proxy_password='test101', proxy_port='3128'))
+        configured_image = self.build_image(dockerfile, [])
+        proxy = self.client.containers.run(
+            configured_image[0],
+            detach=True,
+            name='http-proxy',
+            network=self.network.name,
+            ports={'3128/tcp': 3128},
+        )
+        self.containers[proxy.name] = proxy
+
+    def deploy_s3_server(self):
+        server = self.client.containers.run(
+            "adobe/s3mock:2.1.28",
+            detach=True,
+            name='s3-server',
+            network=self.network.name,
+            ports={'9090/tcp': 9090, '9191/tcp': 9191},
+            environment=["initialBuckets=test_bucket"],
+        )
+        self.containers[server.name] = server
+
+    def build_image(self, dockerfile, context_files):
+        conf_dockerfile_buffer = BytesIO()
+        docker_context_buffer = BytesIO()
+
+        try:
+            # Overlay conf onto base nifi image
+            conf_dockerfile_buffer.write(dockerfile.encode())
+            conf_dockerfile_buffer.seek(0)
+
+            with tarfile.open(mode='w', fileobj=docker_context_buffer) as docker_context:
+                dockerfile_info = tarfile.TarInfo('Dockerfile')
+                dockerfile_info.size = conf_dockerfile_buffer.getbuffer().nbytes
+                docker_context.addfile(dockerfile_info,
+                                       fileobj=conf_dockerfile_buffer)
+
+                for context_file in context_files:
+                    file_info = tarfile.TarInfo(context_file['name'])
+                    file_info.size = context_file['size']
+                    docker_context.addfile(file_info,
+                                           fileobj=context_file['file_obj'])
+            docker_context_buffer.seek(0)
+
+            logging.info('Creating configured image...')
+            configured_image = self.client.images.build(fileobj=docker_context_buffer,
+                                                        custom_context=True,
+                                                        rm=True,
+                                                        forcerm=True)
+            logging.info('Created image with id: %s', configured_image[0].id)
+            self.images.append(configured_image)
+
+        finally:
+            conf_dockerfile_buffer.close()
+            docker_context_buffer.close()
+
+        return configured_image
+
+    def build_image_by_path(self, dir, name=None):
+        try:
+            logging.info('Creating configured image...')
+            configured_image = self.client.images.build(path=dir,
+                                                        tag=name,
+                                                        rm=True,
+                                                        forcerm=True)
+            logging.info('Created image with id: %s', configured_image[0].id)
+            self.images.append(configured_image)
+            return configured_image
+        except Exception as e:
+            logging.error(e)
+            raise
+
+    def __enter__(self):
+        """
+        Allocate ephemeral cluster resources.
+        """
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """
+        Clean up ephemeral cluster resources.
+        """
+
+        # Clean up containers
+        for container in self.containers.values():
+            logging.info('Cleaning up container: %s', container.name)
+            container.remove(v=True, force=True)
+
+        # Clean up images
+        for image in reversed(self.images):
+            logging.info('Cleaning up image: %s', image[0].id)
+            self.client.images.remove(image[0].id, force=True)
+
+        # Clean up network
+        if self.network is not None:
+            logging.info('Cleaning up network: %s', self.network.name)
+            self.network.remove()
+
+        # Clean up tmp files
+        for tmp_file in self.tmp_files:
+            os.remove(tmp_file)
diff --git a/docker/test/integration/minifi/core/__init__.py b/docker/test/integration/minifi/core/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
new file mode 100644
index 0000000..c7a9e0d
--- /dev/null
+++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
@@ -0,0 +1,111 @@
+import uuid
+import yaml
+
+from ..core.Processor import Processor
+from ..core.InputPort import InputPort
+
+class Minifi_flow_yaml_serializer:
+    def serialize(self, connectable, root=None, visited=None):
+        if visited is None:
+            visited = []
+
+        if root is None:
+            res = {
+                'Flow Controller': {
+                    'name': 'MiNiFi Flow'
+                },
+                'Processors': [],
+                'Connections': [],
+                'Remote Processing Groups': [],
+                'Controller Services': []
+            }
+        else:
+            res = root
+
+        visited.append(connectable)
+
+        if hasattr(connectable, 'name'):
+            connectable_name = connectable.name
+        else:
+            connectable_name = str(connectable.uuid)
+
+        if isinstance(connectable, InputPort):
+            group = connectable.remote_process_group
+            res_group = None
+
+            for res_group_candidate in res['Remote Processing Groups']:
+                assert isinstance(res_group_candidate, dict)
+                if res_group_candidate['id'] == str(group.uuid):
+                    res_group = res_group_candidate
+
+            if res_group is None:
+                res_group = {
+                    'name': group.name,
+                    'id': str(group.uuid),
+                    'url': group.url,
+                    'timeout': '30 sec',
+                    'yield period': '3 sec',
+                    'Input Ports': []
+                }
+
+                res['Remote Processing Groups'].append(res_group)
+
+            res_group['Input Ports'].append({
+                'id': str(connectable.uuid),
+                'name': connectable.name,
+                'max concurrent tasks': 1,
+                'Properties': {}
+            })
+
+        if isinstance(connectable, Processor):
+            res['Processors'].append({
+                'name': connectable_name,
+                'id': str(connectable.uuid),
+                'class': 'org.apache.nifi.processors.standard.' + connectable.clazz,
+                'scheduling strategy': connectable.schedule['scheduling strategy'],
+                'scheduling period': connectable.schedule['scheduling period'],
+                'penalization period': connectable.schedule['penalization period'],
+                'yield period': connectable.schedule['yield period'],
+                'run duration nanos': connectable.schedule['run duration nanos'],
+                'Properties': connectable.properties,
+                'auto-terminated relationships list': connectable.auto_terminate
+            })
+
+            for svc in connectable.controller_services:
+                if svc in visited:
+                    continue
+
+                visited.append(svc)
+                res['Controller Services'].append({
+                    'name': svc.name,
+                    'id': svc.id,
+                    'class': svc.service_class,
+                    'Properties': svc.properties
+                })
+
+        for conn_name in connectable.connections:
+            conn_procs = connectable.connections[conn_name]
+
+            if isinstance(conn_procs, list):
+                for proc in conn_procs:
+                    res['Connections'].append({
+                        'name': str(uuid.uuid4()),
+                        'source id': str(connectable.uuid),
+                        'source relationship name': conn_name,
+                        'destination id': str(proc.uuid),
+                        'drop empty': ("true" if proc.drop_empty_flowfiles else "false")
+                    })
+                    if proc not in visited:
+                        self.serialize(proc, res, visited)
+            else:
+                res['Connections'].append({
+                    'name': str(uuid.uuid4()),
+                    'source id': str(connectable.uuid),
+                    'source relationship name': conn_name,
+                    'destination id': str(conn_procs.uuid)
+                })
+                if conn_procs not in visited:
+                    self.serialize(conn_procs, res, visited)
+
+        if root is None:
+            return yaml.dump(res, default_flow_style=False)
diff --git a/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py b/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py
new file mode 100644
index 0000000..22fa098
--- /dev/null
+++ b/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py
@@ -0,0 +1,318 @@
+import uuid
+
+import xml.etree.cElementTree as elementTree
+from xml.etree.cElementTree import Element
+
+from ..core.Processor import Processor
+from ..core.InputPort import InputPort
+
+class Nifi_flow_xml_serializer:
+    def serialize(self, connectable, nifi_version=None, root=None, visited=None):
+        if visited is None:
+            visited = []
+
+        position = Element('position')
+        position.set('x', '0.0')
+        position.set('y', '0.0')
+
+        comment = Element('comment')
+        styles = Element('styles')
+        bend_points = Element('bendPoints')
+        label_index = Element('labelIndex')
+        label_index.text = '1'
+        z_index = Element('zIndex')
+        z_index.text = '0'
+
+        if root is None:
+            res = Element('flowController')
+            max_timer_driven_thread_count = Element('maxTimerDrivenThreadCount')
+            max_timer_driven_thread_count.text = '10'
+            res.append(max_timer_driven_thread_count)
+            max_event_driven_thread_count = Element('maxEventDrivenThreadCount')
+            max_event_driven_thread_count.text = '5'
+            res.append(max_event_driven_thread_count)
+            root_group = Element('rootGroup')
+            root_group_id = Element('id')
+            root_group_id_text = str(uuid.uuid4())
+            root_group_id.text = root_group_id_text
+            root_group.append(root_group_id)
+            root_group_name = Element('name')
+            root_group_name.text = root_group_id_text
+            root_group.append(root_group_name)
+            res.append(root_group)
+            root_group.append(position)
+            root_group.append(comment)
+            res.append(Element('controllerServices'))
+            res.append(Element('reportingTasks'))
+            res.set('encoding-version', '1.2')
+        else:
+            res = root
+
+        visited.append(connectable)
+
+        if hasattr(connectable, 'name'):
+            connectable_name_text = connectable.name
+        else:
+            connectable_name_text = str(connectable.uuid)
+
+        if isinstance(connectable, InputPort):
+            input_port = Element('inputPort')
+
+            input_port_id = Element('id')
+            input_port_id.text = str(connectable.uuid)
+            input_port.append(input_port_id)
+
+            input_port_name = Element('name')
+            input_port_name.text = connectable_name_text
+            input_port.append(input_port_name)
+
+            input_port.append(position)
+            input_port.append(comment)
+
+            input_port_scheduled_state = Element('scheduledState')
+            input_port_scheduled_state.text = 'RUNNING'
+            input_port.append(input_port_scheduled_state)
+
+            input_port_max_concurrent_tasks = Element('maxConcurrentTasks')
+            input_port_max_concurrent_tasks.text = '1'
+            input_port.append(input_port_max_concurrent_tasks)
+            next(res.iterfind('rootGroup')).append(input_port)
+
+        if isinstance(connectable, Processor):
+            conn_destination = Element('processor')
+
+            proc_id = Element('id')
+            proc_id.text = str(connectable.uuid)
+            conn_destination.append(proc_id)
+
+            proc_name = Element('name')
+            proc_name.text = connectable_name_text
+            conn_destination.append(proc_name)
+
+            conn_destination.append(position)
+            conn_destination.append(styles)
+            conn_destination.append(comment)
+
+            proc_class = Element('class')
+            proc_class.text = 'org.apache.nifi.processors.standard.' + connectable.clazz
+            conn_destination.append(proc_class)
+
+            proc_bundle = Element('bundle')
+            proc_bundle_group = Element('group')
+            proc_bundle_group.text = 'org.apache.nifi'
+            proc_bundle.append(proc_bundle_group)
+            proc_bundle_artifact = Element('artifact')
+            proc_bundle_artifact.text = 'nifi-standard-nar'
+            proc_bundle.append(proc_bundle_artifact)
+            proc_bundle_version = Element('version')
+            proc_bundle_version.text = nifi_version
+            proc_bundle.append(proc_bundle_version)
+            conn_destination.append(proc_bundle)
+
+            proc_max_concurrent_tasks = Element('maxConcurrentTasks')
+            proc_max_concurrent_tasks.text = '1'
+            conn_destination.append(proc_max_concurrent_tasks)
+
+            proc_scheduling_period = Element('schedulingPeriod')
+            proc_scheduling_period.text = connectable.schedule['scheduling period']
+            conn_destination.append(proc_scheduling_period)
+
+            proc_penalization_period = Element('penalizationPeriod')
+            proc_penalization_period.text = connectable.schedule['penalization period']
+            conn_destination.append(proc_penalization_period)
+
+            proc_yield_period = Element('yieldPeriod')
+            proc_yield_period.text = connectable.schedule['yield period']
+            conn_destination.append(proc_yield_period)
+
+            proc_bulletin_level = Element('bulletinLevel')
+            proc_bulletin_level.text = 'WARN'
+            conn_destination.append(proc_bulletin_level)
+
+            proc_loss_tolerant = Element('lossTolerant')
+            proc_loss_tolerant.text = 'false'
+            conn_destination.append(proc_loss_tolerant)
+
+            proc_scheduled_state = Element('scheduledState')
+            proc_scheduled_state.text = 'RUNNING'
+            conn_destination.append(proc_scheduled_state)
+
+            proc_scheduling_strategy = Element('schedulingStrategy')
+            proc_scheduling_strategy.text = connectable.schedule['scheduling strategy']
+            conn_destination.append(proc_scheduling_strategy)
+
+            proc_execution_node = Element('executionNode')
+            proc_execution_node.text = 'ALL'
+            conn_destination.append(proc_execution_node)
+
+            proc_run_duration_nanos = Element('runDurationNanos')
+            proc_run_duration_nanos.text = str(connectable.schedule['run duration nanos'])
+            conn_destination.append(proc_run_duration_nanos)
+
+            for property_key, property_value in connectable.properties.items():
+                proc_property = Element('property')
+                proc_property_name = Element('name')
+                proc_property_name.text = connectable.nifi_property_key(property_key)
+                if not proc_property_name.text:
+                    continue
+                proc_property.append(proc_property_name)
+                proc_property_value = Element('value')
+                proc_property_value.text = property_value
+                proc_property.append(proc_property_value)
+                conn_destination.append(proc_property)
+
+            for auto_terminate_rel in connectable.auto_terminate:
+                proc_auto_terminated_relationship = Element('autoTerminatedRelationship')
+                proc_auto_terminated_relationship.text = auto_terminate_rel
+                conn_destination.append(proc_auto_terminated_relationship)
+            next(res.iterfind('rootGroup')).append(conn_destination)
+
+        for svc in connectable.controller_services:
+            if svc in visited:
+                continue
+
+            visited.append(svc)
+            controller_service = Element('controllerService')
+
+            controller_service_id = Element('id')
+            controller_service_id.text = str(svc.id)
+            controller_service.append(controller_service_id)
+
+            controller_service_name = Element('name')
+            controller_service_name.text = svc.name
+            controller_service.append(controller_service_name)
+
+            controller_service.append(comment)
+
+            controller_service_class = Element('class')
+            controller_service_class.text = svc.service_class
+            controller_service.append(controller_service_class)
+
+            controller_service_bundle = Element('bundle')
+            controller_service_bundle_group = Element('group')
+            controller_service_bundle_group.text = svc.group
+            controller_service_bundle.append(controller_service_bundle_group)
+            controller_service_bundle_artifact = Element('artifact')
+            controller_service_bundle_artifact.text = svc.artifact
+            controller_service_bundle.append(controller_service_bundle_artifact)
+            controller_service_bundle_version = Element('version')
+            controller_service_bundle_version.text = nifi_version
+            controller_service_bundle.append(controller_service_bundle_version)
+            controller_service.append(controller_service_bundle)
+
+            controller_enabled = Element('enabled')
+            controller_enabled.text = 'true'
+            controller_service.append(controller_enabled)
+
+            for property_key, property_value in svc.properties.items():
+                controller_service_property = Element('property')
+                controller_service_property_name = Element('name')
+                controller_service_property_name.text = property_key
+                controller_service_property.append(controller_service_property_name)
+                controller_service_property_value = Element('value')
+                controller_service_property_value.text = property_value
+                controller_service_property.append(controller_service_property_value)
+                controller_service.append(controller_service_property)
+            next(res.iterfind('rootGroup')).append(controller_service)
+
+        for conn_name in connectable.connections:
+            conn_destinations = connectable.connections[conn_name]
+
+            if isinstance(conn_destinations, list):
+                for conn_destination in conn_destinations:
+                    connection = self.build_nifi_flow_xml_connection_element(res,
+                                                                             bend_points,
+                                                                             conn_name,
+                                                                             connectable,
+                                                                             label_index,
+                                                                             conn_destination,
+                                                                             z_index)
+                    next(res.iterfind('rootGroup')).append(connection)
+
+                    if conn_destination not in visited:
+                        self.serialize(conn_destination, nifi_version, res, visited)
+            else:
+                connection = self.build_nifi_flow_xml_connection_element(res,
+                                                                         bend_points,
+                                                                         conn_name,
+                                                                         connectable,
+                                                                         label_index,
+                                                                         conn_destinations,
+                                                                         z_index)
+                next(res.iterfind('rootGroup')).append(connection)
+
+                if conn_destinations not in visited:
+                    self.serialize(conn_destinations, nifi_version, res, visited)
+
+        if root is None:
+            return ('<?xml version="1.0" encoding="UTF-8" standalone="no"?>' +
+                    "\n" +
+                    elementTree.tostring(res, encoding='utf-8').decode('utf-8'))
+
+    def build_nifi_flow_xml_connection_element(self, res, bend_points, conn_name, connectable, label_index, destination, z_index):
+        connection = Element('connection')
+
+        connection_id = Element('id')
+        connection_id.text = str(uuid.uuid4())
+        connection.append(connection_id)
+
+        connection_name = Element('name')
+        connection.append(connection_name)
+
+        connection.append(bend_points)
+        connection.append(label_index)
+        connection.append(z_index)
+
+        connection_source_id = Element('sourceId')
+        connection_source_id.text = str(connectable.uuid)
+        connection.append(connection_source_id)
+
+        connection_source_group_id = Element('sourceGroupId')
+        connection_source_group_id.text = next(res.iterfind('rootGroup/id')).text
+        connection.append(connection_source_group_id)
+
+        connection_source_type = Element('sourceType')
+        if isinstance(connectable, Processor):
+            connection_source_type.text = 'PROCESSOR'
+        elif isinstance(connectable, InputPort):
+            connection_source_type.text = 'INPUT_PORT'
+        else:
+            raise Exception('Unexpected source type: %s' % type(connectable))
+        connection.append(connection_source_type)
+
+        connection_destination_id = Element('destinationId')
+        connection_destination_id.text = str(destination.uuid)
+        connection.append(connection_destination_id)
+
+        connection_destination_group_id = Element('destinationGroupId')
+        connection_destination_group_id.text = next(res.iterfind('rootGroup/id')).text
+        connection.append(connection_destination_group_id)
+
+        connection_destination_type = Element('destinationType')
+        if isinstance(destination, Processor):
+            connection_destination_type.text = 'PROCESSOR'
+        elif isinstance(destination, InputPort):
+            connection_destination_type.text = 'INPUT_PORT'
+        else:
+            raise Exception('Unexpected destination type: %s' % type(destination))
+        connection.append(connection_destination_type)
+
+        connection_relationship = Element('relationship')
+        if not isinstance(connectable, InputPort):
+            connection_relationship.text = conn_name
+        connection.append(connection_relationship)
+
+        connection_max_work_queue_size = Element('maxWorkQueueSize')
+        connection_max_work_queue_size.text = '10000'
+        connection.append(connection_max_work_queue_size)
+
+        connection_max_work_queue_data_size = Element('maxWorkQueueDataSize')
+        connection_max_work_queue_data_size.text = '1 GB'
+        connection.append(connection_max_work_queue_data_size)
+
+        connection_flow_file_expiration = Element('flowFileExpiration')
+        connection_flow_file_expiration.text = '0 sec'
+        connection.append(connection_flow_file_expiration)
+
+        return connection
+
diff --git a/docker/test/integration/minifi/flow_serialization/__init__.py b/docker/test/integration/minifi/flow_serialization/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/docker/test/integration/minifi/processors/DeleteS3Object.py b/docker/test/integration/minifi/processors/DeleteS3Object.py
new file mode 100644
index 0000000..003a9cd
--- /dev/null
+++ b/docker/test/integration/minifi/processors/DeleteS3Object.py
@@ -0,0 +1,22 @@
+from ..core.Processor import Processor
+
+class DeleteS3Object(Processor):
+    def __init__(self,
+                 proxy_host = '',
+                 proxy_port = '',
+                 proxy_username = '',
+                 proxy_password = ''):
+        super(DeleteS3Object, self).__init__('DeleteS3Object',
+            properties = {
+                'Object Key': 'test_object_key',
+                'Bucket': 'test_bucket',
+                'Access Key': 'test_access_key',
+                'Secret Key': 'test_secret',
+                'Endpoint Override URL': "http://s3-server:9090",
+                'Proxy Host': proxy_host,
+                'Proxy Port': proxy_port,
+                'Proxy Username': proxy_username,
+                'Proxy Password': proxy_password,
+            },
+            auto_terminate=['success'])
+
diff --git a/docker/test/integration/minifi/processors/GenerateFlowFile.py b/docker/test/integration/minifi/processors/GenerateFlowFile.py
new file mode 100644
index 0000000..93d42ca
--- /dev/null
+++ b/docker/test/integration/minifi/processors/GenerateFlowFile.py
@@ -0,0 +1,8 @@
+from ..core.Processor import Processor
+
+class GenerateFlowFile(Processor):
+    def __init__(self, file_size, schedule={'scheduling period': '0 sec'}):
+        super(GenerateFlowFile, self).__init__('GenerateFlowFile',
+                                               properties={'File Size': file_size},
+                                               schedule=schedule,
+                                               auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/processors/GetFile.py b/docker/test/integration/minifi/processors/GetFile.py
new file mode 100644
index 0000000..29b8180
--- /dev/null
+++ b/docker/test/integration/minifi/processors/GetFile.py
@@ -0,0 +1,8 @@
+from ..core.Processor import Processor
+
+class GetFile(Processor):
+    def __init__(self, input_dir, schedule={'scheduling period': '2 sec'}):
+        super(GetFile, self).__init__('GetFile',
+                                      properties={'Input Directory': input_dir, 'Keep Source File': 'true'},
+                                      schedule=schedule,
+                                      auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/processors/InvokeHTTP.py b/docker/test/integration/minifi/processors/InvokeHTTP.py
new file mode 100644
index 0000000..135c8d5
--- /dev/null
+++ b/docker/test/integration/minifi/processors/InvokeHTTP.py
@@ -0,0 +1,30 @@
+from ..core.Processor import Processor
+
+class InvokeHTTP(Processor):
+    def __init__(self, url,
+                 method='GET',
+                 proxy_host='',
+                 proxy_port='',
+                 proxy_username='',
+                 proxy_password='',
+                 ssl_context_service=None,
+                 schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        properties = {
+            'Remote URL': url,
+            'HTTP Method': method,
+            'Proxy Host': proxy_host,
+            'Proxy Port': proxy_port,
+            'invokehttp-proxy-username': proxy_username,
+            'invokehttp-proxy-password': proxy_password }
+
+        controller_services = []
+
+        if ssl_context_service is not None:
+            properties['SSL Context Service'] = ssl_context_service.name
+            controller_services.append(ssl_context_service)
+
+        super(InvokeHTTP, self).__init__('InvokeHTTP',
+            properties = properties,
+            controller_services = controller_services,
+            auto_terminate = ['success', 'response', 'retry', 'failure', 'no retry'],
+            schedule = schedule)
diff --git a/docker/test/integration/minifi/processors/ListenHTTP.py b/docker/test/integration/minifi/processors/ListenHTTP.py
new file mode 100644
index 0000000..6f5bca1
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ListenHTTP.py
@@ -0,0 +1,14 @@
+from ..core.Processor import Processor
+
+class ListenHTTP(Processor):
+    def __init__(self, port, cert=None, schedule=None):
+        properties = {'Listening Port': port}
+
+        if cert is not None:
+            properties['SSL Certificate'] = cert
+            properties['SSL Verify Peer'] = 'no'
+
+        super(ListenHTTP, self).__init__('ListenHTTP',
+                                         properties=properties,
+                                         auto_terminate=['success'],
+                                         schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/LogAttribute.py b/docker/test/integration/minifi/processors/LogAttribute.py
new file mode 100644
index 0000000..1be4c38
--- /dev/null
+++ b/docker/test/integration/minifi/processors/LogAttribute.py
@@ -0,0 +1,7 @@
+from ..core.Processor import Processor
+
+class LogAttribute(Processor):
+    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        super(LogAttribute, self).__init__('LogAttribute',
+                                           auto_terminate=['success'],
+                                           schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PublishKafka.py b/docker/test/integration/minifi/processors/PublishKafka.py
new file mode 100644
index 0000000..698cd74
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PublishKafka.py
@@ -0,0 +1,10 @@
+from ..core.Processor import Processor
+
+class PublishKafka(Processor):
+    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        super(PublishKafka, self).__init__('PublishKafka',
+            properties={'Client Name': 'nghiaxlee', 'Known Brokers': 'kafka-broker:9092', 'Topic Name': 'test',
+                        'Batch Size': '10', 'Compress Codec': 'none', 'Delivery Guarantee': '1',
+                        'Request Timeout': '10 sec', 'Message Timeout': '12 sec'},
+            auto_terminate=['success'],
+            schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PublishKafkaSSL.py b/docker/test/integration/minifi/processors/PublishKafkaSSL.py
new file mode 100644
index 0000000..82c33f6
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PublishKafkaSSL.py
@@ -0,0 +1,16 @@
+from ..core.Processor import Processor
+
+class PublishKafkaSSL(Processor):
+    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        super(PublishKafkaSSL, self).__init__('PublishKafka',
+            properties={'Client Name': 'LMN', 'Known Brokers': 'kafka-broker:9093',
+                        'Topic Name': 'test', 'Batch Size': '10',
+                        'Compress Codec': 'none', 'Delivery Guarantee': '1',
+                        'Request Timeout': '10 sec', 'Message Timeout': '12 sec',
+                        'Security CA': '/tmp/resources/certs/ca-cert',
+                        'Security Cert': '/tmp/resources/certs/client_LMN_client.pem',
+                        'Security Pass Phrase': 'abcdefgh',
+                        'Security Private Key': '/tmp/resources/certs/client_LMN_client.key',
+                        'Security Protocol': 'ssl'},
+            auto_terminate=['success'],
+            schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PutFile.py b/docker/test/integration/minifi/processors/PutFile.py
new file mode 100644
index 0000000..047d32d
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PutFile.py
@@ -0,0 +1,14 @@
+from ..core.Processor import Processor
+
+class PutFile(Processor):
+    def __init__(self, output_dir, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        super(PutFile, self).__init__('PutFile',
+                                      properties={'Directory': output_dir, 'Directory Permissions': '777', 'Permissions': '777'},
+                                      auto_terminate=['success', 'failure'],
+                                      schedule=schedule)
+
+    def nifi_property_key(self, key):
+        if key == 'Directory Permissions':
+            return None
+        else:
+            return key
diff --git a/docker/test/integration/minifi/processors/PutS3Object.py b/docker/test/integration/minifi/processors/PutS3Object.py
new file mode 100644
index 0000000..74fb4a8
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PutS3Object.py
@@ -0,0 +1,20 @@
+from ..core.Processor import Processor
+
+class PutS3Object(Processor):
+    def __init__(self,
+                 proxy_host='',
+                 proxy_port='',
+                 proxy_username='',
+                 proxy_password=''):
+        super(PutS3Object, self).__init__('PutS3Object',
+            properties = {
+                'Object Key': 'test_object_key',
+                'Bucket': 'test_bucket',
+                'Access Key': 'test_access_key',
+                'Secret Key': 'test_secret',
+                'Endpoint Override URL': "http://s3-server:9090",
+                'Proxy Host': proxy_host,
+                'Proxy Port': proxy_port,
+                'Proxy Username': proxy_username,
+                'Proxy Password': proxy_password },
+            auto_terminate = ['success'])
diff --git a/docker/test/integration/minifi/processors/__init__.py b/docker/test/integration/minifi/processors/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py b/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py
new file mode 100644
index 0000000..689cf7d
--- /dev/null
+++ b/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py
@@ -0,0 +1,27 @@
+import logging
+import os
+
+from os import listdir
+
+from .FileOutputValidator import FileOutputValidator
+
+class EmptyFilesOutPutValidator(FileOutputValidator):
+
+    """
+    Validates that all files in the target directory are empty and that at least one exists.
+    """
+    def __init__(self):
+        self.valid = False
+
+    def validate(self, dir=''):
+
+        if self.valid:
+            return True
+
+        full_dir = self.output_dir + dir
+        logging.info("Output folder: %s", full_dir)
+        listing = listdir(full_dir)
+        if listing:
+            self.valid = all(os.path.getsize(os.path.join(full_dir, x)) == 0 for x in listing)
+
+        return self.valid
diff --git a/docker/test/integration/minifi/validators/FileOutputValidator.py b/docker/test/integration/minifi/validators/FileOutputValidator.py
new file mode 100644
index 0000000..d558c43
--- /dev/null
+++ b/docker/test/integration/minifi/validators/FileOutputValidator.py
@@ -0,0 +1,8 @@
+from .OutputValidator import OutputValidator
+
+class FileOutputValidator(OutputValidator):
+    def set_output_dir(self, output_dir):
+        self.output_dir = output_dir
+
+    def validate(self, dir=''):
+        pass
diff --git a/docker/test/integration/minifi/validators/NoFileOutPutValidator.py b/docker/test/integration/minifi/validators/NoFileOutPutValidator.py
new file mode 100644
index 0000000..f60a008
--- /dev/null
+++ b/docker/test/integration/minifi/validators/NoFileOutPutValidator.py
@@ -0,0 +1,25 @@
+import logging
+
+from os import listdir
+
+from .FileOutputValidator import FileOutputValidator
+
+class NoFileOutPutValidator(FileOutputValidator):
+    """
+    Validates that no flowfiles were transferred.
+    """
+    def __init__(self):
+        self.valid = False
+
+    def validate(self, dir=''):
+
+        if self.valid:
+            return True
+
+        full_dir = self.output_dir + dir
+        logging.info("Output folder: %s", full_dir)
+        listing = listdir(full_dir)
+
+        self.valid = not bool(listing)
+
+        return self.valid
diff --git a/docker/test/integration/minifi/validators/OutputValidator.py b/docker/test/integration/minifi/validators/OutputValidator.py
new file mode 100644
index 0000000..c05d5fa
--- /dev/null
+++ b/docker/test/integration/minifi/validators/OutputValidator.py
@@ -0,0 +1,12 @@
+class OutputValidator(object):
+    """
+    Base output validator class. Validators must implement
+    method validate, which returns a boolean.
+    """
+
+    def validate(self):
+        """
+        Return True if output is valid; False otherwise.
+        """
+        raise NotImplementedError("validate function needs to be implemented for validators")
+
diff --git a/docker/test/integration/minifi/validators/SegfaultValidator.py b/docker/test/integration/minifi/validators/SegfaultValidator.py
new file mode 100644
index 0000000..ee0227d
--- /dev/null
+++ b/docker/test/integration/minifi/validators/SegfaultValidator.py
@@ -0,0 +1,8 @@
+from .OutputValidator import OutputValidator
+
+class SegfaultValidator(OutputValidator):
+    """
+    Validates that the flow ran without crashing the agent.
+    """
+    def validate(self):
+        return True
diff --git a/docker/test/integration/minifi/validators/SingleFileOutputValidator.py b/docker/test/integration/minifi/validators/SingleFileOutputValidator.py
new file mode 100644
index 0000000..7466b41
--- /dev/null
+++ b/docker/test/integration/minifi/validators/SingleFileOutputValidator.py
@@ -0,0 +1,45 @@
+import logging
+import os
+
+from os import listdir
+from os.path import join
+
+from .FileOutputValidator import FileOutputValidator
+
+class SingleFileOutputValidator(FileOutputValidator):
+    """
+    Validates the content of a single file in the given directory.
+    """
+
+    def __init__(self, expected_content, subdir=''):
+        self.valid = False
+        self.expected_content = expected_content
+        self.subdir = subdir
+
+    def validate(self):
+        self.valid = False
+        full_dir = os.path.join(self.output_dir, self.subdir)
+        logging.info("Output folder: %s", full_dir)
+
+        if not os.path.isdir(full_dir):
+            return self.valid
+
+        listing = listdir(full_dir)
+        if not listing:
+            return self.valid
+        for file_name in listing:
+            logging.info("name:: %s", file_name)
+        out_file_name = listing[0]
+        full_path = join(full_dir, out_file_name)
+        if not os.path.isfile(full_path):
+            return self.valid
+
+        with open(full_path, 'r') as out_file:
+            contents = out_file.read()
+            logging.info("dir %s -- name %s", full_dir, out_file_name)
+            logging.info("expected %s -- content %s", self.expected_content, contents)
+
+            if self.expected_content in contents:
+                self.valid = True
+
+        return self.valid
diff --git a/docker/test/integration/minifi/validators/__init__.py b/docker/test/integration/minifi/validators/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/docker/test/integration/test_filesystem_ops.py b/docker/test/integration/test_filesystem_ops.py
index 3ae4ff5..5b48e6e 100644
--- a/docker/test/integration/test_filesystem_ops.py
+++ b/docker/test/integration/test_filesystem_ops.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
-
 
 def test_get_put():
     """
diff --git a/docker/test/integration/test_filter_zero_file.py b/docker/test/integration/test_filter_zero_file.py
index b56b23f..918582b 100644
--- a/docker/test/integration/test_filter_zero_file.py
+++ b/docker/test/integration/test_filter_zero_file.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
 
 def test_filter_zero_file():
     """
diff --git a/docker/test/integration/test_hash_content.py b/docker/test/integration/test_hash_content.py
index 1482014..e60d3a3 100644
--- a/docker/test/integration/test_hash_content.py
+++ b/docker/test/integration/test_hash_content.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
-
 
 def test_hash_invoke():
     """
diff --git a/docker/test/integration/test_http.py b/docker/test/integration/test_http.py
index facc45c..f9431d4 100644
--- a/docker/test/integration/test_http.py
+++ b/docker/test/integration/test_http.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
-
 
 def test_invoke_listen():
     """
diff --git a/docker/test/integration/test_rdkafka.py b/docker/test/integration/test_rdkafka.py
index 980e66f..bea36db 100644
--- a/docker/test/integration/test_rdkafka.py
+++ b/docker/test/integration/test_rdkafka.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
-
 
 def test_publish_kafka():
     """
diff --git a/docker/test/integration/test_s2s.py b/docker/test/integration/test_s2s.py
index 3125139..ff1a312 100644
--- a/docker/test/integration/test_s2s.py
+++ b/docker/test/integration/test_s2s.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
-
 
 def test_minifi_to_nifi():
     """
diff --git a/docker/test/integration/test_s3.py b/docker/test/integration/test_s3.py
index d961003..75b361a 100644
--- a/docker/test/integration/test_s3.py
+++ b/docker/test/integration/test_s3.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
-
 
 def test_put_s3_object():
     """
diff --git a/docker/test/integration/test_zero_file.py b/docker/test/integration/test_zero_file.py
index 3223329..1a0cf05 100644
--- a/docker/test/integration/test_zero_file.py
+++ b/docker/test/integration/test_zero_file.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
 
 def test_zero_file():
     """
diff --git a/docker/test/test_https.py b/docker/test/test_https.py
index 79a565f..2ea1bed 100644
--- a/docker/test/test_https.py
+++ b/docker/test/test_https.py
@@ -18,8 +18,6 @@ import time
 from M2Crypto import X509, EVP, RSA, ASN1
 
 from minifi import *
-from minifi.test import *
-
 
 def callback():
     pass
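
A note on the image-building approach: build_image() above assembles the Docker build context entirely in memory, so no temporary files touch the disk. The following minimal, self-contained sketch reproduces the same pattern outside the test framework. It assumes a reachable Docker daemon and the docker Python package; the helper name build_from_memory and the alpine base image are illustrative only, not part of this commit:

    import tarfile
    from io import BytesIO

    import docker

    def build_from_memory(dockerfile_text, files):
        """Build an image from an in-memory tar context; 'files' maps name -> bytes."""
        client = docker.from_env()
        context_buffer = BytesIO()

        # A "custom context" is just a tar archive containing the Dockerfile
        # plus any files it references (here: config.yml).
        with tarfile.open(mode='w', fileobj=context_buffer) as context:
            for name, data in [('Dockerfile', dockerfile_text.encode())] + list(files.items()):
                info = tarfile.TarInfo(name)
                info.size = len(data)
                context.addfile(info, fileobj=BytesIO(data))
        context_buffer.seek(0)

        # custom_context=True tells docker-py that fileobj is a tar archive,
        # not a bare Dockerfile; build() returns an (Image, logs) tuple, which
        # is why the framework indexes configured_image[0].
        image, _ = build_result = client.images.build(
            fileobj=context_buffer, custom_context=True, rm=True)
        return build_result

    image, _ = build_from_memory('FROM alpine:3.12\nADD config.yml /tmp/config.yml\n',
                                 {'config.yml': b'Flow Controller:\n  name: MiNiFi Flow\n'})
    print(image.id)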
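Both flow serializers guard their recursion with a 'visited' list because connection graphs may contain cycles (two processors can feed each other). The traversal idea can be shown in isolation; the dict-based node format below is purely for demonstration and is not the framework's Connectable API:

    def serialize_graph(node, result=None, visited=None):
        """Collect each node exactly once, even when the graph contains cycles."""
        if visited is None:
            visited = []
        if result is None:
            result = []

        visited.append(node)
        result.append(node['name'])

        # Recurse only into neighbors we have not rendered yet.
        for neighbor in node['connections']:
            if neighbor not in visited:
                serialize_graph(neighbor, result, visited)
        return result

    # Two nodes that point at each other still serialize exactly once each.
    a = {'name': 'GenerateFlowFile', 'connections': []}
    b = {'name': 'LogAttribute', 'connections': [a]}
    a['connections'].append(b)
    print(serialize_graph(a))  # ['GenerateFlowFile', 'LogAttribute']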
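New validators plug into the refactored layout by subclassing FileOutputValidator and implementing validate(); the framework calls set_output_dir() on the instance before validation, which is where self.output_dir comes from. A hypothetical example (FileCountValidator is not part of this commit, and the import path assumes docker/test/integration is on PYTHONPATH as DockerVerify.sh arranges):

    import os
    from os import listdir

    from minifi.validators.FileOutputValidator import FileOutputValidator

    class FileCountValidator(FileOutputValidator):
        """Hypothetical validator: passes when the output dir holds exactly N files."""
        def __init__(self, expected_count):
            self.expected_count = expected_count

        def validate(self, dir=''):
            full_dir = self.output_dir + dir
            # Mirror the existing validators: tolerate a missing directory.
            return os.path.isdir(full_dir) and len(listdir(full_dir)) == self.expected_count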