This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new ce3d1f8  MINIFICPP-1445 - Refactor docker integration test frame
ce3d1f8 is described below

commit ce3d1f8bdc2d2236f471ee960ab2ee05c1fe927f
Author: Adam Hunyadi <hunyadi....@gmail.com>
AuthorDate: Mon Jan 4 17:18:10 2021 +0100

    MINIFICPP-1445 - Refactor docker integration test frame
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #968
---
 .gitignore                                         |    1 +
 docker/test/integration/minifi/__init__.py         | 1105 +-------------------
 docker/test/integration/minifi/core/Cluster.py     |   22 +
 docker/test/integration/minifi/core/Connectable.py |   77 ++
 .../integration/minifi/core/ControllerService.py   |   17 +
 .../__init__.py => core/DockerTestCluster.py}      |  178 +---
 docker/test/integration/minifi/core/InputPort.py   |    7 +
 .../integration/minifi/core/OutputEventHandler.py  |   24 +
 docker/test/integration/minifi/core/Processor.py   |   45 +
 .../integration/minifi/core/RemoteProcessGroup.py  |   13 +
 .../integration/minifi/core/SSLContextService.py   |   16 +
 .../minifi/core/SingleNodeDockerCluster.py         |  319 ++++++
 docker/test/integration/minifi/core/__init__.py    |    0
 .../Minifi_flow_yaml_serializer.py                 |  111 ++
 .../flow_serialization/Nifi_flow_xml_serializer.py |  324 ++++++
 .../minifi/flow_serialization/__init__.py          |    0
 .../minifi/processors/DeleteS3Object.py            |   22 +
 .../minifi/processors/GenerateFlowFile.py          |    8 +
 .../test/integration/minifi/processors/GetFile.py  |    8 +
 .../integration/minifi/processors/InvokeHTTP.py    |   30 +
 .../integration/minifi/processors/ListenHTTP.py    |   14 +
 .../integration/minifi/processors/LogAttribute.py  |    7 +
 .../integration/minifi/processors/PublishKafka.py  |   10 +
 .../minifi/processors/PublishKafkaSSL.py           |   16 +
 .../test/integration/minifi/processors/PutFile.py  |   14 +
 .../integration/minifi/processors/PutS3Object.py   |   20 +
 .../test/integration/minifi/processors/__init__.py |    0
 .../minifi/validators/EmptyFilesOutPutValidator.py |   27 +
 .../minifi/validators/FileOutputValidator.py       |    8 +
 .../minifi/validators/NoFileOutPutValidator.py     |   25 +
 .../minifi/validators/OutputValidator.py           |   12 +
 .../minifi/validators/SegfaultValidator.py         |    8 +
 .../minifi/validators/SingleFileOutputValidator.py |   44 +
 .../test/integration/minifi/validators/__init__.py |    0
 docker/test/integration/test_filesystem_ops.py     |    2 -
 docker/test/integration/test_filter_zero_file.py   |    1 -
 docker/test/integration/test_hash_content.py       |    2 -
 docker/test/integration/test_http.py               |    2 -
 docker/test/integration/test_rdkafka.py            |    2 -
 docker/test/integration/test_s2s.py                |    2 -
 docker/test/integration/test_s3.py                 |    2 -
 docker/test/integration/test_zero_file.py          |    1 -
 docker/test/test_https.py                          |    2 -
 43 files changed, 1298 insertions(+), 1250 deletions(-)

diff --git a/.gitignore b/.gitignore
index 46f1e22..cc40bfd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -57,6 +57,7 @@ libminifi/src/agent/agent_version.cpp
 docs/generated
 thirdparty/apache-rat/apache-rat*
 /compile_commands.json
+__pycache__/
 
 # Ignore source files that have been placed in the docker directory during build
 docker/minificppsource
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index b64d541..fead6a1 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -30,1076 +30,39 @@ from copy import copy
 import time
 from collections import OrderedDict
 
-class Cluster(object):
-    """
-    Base Cluster class. This is intended to be a generic interface
-    to different types of clusters. Clusters could be Kubernetes clusters,
-    Docker swarms, or cloud compute/container services.
-    """
+from .core.Connectable import Connectable
+from .core.Cluster import Cluster
+from .core.Connectable import Connectable
+from .core.ControllerService import ControllerService
+from .core.InputPort import InputPort
+from .core.Processor import Processor
+from .core.RemoteProcessGroup import RemoteProcessGroup
+from .core.SingleNodeDockerCluster import SingleNodeDockerCluster
+from .core.SSLContextService import SSLContextService
+from .core.DockerTestCluster import DockerTestCluster
+from .core.OutputEventHandler import OutputEventHandler
+
+from .flow_serialization.Minifi_flow_yaml_serializer import Minifi_flow_yaml_serializer
+from .flow_serialization.Nifi_flow_xml_serializer import Nifi_flow_xml_serializer
+
+from .processors.GenerateFlowFile import GenerateFlowFile
+from .processors.GetFile import GetFile
+from .processors.InvokeHTTP import InvokeHTTP
+from .processors.ListenHTTP import ListenHTTP
+from .processors.LogAttribute import LogAttribute
+from .processors.PublishKafka import PublishKafka
+from .processors.PublishKafkaSSL import PublishKafkaSSL
+from .processors.PutFile import PutFile
+from .processors.PutS3Object import PutS3Object
+from .processors.DeleteS3Object import DeleteS3Object
+
+from .validators.OutputValidator import OutputValidator
+from .validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator
+from .validators.SegfaultValidator import SegfaultValidator
+from .validators.NoFileOutPutValidator import NoFileOutPutValidator
+from .validators.SingleFileOutputValidator import SingleFileOutputValidator
+from .validators.FileOutputValidator import FileOutputValidator
+
+logging.basicConfig(level=logging.DEBUG)
 
-    def deploy_flow(self, flow, name=None, vols=None):
-        """
-        Deploys a flow to the cluster.
-        """
 
-    def __enter__(self):
-        """
-        Allocate ephemeral cluster resources.
-        """
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        """
-        Clean up ephemeral cluster resources.
-        """
-
-
-class SingleNodeDockerCluster(Cluster):
-    """
-    A "cluster" which consists of a single docker node. Useful for
-    testing or use-cases which do not span multiple compute nodes.
-    """
-
-    def __init__(self):
-        self.minifi_version = os.environ['MINIFI_VERSION']
-        self.nifi_version = '1.7.0'
-        self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version
-        self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version
-        self.network = None
-        self.containers = OrderedDict()
-        self.images = []
-        self.tmp_files = []
-
-        # Get docker client
-        self.client = docker.from_env()
-
-    def deploy_flow(self,
-                    flow,
-                    name=None,
-                    vols=None,
-                    engine='minifi-cpp'):
-        """
-        Compiles the flow to a valid config file and overlays it into a new image.
-        """
-
-        if vols is None:
-            vols = {}
-
-        logging.info('Deploying %s flow...%s', engine,name)
-
-        if name is None:
-            name = engine + '-' + str(uuid.uuid4())
-            logging.info('Flow name was not provided; using generated name \'%s\'', name)
-
-        # Create network if necessary
-        if self.network is None:
-            net_name = 'nifi-' + str(uuid.uuid4())
-            logging.info('Creating network: %s', net_name)
-            self.network = self.client.networks.create(net_name)
-
-        if engine == 'nifi':
-            self.deploy_nifi_flow(flow, name, vols)
-        elif engine == 'minifi-cpp':
-            self.deploy_minifi_cpp_flow(flow, name, vols)
-        elif engine == 'kafka-broker':
-            self.deploy_kafka_broker(name)
-        elif engine == 'http-proxy':
-            self.deploy_http_proxy()
-        elif engine == 's3-server':
-            self.deploy_s3_server()
-        else:
-            raise Exception('invalid flow engine: \'%s\'' % engine)
-
-    def deploy_minifi_cpp_flow(self, flow, name, vols):
-
-        # Build configured image
-        dockerfile = dedent("""FROM {base_image}
-                USER root
-                ADD config.yml {minifi_root}/conf/config.yml
-                RUN chown minificpp:minificpp {minifi_root}/conf/config.yml
-                USER minificpp
-                """.format(name=name,hostname=name,
-                           base_image='apacheminificpp:' + self.minifi_version,
-                           minifi_root=self.minifi_root))
-
-        test_flow_yaml = minifi_flow_yaml(flow)
-        logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
-
-        conf_file_buffer = BytesIO()
-
-        try:
-            conf_file_buffer.write(test_flow_yaml.encode('utf-8'))
-            conf_file_len = conf_file_buffer.tell()
-            conf_file_buffer.seek(0)
-
-            context_files = [
-                {
-                    'name': 'config.yml',
-                    'size': conf_file_len,
-                    'file_obj': conf_file_buffer
-                }
-            ]
-
-            configured_image = self.build_image(dockerfile, context_files)
-
-        finally:
-            conf_file_buffer.close()
-
-        logging.info('Creating and running docker container for flow...')
-
-        container = self.client.containers.run(
-                configured_image[0],
-                detach=True,
-                name=name,
-                network=self.network.name,
-                volumes=vols)
-
-        logging.info('Started container \'%s\'', container.name)
-
-        self.containers[container.name] = container
-
-    def deploy_nifi_flow(self, flow, name, vols):
-        dockerfile = dedent(r"""FROM {base_image}
-                USER root
-                ADD flow.xml.gz {nifi_root}/conf/flow.xml.gz
-                RUN chown nifi:nifi {nifi_root}/conf/flow.xml.gz
-                RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties
-                RUN sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\1=5000/' {nifi_root}/conf/nifi.properties
-                USER nifi
-                """.format(name=name,
-                           base_image='apache/nifi:' + self.nifi_version,
-                           nifi_root=self.nifi_root))
-
-        test_flow_xml = nifi_flow_xml(flow, self.nifi_version)
-        logging.info('Using generated flow config xml:\n%s', test_flow_xml)
-
-        conf_file_buffer = BytesIO()
-
-        try:
-            with gzip.GzipFile(mode='wb', fileobj=conf_file_buffer) as conf_gz_file_buffer:
-                conf_gz_file_buffer.write(test_flow_xml.encode())
-            conf_file_len = conf_file_buffer.tell()
-            conf_file_buffer.seek(0)
-
-            context_files = [
-                {
-                    'name': 'flow.xml.gz',
-                    'size': conf_file_len,
-                    'file_obj': conf_file_buffer
-                }
-            ]
-
-            configured_image = self.build_image(dockerfile, context_files)
-
-        finally:
-            conf_file_buffer.close()
-
-        logging.info('Creating and running docker container for flow...')
-
-        container = self.client.containers.run(
-                configured_image[0],
-                detach=True,
-                name=name,
-                hostname=name,
-                network=self.network.name,
-                volumes=vols)
-
-        logging.info('Started container \'%s\'', container.name)
-
-        self.containers[container.name] = container
-
-    def deploy_kafka_broker(self, name):
-        logging.info('Creating and running docker containers for kafka broker...')
-        zookeeper = self.client.containers.run(
-                    self.client.images.pull("wurstmeister/zookeeper:latest"),
-                    detach=True,
-                    name='zookeeper',
-                    network=self.network.name,
-                    ports={'2181/tcp': 2181},
-                    )
-        self.containers[zookeeper.name] = zookeeper
-
-        test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on DockerVerify.sh
-        broker_image = self.build_image_by_path(test_dir + "/resources/kafka_broker", 'minifi-kafka')
-        broker = self.client.containers.run(
-                    broker_image[0],
-                    detach=True,
-                    name='kafka-broker',
-                    network=self.network.name,
-                    ports={'9092/tcp': 9092},
-                    environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093", "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"],
-                    )
-        self.containers[broker.name] = broker
-
-        dockerfile = dedent("""FROM {base_image}
-                USER root
-                CMD $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka-broker:9092 --topic test > heaven_signal.txt
-                """.format(base_image='wurstmeister/kafka:2.12-2.5.0'))
-        configured_image = self.build_image(dockerfile, [])
-        consumer = self.client.containers.run(
-                    configured_image[0],
-                    detach=True,
-                    name='kafka-consumer',
-                    network=self.network.name,
-                    )
-        self.containers[consumer.name] = consumer
-
-    def deploy_http_proxy(self):
-        logging.info('Creating and running http-proxy docker container...')
-        dockerfile = dedent("""FROM {base_image}
-                RUN apt update && apt install -y apache2-utils
-                RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username} {proxy_password}
-                RUN echo 'auth_param basic program /usr/lib/squid3/basic_ncsa_auth /etc/squid/.squid_users'  > /etc/squid/squid.conf && \
-                    echo 'auth_param basic realm proxy' >> /etc/squid/squid.conf && \
-                    echo 'acl authenticated proxy_auth REQUIRED' >> /etc/squid/squid.conf && \
-                    echo 'http_access allow authenticated' >> /etc/squid/squid.conf && \
-                    echo 'http_port {proxy_port}' >> /etc/squid/squid.conf
-                ENTRYPOINT ["/sbin/entrypoint.sh"]
-                """.format(base_image='sameersbn/squid:3.5.27-2', proxy_username='admin', proxy_password='test101', proxy_port='3128'))
-        configured_image = self.build_image(dockerfile, [])
-        consumer = self.client.containers.run(
-                    configured_image[0],
-                    detach=True,
-                    name='http-proxy',
-                    network=self.network.name,
-                    ports={'3128/tcp': 3128},
-                    )
-        self.containers[consumer.name] = consumer
-
-    def deploy_s3_server(self):
-        consumer = self.client.containers.run(
-                    "adobe/s3mock:2.1.28",
-                    detach=True,
-                    name='s3-server',
-                    network=self.network.name,
-                    ports={'9090/tcp': 9090, '9191/tcp': 9191},
-                    environment=["initialBuckets=test_bucket"],
-                    )
-        self.containers[consumer.name] = consumer
-
-    def build_image(self, dockerfile, context_files):
-        conf_dockerfile_buffer = BytesIO()
-        docker_context_buffer = BytesIO()
-
-        try:
-            # Overlay conf onto base nifi image
-            conf_dockerfile_buffer.write(dockerfile.encode())
-            conf_dockerfile_buffer.seek(0)
-
-            with tarfile.open(mode='w', fileobj=docker_context_buffer) as docker_context:
-                dockerfile_info = tarfile.TarInfo('Dockerfile')
-                dockerfile_info.size = conf_dockerfile_buffer.getbuffer().nbytes
-                docker_context.addfile(dockerfile_info,
-                                       fileobj=conf_dockerfile_buffer)
-
-                for context_file in context_files:
-                    file_info = tarfile.TarInfo(context_file['name'])
-                    file_info.size = context_file['size']
-                    docker_context.addfile(file_info,
-                                           fileobj=context_file['file_obj'])
-            docker_context_buffer.seek(0)
-
-            logging.info('Creating configured image...')
-            configured_image = self.client.images.build(fileobj=docker_context_buffer,
-                                                        custom_context=True,
-                                                        rm=True,
-                                                        forcerm=True)
-            logging.info('Created image with id: %s', configured_image[0].id)
-            self.images.append(configured_image)
-
-        finally:
-            conf_dockerfile_buffer.close()
-            docker_context_buffer.close()
-
-        return configured_image
-
-    def build_image_by_path(self, dir, name=None):
-        try:
-            logging.info('Creating configured image...')
-            configured_image = self.client.images.build(path=dir,
-                                                        tag=name,
-                                                        rm=True,
-                                                        forcerm=True)
-            logging.info('Created image with id: %s', configured_image[0].id)
-            self.images.append(configured_image)
-            return configured_image
-        except Exception as e:
-            logging.info(e)
-            raise
-
-    def __enter__(self):
-        """
-        Allocate ephemeral cluster resources.
-        """
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        """
-        Clean up ephemeral cluster resources
-        """
-
-        # Clean up containers
-        for container in self.containers.values():
-            logging.info('Cleaning up container: %s', container.name)
-            container.remove(v=True, force=True)
-
-        # Clean up images
-        for image in reversed(self.images):
-            logging.info('Cleaning up image: %s', image[0].id)
-            self.client.images.remove(image[0].id, force=True)
-
-        # Clean up network
-        if self.network is not None:
-            logging.info('Cleaning up network network: %s', self.network.name)
-            self.network.remove()
-
-        # Clean up tmp files
-        for tmp_file in self.tmp_files:
-            os.remove(tmp_file)
-
-
-class Connectable(object):
-    def __init__(self,
-                 name=None,
-                 auto_terminate=None):
-
-        self.uuid = uuid.uuid4()
-
-        if name is None:
-            self.name = str(self.uuid)
-        else:
-            self.name = name
-
-        if auto_terminate is None:
-            self.auto_terminate = []
-        else:
-            self.auto_terminate = auto_terminate
-
-        self.connections = {}
-        self.out_proc = self
-
-        self.drop_empty_flowfiles = False
-
-    def connect(self, connections):
-        for rel in connections:
-
-            # Ensure that rel is not auto-terminated
-            if rel in self.auto_terminate:
-                del self.auto_terminate[self.auto_terminate.index(rel)]
-
-            # Add to set of output connections for this rel
-            if rel not in self.connections:
-                self.connections[rel] = []
-            self.connections[rel].append(connections[rel])
-
-        return self
-
-    def __rshift__(self, other):
-        """
-        Right shift operator to support flow DSL, for example:
-
-            GetFile('/input') >> LogAttribute() >> PutFile('/output')
-
-        """
-
-        connected = copy(self)
-        connected.connections = copy(self.connections)
-
-        if self.out_proc is self:
-            connected.out_proc = connected
-        else:
-            connected.out_proc = copy(connected.out_proc)
-
-        if isinstance(other, tuple):
-            if isinstance(other[0], tuple):
-                for rel_tuple in other:
-                    rel = {rel_tuple[0]: rel_tuple[1]}
-                    connected.out_proc.connect(rel)
-            else:
-                rel = {other[0]: other[1]}
-                connected.out_proc.connect(rel)
-        else:
-            connected.out_proc.connect({'success': other})
-            connected.out_proc = other
-
-        return connected
-
-    def __invert__(self):
-        """
-        Invert operation to set empty file filtering on incoming connections
-        GetFile('/input') >> ~LogAttribute()
-        """
-        self.drop_empty_flowfiles = True
-
-        return self
-
-
-class Processor(Connectable):
-    def __init__(self,
-                 clazz,
-                 properties=None,
-                 schedule=None,
-                 name=None,
-                 controller_services=None,
-                 auto_terminate=None):
-
-        super(Processor, self).__init__(name=name,
-                                        auto_terminate=auto_terminate)
-
-        if controller_services is None:
-            controller_services = []
-
-        if schedule is None:
-            schedule = {}
-
-        if properties is None:
-            properties = {}
-
-        if name is None:
-            pass
-
-        self.clazz = clazz
-        self.properties = properties
-        self.controller_services = controller_services
-
-        self.schedule = {
-            'scheduling strategy': 'TIMER_DRIVEN',
-            'scheduling period': '1 sec',
-            'penalization period': '30 sec',
-            'yield period': '1 sec',
-            'run duration nanos': 0
-        }
-        self.schedule.update(schedule)
-
-    def nifi_property_key(self, key):
-        """
-        Returns the Apache NiFi-equivalent property key for the given key. This is often, but not always, the same as
-        the internal key.
-        """
-        return key
-
-
-class InvokeHTTP(Processor):
-    def __init__(self, url,
-                 method='GET',
-                 proxy_host='',
-                 proxy_port='',
-                 proxy_username='',
-                 proxy_password='',
-                 ssl_context_service=None,
-                 schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
-        properties = {'Remote URL': url,
-                      'HTTP Method': method,
-                      'Proxy Host': proxy_host,
-                      'Proxy Port': proxy_port,
-                      'invokehttp-proxy-username': proxy_username,
-                      'invokehttp-proxy-password': proxy_password}
-
-        controller_services = []
-
-        if ssl_context_service is not None:
-            properties['SSL Context Service'] = ssl_context_service.name
-            controller_services.append(ssl_context_service)
-
-        super(InvokeHTTP, self).__init__('InvokeHTTP',
-                                         properties=properties,
-                                         controller_services=controller_services,
-                                         auto_terminate=['success',
-                                                         'response',
-                                                         'retry',
-                                                         'failure',
-                                                         'no retry'],
-                                         schedule=schedule)
-
-
-class ListenHTTP(Processor):
-    def __init__(self, port, cert=None, schedule=None):
-        properties = {'Listening Port': port}
-
-        if cert is not None:
-            properties['SSL Certificate'] = cert
-            properties['SSL Verify Peer'] = 'no'
-
-        super(ListenHTTP, self).__init__('ListenHTTP',
-                                         properties=properties,
-                                         auto_terminate=['success'],
-                                         schedule=schedule)
-
-
-class LogAttribute(Processor):
-    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
-        super(LogAttribute, self).__init__('LogAttribute',
-                                           auto_terminate=['success'],
-                                           schedule=schedule)
-
-
-class GetFile(Processor):
-    def __init__(self, input_dir, schedule={'scheduling period': '2 sec'}):
-        super(GetFile, self).__init__('GetFile',
-                                      properties={'Input Directory': input_dir, 'Keep Source File': 'true'},
-                                      schedule=schedule,
-                                      auto_terminate=['success'])
-
-
-class GenerateFlowFile(Processor):
-    def __init__(self, file_size, schedule={'scheduling period': '0 sec'}):
-        super(GenerateFlowFile, self).__init__('GenerateFlowFile',
-                                      properties={'File Size': file_size},
-                                      schedule=schedule,
-                                      auto_terminate=['success'])
-
-
-class PutFile(Processor):
-    def __init__(self, output_dir, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
-        super(PutFile, self).__init__('PutFile',
-                                      properties={'Directory': output_dir, 'Directory Permissions': '777', 'Permissions': '777'},
-                                      auto_terminate=['success', 'failure'],
-                                      schedule=schedule)
-
-    def nifi_property_key(self, key):
-        if key == 'Directory Permissions':
-            return None
-        else:
-            return key
-
-
-class PublishKafka(Processor):
-    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
-        super(PublishKafka, self).__init__('PublishKafka',
-                                           properties={'Client Name': 'nghiaxlee', 'Known Brokers': 'kafka-broker:9092', 'Topic Name': 'test',
-                                                       'Batch Size': '10', 'Compress Codec': 'none', 'Delivery Guarantee': '1',
-                                                       'Request Timeout': '10 sec', 'Message Timeout': '12 sec'},
-                                           auto_terminate=['success'],
-                                           schedule=schedule)
-
-
-class PublishKafkaSSL(Processor):
-    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
-        super(PublishKafkaSSL, self).__init__('PublishKafka',
-                                              properties={'Client Name': 'LMN', 'Known Brokers': 'kafka-broker:9093',
-                                                          'Topic Name': 'test', 'Batch Size': '10',
-                                                          'Compress Codec': 'none', 'Delivery Guarantee': '1',
-                                                          'Request Timeout': '10 sec', 'Message Timeout': '12 sec',
-                                                          'Security CA': '/tmp/resources/certs/ca-cert',
-                                                          'Security Cert': '/tmp/resources/certs/client_LMN_client.pem',
-                                                          'Security Pass Phrase': 'abcdefgh',
-                                                          'Security Private Key': '/tmp/resources/certs/client_LMN_client.key',
-                                                          'Security Protocol': 'ssl'},
-                                              auto_terminate=['success'],
-                                              schedule=schedule)
-
-class PutS3Object(Processor):
-    def __init__(self,
-                 proxy_host='',
-                 proxy_port='',
-                 proxy_username='',
-                 proxy_password=''):
-        super(PutS3Object, self).__init__('PutS3Object',
-                                          properties={
-                                            'Object Key': 'test_object_key',
-                                            'Bucket': 'test_bucket',
-                                            'Access Key': 'test_access_key',
-                                            'Secret Key': 'test_secret',
-                                            'Endpoint Override URL': "http://s3-server:9090",
-                                            'Proxy Host': proxy_host,
-                                            'Proxy Port': proxy_port,
-                                            'Proxy Username': proxy_username,
-                                            'Proxy Password': proxy_password,
-                                          },
-                                          auto_terminate=['success'])
-
-class DeleteS3Object(Processor):
-    def __init__(self,
-                 proxy_host='',
-                 proxy_port='',
-                 proxy_username='',
-                 proxy_password=''):
-        super(DeleteS3Object, self).__init__('DeleteS3Object',
-                                          properties={
-                                            'Object Key': 'test_object_key',
-                                            'Bucket': 'test_bucket',
-                                            'Access Key': 'test_access_key',
-                                            'Secret Key': 'test_secret',
-                                            'Endpoint Override URL': "http://s3-server:9090",
-                                            'Proxy Host': proxy_host,
-                                            'Proxy Port': proxy_port,
-                                            'Proxy Username': proxy_username,
-                                            'Proxy Password': proxy_password,
-                                          },
-                                          auto_terminate=['success'])
-
-class InputPort(Connectable):
-    def __init__(self, name=None, remote_process_group=None):
-        super(InputPort, self).__init__(name=name)
-
-        self.remote_process_group = remote_process_group
-
-
-class RemoteProcessGroup(object):
-    def __init__(self, url,
-                 name=None):
-
-        self.uuid = uuid.uuid4()
-
-        if name is None:
-            self.name = str(self.uuid)
-        else:
-            self.name = name
-
-        self.url = url
-
-
-class ControllerService(object):
-    def __init__(self, name=None, properties=None):
-
-        self.id = str(uuid.uuid4())
-
-        if name is None:
-            self.name = str(uuid.uuid4())
-            logging.info('Controller service name was not provided; using generated name \'%s\'', self.name)
-        else:
-            self.name = name
-
-        if properties is None:
-            properties = {}
-
-        self.properties = properties
-
-
-class SSLContextService(ControllerService):
-    def __init__(self, name=None, cert=None, key=None, ca_cert=None):
-        super(SSLContextService, self).__init__(name=name)
-
-        self.service_class = 'SSLContextService'
-
-        if cert is not None:
-            self.properties['Client Certificate'] = cert
-
-        if key is not None:
-            self.properties['Private Key'] = key
-
-        if ca_cert is not None:
-            self.properties['CA Certificate'] = ca_cert
-
-
-def minifi_flow_yaml(connectable, root=None, visited=None):
-    if visited is None:
-        visited = []
-
-    if root is None:
-        res = {
-            'Flow Controller': {
-                'name': 'MiNiFi Flow'
-            },
-            'Processors': [],
-            'Connections': [],
-            'Remote Processing Groups': [],
-            'Controller Services': []
-        }
-    else:
-        res = root
-
-    visited.append(connectable)
-
-    if hasattr(connectable, 'name'):
-        connectable_name = connectable.name
-    else:
-        connectable_name = str(connectable.uuid)
-
-    if isinstance(connectable, InputPort):
-        group = connectable.remote_process_group
-        res_group = None
-
-        for res_group_candidate in res['Remote Processing Groups']:
-            assert isinstance(res_group_candidate, dict)
-            if res_group_candidate['id'] == str(group.uuid):
-                res_group = res_group_candidate
-
-        if res_group is None:
-            res_group = {
-                'name': group.name,
-                'id': str(group.uuid),
-                'url': group.url,
-                'timeout': '30 sec',
-                'yield period': '3 sec',
-                'Input Ports': []
-            }
-
-            res['Remote Processing Groups'].append(res_group)
-
-        res_group['Input Ports'].append({
-            'id': str(connectable.uuid),
-            'name': connectable.name,
-            'max concurrent tasks': 1,
-            'Properties': {}
-        })
-
-    if isinstance(connectable, Processor):
-        res['Processors'].append({
-            'name': connectable_name,
-            'id': str(connectable.uuid),
-            'class': 'org.apache.nifi.processors.standard.' + connectable.clazz,
-            'scheduling strategy': connectable.schedule['scheduling strategy'],
-            'scheduling period': connectable.schedule['scheduling period'],
-            'penalization period': connectable.schedule['penalization period'],
-            'yield period': connectable.schedule['yield period'],
-            'run duration nanos': connectable.schedule['run duration nanos'],
-            'Properties': connectable.properties,
-            'auto-terminated relationships list': connectable.auto_terminate
-        })
-
-        for svc in connectable.controller_services:
-            if svc in visited:
-                continue
-
-            visited.append(svc)
-            res['Controller Services'].append({
-                'name': svc.name,
-                'id': svc.id,
-                'class': svc.service_class,
-                'Properties': svc.properties
-            })
-
-    for conn_name in connectable.connections:
-        conn_procs = connectable.connections[conn_name]
-
-        if isinstance(conn_procs, list):
-            for proc in conn_procs:
-                res['Connections'].append({
-                    'name': str(uuid.uuid4()),
-                    'source id': str(connectable.uuid),
-                    'source relationship name': conn_name,
-                    'destination id': str(proc.uuid),
-                    'drop empty': ("true" if proc.drop_empty_flowfiles else "false")
-                })
-                if proc not in visited:
-                    minifi_flow_yaml(proc, res, visited)
-        else:
-            res['Connections'].append({
-                'name': str(uuid.uuid4()),
-                'source id': str(connectable.uuid),
-                'source relationship name': conn_name,
-                'destination id': str(conn_procs.uuid)
-            })
-            if conn_procs not in visited:
-                minifi_flow_yaml(conn_procs, res, visited)
-
-    if root is None:
-        return yaml.dump(res, default_flow_style=False)
-
-
-def nifi_flow_xml(connectable, nifi_version=None, root=None, visited=None):
-    if visited is None:
-        visited = []
-
-    position = Element('position')
-    position.set('x', '0.0')
-    position.set('y', '0.0')
-
-    comment = Element('comment')
-    styles = Element('styles')
-    bend_points = Element('bendPoints')
-    label_index = Element('labelIndex')
-    label_index.text = '1'
-    z_index = Element('zIndex')
-    z_index.text = '0'
-
-    if root is None:
-        res = Element('flowController')
-        max_timer_driven_thread_count = Element('maxTimerDrivenThreadCount')
-        max_timer_driven_thread_count.text = '10'
-        res.append(max_timer_driven_thread_count)
-        max_event_driven_thread_count = Element('maxEventDrivenThreadCount')
-        max_event_driven_thread_count.text = '5'
-        res.append(max_event_driven_thread_count)
-        root_group = Element('rootGroup')
-        root_group_id = Element('id')
-        root_group_id_text = str(uuid.uuid4())
-        root_group_id.text = root_group_id_text
-        root_group.append(root_group_id)
-        root_group_name = Element('name')
-        root_group_name.text = root_group_id_text
-        root_group.append(root_group_name)
-        res.append(root_group)
-        root_group.append(position)
-        root_group.append(comment)
-        res.append(Element('controllerServices'))
-        res.append(Element('reportingTasks'))
-        res.set('encoding-version', '1.2')
-    else:
-        res = root
-
-    visited.append(connectable)
-
-    if hasattr(connectable, 'name'):
-        connectable_name_text = connectable.name
-    else:
-        connectable_name_text = str(connectable.uuid)
-
-    if isinstance(connectable, InputPort):
-        input_port = Element('inputPort')
-
-        input_port_id = Element('id')
-        input_port_id.text = str(connectable.uuid)
-        input_port.append(input_port_id)
-
-        input_port_name = Element('name')
-        input_port_name.text = connectable_name_text
-        input_port.append(input_port_name)
-
-        input_port.append(position)
-        input_port.append(comment)
-
-        input_port_scheduled_state = Element('scheduledState')
-        input_port_scheduled_state.text = 'RUNNING'
-        input_port.append(input_port_scheduled_state)
-
-        input_port_max_concurrent_tasks = Element('maxConcurrentTasks')
-        input_port_max_concurrent_tasks.text = '1'
-        input_port.append(input_port_max_concurrent_tasks)
-        next( res.iterfind('rootGroup') ).append(input_port)
-
-    if isinstance(connectable, Processor):
-        conn_destination = Element('processor')
-
-        proc_id = Element('id')
-        proc_id.text = str(connectable.uuid)
-        conn_destination.append(proc_id)
-
-        proc_name = Element('name')
-        proc_name.text = connectable_name_text
-        conn_destination.append(proc_name)
-
-        conn_destination.append(position)
-        conn_destination.append(styles)
-        conn_destination.append(comment)
-
-        proc_class = Element('class')
-        proc_class.text = 'org.apache.nifi.processors.standard.' + connectable.clazz
-        conn_destination.append(proc_class)
-
-        proc_bundle = Element('bundle')
-        proc_bundle_group = Element('group')
-        proc_bundle_group.text = 'org.apache.nifi'
-        proc_bundle.append(proc_bundle_group)
-        proc_bundle_artifact = Element('artifact')
-        proc_bundle_artifact.text = 'nifi-standard-nar'
-        proc_bundle.append(proc_bundle_artifact)
-        proc_bundle_version = Element('version')
-        proc_bundle_version.text = nifi_version
-        proc_bundle.append(proc_bundle_version)
-        conn_destination.append(proc_bundle)
-
-        proc_max_concurrent_tasks = Element('maxConcurrentTasks')
-        proc_max_concurrent_tasks.text = '1'
-        conn_destination.append(proc_max_concurrent_tasks)
-
-        proc_scheduling_period = Element('schedulingPeriod')
-        proc_scheduling_period.text = connectable.schedule['scheduling period']
-        conn_destination.append(proc_scheduling_period)
-
-        proc_penalization_period = Element('penalizationPeriod')
-        proc_penalization_period.text = connectable.schedule['penalization period']
-        conn_destination.append(proc_penalization_period)
-
-        proc_yield_period = Element('yieldPeriod')
-        proc_yield_period.text = connectable.schedule['yield period']
-        conn_destination.append(proc_yield_period)
-
-        proc_bulletin_level = Element('bulletinLevel')
-        proc_bulletin_level.text = 'WARN'
-        conn_destination.append(proc_bulletin_level)
-
-        proc_loss_tolerant = Element('lossTolerant')
-        proc_loss_tolerant.text = 'false'
-        conn_destination.append(proc_loss_tolerant)
-
-        proc_scheduled_state = Element('scheduledState')
-        proc_scheduled_state.text = 'RUNNING'
-        conn_destination.append(proc_scheduled_state)
-
-        proc_scheduling_strategy = Element('schedulingStrategy')
-        proc_scheduling_strategy.text = connectable.schedule['scheduling strategy']
-        conn_destination.append(proc_scheduling_strategy)
-
-        proc_execution_node = Element('executionNode')
-        proc_execution_node.text = 'ALL'
-        conn_destination.append(proc_execution_node)
-
-        proc_run_duration_nanos = Element('runDurationNanos')
-        proc_run_duration_nanos.text = str(connectable.schedule['run duration nanos'])
-        conn_destination.append(proc_run_duration_nanos)
-
-        for property_key, property_value in connectable.properties.items():
-            proc_property = Element('property')
-            proc_property_name = Element('name')
-            proc_property_name.text = connectable.nifi_property_key(property_key)
-            if not proc_property_name.text:
-                continue
-            proc_property.append(proc_property_name)
-            proc_property_value = Element('value')
-            proc_property_value.text = property_value
-            proc_property.append(proc_property_value)
-            conn_destination.append(proc_property)
-
-        for auto_terminate_rel in connectable.auto_terminate:
-            proc_auto_terminated_relationship = Element('autoTerminatedRelationship')
-            proc_auto_terminated_relationship.text = auto_terminate_rel
-            conn_destination.append(proc_auto_terminated_relationship)
-        next( res.iterfind('rootGroup') ).append(conn_destination)
-        """ res.iterfind('rootGroup').next().append(conn_destination) """
-
-        for svc in connectable.controller_services:
-            if svc in visited:
-                continue
-
-            visited.append(svc)
-            controller_service = Element('controllerService')
-
-            controller_service_id = Element('id')
-            controller_service_id.text = str(svc.id)
-            controller_service.append(controller_service_id)
-
-            controller_service_name = Element('name')
-            controller_service_name.text = svc.name
-            controller_service.append(controller_service_name)
-
-            controller_service.append(comment)
-
-            controller_service_class = Element('class')
-            controller_service_class.text = svc.service_class,
-            controller_service.append(controller_service_class)
-
-            controller_service_bundle = Element('bundle')
-            controller_service_bundle_group = Element('group')
-            controller_service_bundle_group.text = svc.group
-            controller_service_bundle.append(controller_service_bundle_group)
-            controller_service_bundle_artifact = Element('artifact')
-            controller_service_bundle_artifact.text = svc.artifact
-            controller_service_bundle.append(controller_service_bundle_artifact)
-            controller_service_bundle_version = Element('version')
-            controller_service_bundle_version.text = nifi_version
-            controller_service_bundle.append(controller_service_bundle_version)
-            controller_service.append(controller_service_bundle)
-
-            controller_enabled = Element('enabled')
-            controller_enabled.text = 'true',
-            controller_service.append(controller_enabled)
-
-            for property_key, property_value in svc.properties:
-                controller_service_property = Element('property')
-                controller_service_property_name = Element('name')
-                controller_service_property_name.text = property_key
-                controller_service_property.append(controller_service_property_name)
-                controller_service_property_value = Element('value')
-                controller_service_property_value.text = property_value
-                controller_service_property.append(controller_service_property_value)
-                controller_service.append(controller_service_property)
-            next( res.iterfind('rootGroup') ).append(controller_service)
-            """ res.iterfind('rootGroup').next().append(controller_service)"""
-
-    for conn_name in connectable.connections:
-        conn_destinations = connectable.connections[conn_name]
-
-        if isinstance(conn_destinations, list):
-            for conn_destination in conn_destinations:
-                connection = nifi_flow_xml_connection(res,
-                                                      bend_points,
-                                                      conn_name,
-                                                      connectable,
-                                                      label_index,
-                                                      conn_destination,
-                                                      z_index)
-                next( res.iterfind('rootGroup') ).append(connection)
-                """ res.iterfind('rootGroup').next().append(connection) """
-
-                if conn_destination not in visited:
-                    nifi_flow_xml(conn_destination, nifi_version, res, visited)
-        else:
-            connection = nifi_flow_xml_connection(res,
-                                                  bend_points,
-                                                  conn_name,
-                                                  connectable,
-                                                  label_index,
-                                                  conn_destinations,
-                                                  z_index)
-            next( res.iterfind('rootGroup') ).append(connection)
-            """ res.iterfind('rootGroup').next().append(connection) """
-
-            if conn_destinations not in visited:
-                nifi_flow_xml(conn_destinations, nifi_version, res, visited)
-
-    if root is None:
-        return ('<?xml version="1.0" encoding="UTF-8" standalone="no"?>'
-                + "\n"
-                + elementTree.tostring(res, encoding='utf-8').decode('utf-8'))
-
-
-def nifi_flow_xml_connection(res, bend_points, conn_name, connectable, label_index, destination, z_index):
-    connection = Element('connection')
-
-    connection_id = Element('id')
-    connection_id.text = str(uuid.uuid4())
-    connection.append(connection_id)
-
-    connection_name = Element('name')
-    connection.append(connection_name)
-
-    connection.append(bend_points)
-    connection.append(label_index)
-    connection.append(z_index)
-
-    connection_source_id = Element('sourceId')
-    connection_source_id.text = str(connectable.uuid)
-    connection.append(connection_source_id)
-
-    connection_source_group_id = Element('sourceGroupId')
-    connection_source_group_id.text = next( res.iterfind('rootGroup/id') ).text
-    """connection_source_group_id.text = res.iterfind('rootGroup/id').next().text"""
-    connection.append(connection_source_group_id)
-
-    connection_source_type = Element('sourceType')
-    if isinstance(connectable, Processor):
-        connection_source_type.text = 'PROCESSOR'
-    elif isinstance(connectable, InputPort):
-        connection_source_type.text = 'INPUT_PORT'
-    else:
-        raise Exception('Unexpected source type: %s' % type(connectable))
-    connection.append(connection_source_type)
-
-    connection_destination_id = Element('destinationId')
-    connection_destination_id.text = str(destination.uuid)
-    connection.append(connection_destination_id)
-
-    connection_destination_group_id = Element('destinationGroupId')
-    connection_destination_group_id.text = next(res.iterfind('rootGroup/id')).text
-    """ connection_destination_group_id.text = res.iterfind('rootGroup/id').next().text """
-    connection.append(connection_destination_group_id)
-
-    connection_destination_type = Element('destinationType')
-    if isinstance(destination, Processor):
-        connection_destination_type.text = 'PROCESSOR'
-    elif isinstance(destination, InputPort):
-        connection_destination_type.text = 'INPUT_PORT'
-    else:
-        raise Exception('Unexpected destination type: %s' % type(destination))
-    connection.append(connection_destination_type)
-
-    connection_relationship = Element('relationship')
-    if not isinstance(connectable, InputPort):
-        connection_relationship.text = conn_name
-    connection.append(connection_relationship)
-
-    connection_max_work_queue_size = Element('maxWorkQueueSize')
-    connection_max_work_queue_size.text = '10000'
-    connection.append(connection_max_work_queue_size)
-
-    connection_max_work_queue_data_size = Element('maxWorkQueueDataSize')
-    connection_max_work_queue_data_size.text = '1 GB'
-    connection.append(connection_max_work_queue_data_size)
-
-    connection_flow_file_expiration = Element('flowFileExpiration')
-    connection_flow_file_expiration.text = '0 sec'
-    connection.append(connection_flow_file_expiration)
-
-    return connection
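For illustration only (not part of this commit): because the package root above re-exports the relocated classes, existing tests can keep importing everything through the minifi package. The directories, expected content, and flow name below are placeholders; the DockerTestCluster constructor and the deploy_flow() signature are taken from the code shown elsewhere in this diff.

    # Hypothetical usage sketch based on the re-exports in minifi/__init__.py.
    from minifi import GetFile, LogAttribute, PutFile
    from minifi import DockerTestCluster, SingleFileOutputValidator

    # Build a flow with the connection DSL (unchanged by this refactor).
    flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')

    # DockerTestCluster extends SingleNodeDockerCluster and takes an output
    # validator; deploy_flow() defaults to the 'minifi-cpp' engine.
    with DockerTestCluster(SingleFileOutputValidator('example content')) as cluster:
        cluster.deploy_flow(flow, name='example-flow')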
diff --git a/docker/test/integration/minifi/core/Cluster.py b/docker/test/integration/minifi/core/Cluster.py
new file mode 100644
index 0000000..0818f37
--- /dev/null
+++ b/docker/test/integration/minifi/core/Cluster.py
@@ -0,0 +1,22 @@
+class Cluster(object):
+    """
+    Base Cluster class. This is intended to be a generic interface
+    to different types of clusters. Clusters could be Kubernetes clusters,
+    Docker swarms, or cloud compute/container services.
+    """
+
+    def deploy_flow(self, flow, name=None, vols=None):
+        """
+        Deploys a flow to the cluster.
+        """
+
+    def __enter__(self):
+        """
+        Allocate ephemeral cluster resources.
+        """
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """
+        Clean up ephemeral cluster resources.
+        """
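For illustration only (not part of this commit): the base class above only fixes the deploy_flow/__enter__/__exit__ contract, so an alternative backend would plug in roughly as in this hypothetical sketch.

    from minifi import Cluster

    class DryRunCluster(Cluster):
        """Hypothetical backend that only logs instead of starting containers."""
        def deploy_flow(self, flow, name=None, vols=None):
            print('would deploy flow %s with volumes %s' % (name, vols))

    # The context-manager protocol comes from the base class.
    with DryRunCluster() as cluster:
        cluster.deploy_flow(None, name='dry-run')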
diff --git a/docker/test/integration/minifi/core/Connectable.py b/docker/test/integration/minifi/core/Connectable.py
new file mode 100644
index 0000000..c3472a7
--- /dev/null
+++ b/docker/test/integration/minifi/core/Connectable.py
@@ -0,0 +1,77 @@
+import uuid
+from copy import copy
+
+class Connectable(object):
+    def __init__(self,
+                 name=None,
+                 auto_terminate=None):
+
+        self.uuid = uuid.uuid4()
+
+        if name is None:
+            self.name = str(self.uuid)
+        else:
+            self.name = name
+
+        if auto_terminate is None:
+            self.auto_terminate = []
+        else:
+            self.auto_terminate = auto_terminate
+
+        self.connections = {}
+        self.out_proc = self
+
+        self.drop_empty_flowfiles = False
+
+    def connect(self, connections):
+        for rel in connections:
+
+            # Ensure that rel is not auto-terminated
+            if rel in self.auto_terminate:
+                del self.auto_terminate[self.auto_terminate.index(rel)]
+
+            # Add to set of output connections for this rel
+            if rel not in self.connections:
+                self.connections[rel] = []
+            self.connections[rel].append(connections[rel])
+
+        return self
+
+    def __rshift__(self, other):
+        """
+        Right shift operator to support flow DSL, for example:
+
+            GetFile('/input') >> LogAttribute() >> PutFile('/output')
+
+        """
+
+        connected = copy(self)
+        connected.connections = copy(self.connections)
+
+        if self.out_proc is self:
+            connected.out_proc = connected
+        else:
+            connected.out_proc = copy(connected.out_proc)
+
+        if isinstance(other, tuple):
+            if isinstance(other[0], tuple):
+                for rel_tuple in other:
+                    rel = {rel_tuple[0]: rel_tuple[1]}
+                    connected.out_proc.connect(rel)
+            else:
+                rel = {other[0]: other[1]}
+                connected.out_proc.connect(rel)
+        else:
+            connected.out_proc.connect({'success': other})
+            connected.out_proc = other
+
+        return connected
+
+    def __invert__(self):
+        """
+        Invert operation to set empty file filtering on incoming connections
+        GetFile('/input') >> ~LogAttribute()
+        """
+        self.drop_empty_flowfiles = True
+
+        return self
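For illustration only (not part of this commit): the operators defined above implement the flow DSL referenced in the docstrings. The sketch below uses the processor classes re-exported by the package; directories and the URL are placeholders.

    from minifi import GetFile, InvokeHTTP, LogAttribute, PutFile

    # Chain on the implicit 'success' relationship.
    flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')

    # Route an explicit relationship with a (relationship, destination) tuple.
    http_flow = GetFile('/tmp/input') >> InvokeHTTP('http://example.com') \
        >> ('response', PutFile('/tmp/output'))

    # ~ sets drop_empty_flowfiles on the destination of the incoming connection.
    drop_empty_flow = GetFile('/tmp/input') >> ~LogAttribute()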
diff --git a/docker/test/integration/minifi/core/ControllerService.py b/docker/test/integration/minifi/core/ControllerService.py
new file mode 100644
index 0000000..d8b4e17
--- /dev/null
+++ b/docker/test/integration/minifi/core/ControllerService.py
@@ -0,0 +1,17 @@
+import uuid
+
+class ControllerService(object):
+    def __init__(self, name=None, properties=None):
+
+        self.id = str(uuid.uuid4())
+
+        if name is None:
+            self.name = str(uuid.uuid4())
+            logging.info('Controller service name was not provided; using generated name \'%s\'', self.name)
+        else:
+            self.name = name
+
+        if properties is None:
+            properties = {}
+
+        self.properties = properties
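For illustration only (not part of this commit): controller services attach to processors by name, mirroring the SSLContextService/InvokeHTTP pairing in the code removed from __init__.py above. The certificate paths and URL are placeholders.

    from minifi import InvokeHTTP, SSLContextService

    ssl_context = SSLContextService(cert='/tmp/resources/certs/client.pem',
                                    key='/tmp/resources/certs/client.key',
                                    ca_cert='/tmp/resources/certs/ca-cert')

    # InvokeHTTP stores the service name under 'SSL Context Service' and keeps a
    # reference so the flow serializers can emit a matching controller service entry.
    invoke = InvokeHTTP('https://secure-server:4430/contentListener',
                        method='POST',
                        ssl_context_service=ssl_context)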
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/core/DockerTestCluster.py
similarity index 66%
rename from docker/test/integration/minifi/test/__init__.py
rename to docker/test/integration/minifi/core/DockerTestCluster.py
index bf64472..1742f83 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -1,45 +1,20 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
+import json
 import logging
+import os
 import shutil
-import uuid
-import tarfile
 import subprocess
 import sys
 import time
-import subprocess
-import json
-from io import BytesIO
-from threading import Event
+import uuid
 
-import os
-from os import listdir
 from os.path import join
+from threading import Event
 from watchdog.events import FileSystemEventHandler
 from watchdog.observers import Observer
 
-from minifi import SingleNodeDockerCluster
-
-logging.basicConfig(level=logging.DEBUG)
-
-def put_file_contents(contents, file_abs_path):
-    logging.info('Writing %d bytes of content to file: %s', len(contents), file_abs_path)
-    with open(file_abs_path, 'wb') as test_input_file:
-        test_input_file.write(contents)
-
+from .OutputEventHandler import OutputEventHandler
+from .SingleNodeDockerCluster import SingleNodeDockerCluster
+from ..validators.FileOutputValidator import FileOutputValidator
 
 class DockerTestCluster(SingleNodeDockerCluster):
     def __init__(self, output_validator):
@@ -134,7 +109,7 @@ class DockerTestCluster(SingleNodeDockerCluster):
         self.test_data = contents
         file_name = str(uuid.uuid4())
         file_abs_path = join(self.tmp_test_input_dir, file_name)
-        put_file_contents(contents.encode('utf-8'), file_abs_path)
+        self.put_file_contents(contents.encode('utf-8'), file_abs_path)
 
     def put_test_resource(self, file_name, contents):
         """
@@ -143,7 +118,7 @@ class DockerTestCluster(SingleNodeDockerCluster):
         """
 
         file_abs_path = join(self.tmp_test_resources_dir, file_name)
-        put_file_contents(contents, file_abs_path)
+        self.put_file_contents(contents, file_abs_path)
 
     def restart_observer_if_needed(self):
         if self.observer.is_alive():
@@ -238,6 +213,12 @@ class DockerTestCluster(SingleNodeDockerCluster):
                 check_count += 1
                 time.sleep(1)
         return False
+    
+    def put_file_contents(self, contents, file_abs_path):
+        logging.info('Writing %d bytes of content to file: %s', len(contents), file_abs_path)
+        with open(file_abs_path, 'wb') as test_input_file:
+            test_input_file.write(contents)
+
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         """
@@ -252,132 +233,3 @@ class DockerTestCluster(SingleNodeDockerCluster):
         shutil.rmtree(self.tmp_test_resources_dir)
 
         super(DockerTestCluster, self).__exit__(exc_type, exc_val, exc_tb)
-
-
-class OutputEventHandler(FileSystemEventHandler):
-    def __init__(self, validator, done_event):
-        self.validator = validator
-        self.done_event = done_event
-
-    def on_created(self, event):
-        logging.info('Output file created: ' + event.src_path)
-        self.check(event)
-
-    def on_modified(self, event):
-        logging.info('Output file modified: ' + event.src_path)
-        self.check(event)
-
-    def check(self, event):
-        if self.validator.validate():
-            logging.info('Output file is valid')
-            self.done_event.set()
-        else:
-            logging.info('Output file is invalid')
-
-
-class OutputValidator(object):
-    """
-    Base output validator class. Validators must implement
-    method validate, which returns a boolean.
-    """
-
-    def validate(self):
-        """
-        Return True if output is valid; False otherwise.
-        """
-        raise NotImplementedError("validate function needs to be implemented for validators")
-
-
-
-class FileOutputValidator(OutputValidator):
-    def set_output_dir(self, output_dir):
-        self.output_dir = output_dir
-
-    def validate(self, dir=''):
-        pass
-
-class SingleFileOutputValidator(FileOutputValidator):
-    """
-    Validates the content of a single file in the given directory.
-    """
-
-    def __init__(self, expected_content, subdir=''):
-        self.valid = False
-        self.expected_content = expected_content
-        self.subdir = subdir
-
-    def validate(self):
-        self.valid = False
-        full_dir = os.path.join(self.output_dir, self.subdir)
-        logging.info("Output folder: %s", full_dir)
-
-        if not os.path.isdir(full_dir):
-            return self.valid
-
-        listing = listdir(full_dir)
-        if listing:
-            for l in listing:
-                logging.info("name:: %s", l)
-            out_file_name = listing[0]
-            full_path = join(full_dir, out_file_name)
-            if not os.path.isfile(full_path):
-                return self.valid
-
-            with open(full_path, 'r') as out_file:
-                contents = out_file.read()
-                logging.info("dir %s -- name %s", full_dir, out_file_name)
-                logging.info("expected %s -- content %s", 
self.expected_content, contents)
-
-                if self.expected_content in contents:
-                    self.valid = True
-
-        return self.valid
-
-
-class EmptyFilesOutPutValidator(FileOutputValidator):
-    """
-    Validates if all the files in the target directory are empty and at least one exists
-    """
-    def __init__(self):
-        self.valid = False
-
-    def validate(self, dir=''):
-
-        if self.valid:
-            return True
-
-        full_dir = self.output_dir + dir
-        logging.info("Output folder: %s", full_dir)
-        listing = listdir(full_dir)
-        if listing:
-            self.valid = all(os.path.getsize(os.path.join(full_dir,x)) == 0 for x in listing)
-
-        return self.valid
-
-class NoFileOutPutValidator(FileOutputValidator):
-    """
-    Validates if no flowfiles were transferred
-    """
-    def __init__(self):
-        self.valid = False
-
-    def validate(self, dir=''):
-
-        if self.valid:
-            return True
-
-        full_dir = self.output_dir + dir
-        logging.info("Output folder: %s", full_dir)
-        listing = listdir(full_dir)
-
-        self.valid = not bool(listing)
-
-        return self.valid
-
-
-class SegfaultValidator(OutputValidator):
-    """
-    Validate that a file was received.
-    """
-    def validate(self):
-        return True
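
For orientation, a rough usage sketch (not part of this commit) of the refactored test cluster under its new module layout; the flow wiring itself goes through the Connectable API (Connectable.py, not shown in this excerpt), and put_test_resource() plus the temp directories come from the parts of DockerTestCluster kept by this hunk:

    from minifi.core.DockerTestCluster import DockerTestCluster
    from minifi.processors.GetFile import GetFile
    from minifi.validators.SingleFileOutputValidator import SingleFileOutputValidator

    with DockerTestCluster(SingleFileOutputValidator('hello')) as cluster:
        # stage an input file in the shared resources directory
        cluster.put_test_resource('input.txt', b'hello')
        flow = GetFile(cluster.tmp_test_resources_dir)  # connect further processors via Connectable
        cluster.deploy_flow(flow, engine='minifi-cpp')
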
diff --git a/docker/test/integration/minifi/core/InputPort.py 
b/docker/test/integration/minifi/core/InputPort.py
new file mode 100644
index 0000000..1f00b7f
--- /dev/null
+++ b/docker/test/integration/minifi/core/InputPort.py
@@ -0,0 +1,7 @@
+from .Connectable import Connectable
+
+class InputPort(Connectable):
+    def __init__(self, name=None, remote_process_group=None):
+        super(InputPort, self).__init__(name=name)
+
+        self.remote_process_group = remote_process_group
diff --git a/docker/test/integration/minifi/core/OutputEventHandler.py 
b/docker/test/integration/minifi/core/OutputEventHandler.py
new file mode 100644
index 0000000..3d4c984
--- /dev/null
+++ b/docker/test/integration/minifi/core/OutputEventHandler.py
@@ -0,0 +1,24 @@
+import logging
+
+from watchdog.events import FileSystemEventHandler
+
+class OutputEventHandler(FileSystemEventHandler):
+    def __init__(self, validator, done_event):
+        self.validator = validator
+        self.done_event = done_event
+
+    def on_created(self, event):
+        logging.info('Output file created: ' + event.src_path)
+        self.check(event)
+
+    def on_modified(self, event):
+        logging.info('Output file modified: ' + event.src_path)
+        self.check(event)
+
+    def check(self, event):
+        if self.validator.validate():
+            logging.info('Output file is valid')
+            self.done_event.set()
+        else:
+            logging.info('Output file is invalid')
+
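
A minimal wiring sketch for this handler, assuming a watchdog Observer and a threading.Event as DockerTestCluster uses above; 'validator' stands for any OutputValidator instance:

    from threading import Event
    from watchdog.observers import Observer
    from minifi.core.OutputEventHandler import OutputEventHandler

    done = Event()
    observer = Observer()
    observer.schedule(OutputEventHandler(validator, done), '/tmp/output', recursive=True)
    observer.start()
    try:
        done.wait(timeout=30)  # set by check() once validator.validate() returns True
    finally:
        observer.stop()
        observer.join()
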
diff --git a/docker/test/integration/minifi/core/Processor.py 
b/docker/test/integration/minifi/core/Processor.py
new file mode 100644
index 0000000..e7e41e5
--- /dev/null
+++ b/docker/test/integration/minifi/core/Processor.py
@@ -0,0 +1,45 @@
+from .Connectable import Connectable
+
+class Processor(Connectable):
+    def __init__(self,
+                 clazz,
+                 properties=None,
+                 schedule=None,
+                 name=None,
+                 controller_services=None,
+                 auto_terminate=None):
+
+        super(Processor, self).__init__(name=name,
+                                        auto_terminate=auto_terminate)
+
+        if controller_services is None:
+            controller_services = []
+
+        if schedule is None:
+            schedule = {}
+
+        if properties is None:
+            properties = {}
+
+        if name is None:
+            pass
+
+        self.clazz = clazz
+        self.properties = properties
+        self.controller_services = controller_services
+
+        self.schedule = {
+            'scheduling strategy': 'TIMER_DRIVEN',
+            'scheduling period': '1 sec',
+            'penalization period': '30 sec',
+            'yield period': '1 sec',
+            'run duration nanos': 0
+        }
+        self.schedule.update(schedule)
+
+    def nifi_property_key(self, key):
+        """
+        Returns the Apache NiFi-equivalent property key for the given key.
+        This is often, but not always, the same as the internal key.
+        """
+        return key
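
As an illustration of the nifi_property_key() hook (the class and property names below are hypothetical), a subclass can rename or drop properties that only exist on the MiNiFi C++ side; the NiFi XML serializer added later in this commit skips any property whose mapped key is falsy:

    from minifi.core.Processor import Processor

    class MyProcessor(Processor):
        def __init__(self):
            super(MyProcessor, self).__init__('MyProcessor',
                                              properties={'MiNiFi Only Property': 'x'})

        def nifi_property_key(self, key):
            # returning None makes Nifi_flow_xml_serializer omit this property
            return None if key == 'MiNiFi Only Property' else key
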
diff --git a/docker/test/integration/minifi/core/RemoteProcessGroup.py 
b/docker/test/integration/minifi/core/RemoteProcessGroup.py
new file mode 100644
index 0000000..6901fb6
--- /dev/null
+++ b/docker/test/integration/minifi/core/RemoteProcessGroup.py
@@ -0,0 +1,13 @@
+import uuid
+
+class RemoteProcessGroup(object):
+    def __init__(self, url,
+                 name=None):
+        self.uuid = uuid.uuid4()
+
+        if name is None:
+            self.name = str(self.uuid)
+        else:
+            self.name = name
+
+        self.url = url
diff --git a/docker/test/integration/minifi/core/SSLContextService.py 
b/docker/test/integration/minifi/core/SSLContextService.py
new file mode 100644
index 0000000..5866508
--- /dev/null
+++ b/docker/test/integration/minifi/core/SSLContextService.py
@@ -0,0 +1,16 @@
+from .ControllerService import ControllerService
+
+class SSLContextService(ControllerService):
+    def __init__(self, name=None, cert=None, key=None, ca_cert=None):
+        super(SSLContextService, self).__init__(name=name)
+
+        self.service_class = 'SSLContextService'
+
+        if cert is not None:
+            self.properties['Client Certificate'] = cert
+
+        if key is not None:
+            self.properties['Private Key'] = key
+
+        if ca_cert is not None:
+            self.properties['CA Certificate'] = ca_cert
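
A short sketch (the certificate paths are placeholders) of handing this service to InvokeHTTP, which accepts it through its ssl_context_service argument further down in this commit:

    from minifi.core.SSLContextService import SSLContextService
    from minifi.processors.InvokeHTTP import InvokeHTTP

    ssl_ctx = SSLContextService(cert='/tmp/resources/client.pem',
                                key='/tmp/resources/client.key',
                                ca_cert='/tmp/resources/ca-cert')
    invoke = InvokeHTTP('https://server:4430/contentListener', method='POST',
                        ssl_context_service=ssl_ctx)
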
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py 
b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
new file mode 100644
index 0000000..f48c288
--- /dev/null
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -0,0 +1,319 @@
+import gzip
+import docker
+import logging
+import os
+import tarfile
+import uuid
+
+from collections import OrderedDict
+from io import BytesIO
+from textwrap import dedent
+
+from .Cluster import Cluster
+from ..flow_serialization.Minifi_flow_yaml_serializer import Minifi_flow_yaml_serializer
+from ..flow_serialization.Nifi_flow_xml_serializer import Nifi_flow_xml_serializer
+
+class SingleNodeDockerCluster(Cluster):
+    """
+    A "cluster" which consists of a single docker node. Useful for
+    testing or use-cases which do not span multiple compute nodes.
+    """
+
+    def __init__(self):
+        self.minifi_version = os.environ['MINIFI_VERSION']
+        self.nifi_version = '1.7.0'
+        self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version
+        self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version
+        self.network = None
+        self.containers = OrderedDict()
+        self.images = []
+        self.tmp_files = []
+
+        # Get docker client
+        self.client = docker.from_env()
+
+    def deploy_flow(self,
+                    flow,
+                    name=None,
+                    vols=None,
+                    engine='minifi-cpp'):
+        """
+        Compiles the flow to a valid config file and overlays it into a new image.
+        """
+
+        if vols is None:
+            vols = {}
+
+        logging.info('Deploying %s flow...%s', engine, name)
+
+        if name is None:
+            name = engine + '-' + str(uuid.uuid4())
+            logging.info('Flow name was not provided; using generated name \'%s\'', name)
+
+        # Create network if necessary
+        if self.network is None:
+            net_name = 'nifi-' + str(uuid.uuid4())
+            logging.info('Creating network: %s', net_name)
+            self.network = self.client.networks.create(net_name)
+
+        if engine == 'nifi':
+            self.deploy_nifi_flow(flow, name, vols)
+        elif engine == 'minifi-cpp':
+            self.deploy_minifi_cpp_flow(flow, name, vols)
+        elif engine == 'kafka-broker':
+            self.deploy_kafka_broker(name)
+        elif engine == 'http-proxy':
+            self.deploy_http_proxy()
+        elif engine == 's3-server':
+            self.deploy_s3_server()
+        else:
+            raise Exception('invalid flow engine: \'%s\'' % engine)
+
+    def deploy_minifi_cpp_flow(self, flow, name, vols):
+
+        # Build configured image
+        dockerfile = dedent("""FROM {base_image}
+                USER root
+                ADD config.yml {minifi_root}/conf/config.yml
+                RUN chown minificpp:minificpp {minifi_root}/conf/config.yml
+                USER minificpp
+                """.format(name=name,hostname=name,
+                           base_image='apacheminificpp:' + self.minifi_version,
+                           minifi_root=self.minifi_root))
+
+        serializer = Minifi_flow_yaml_serializer()
+        test_flow_yaml = serializer.serialize(flow)
+        logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
+
+        conf_file_buffer = BytesIO()
+
+        try:
+            conf_file_buffer.write(test_flow_yaml.encode('utf-8'))
+            conf_file_len = conf_file_buffer.tell()
+            conf_file_buffer.seek(0)
+
+            context_files = [
+                {
+                    'name': 'config.yml',
+                    'size': conf_file_len,
+                    'file_obj': conf_file_buffer
+                }
+            ]
+
+            configured_image = self.build_image(dockerfile, context_files)
+
+        finally:
+            conf_file_buffer.close()
+
+        logging.info('Creating and running docker container for flow...')
+
+        container = self.client.containers.run(
+                configured_image[0],
+                detach=True,
+                name=name,
+                network=self.network.name,
+                volumes=vols)
+
+        logging.info('Started container \'%s\'', container.name)
+
+        self.containers[container.name] = container
+
+    def deploy_nifi_flow(self, flow, name, vols):
+        dockerfile = dedent(r"""FROM {base_image}
+                USER root
+                ADD flow.xml.gz {nifi_root}/conf/flow.xml.gz
+                RUN chown nifi:nifi {nifi_root}/conf/flow.xml.gz
+                RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties
+                RUN sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\1=5000/' {nifi_root}/conf/nifi.properties
+                USER nifi
+                """.format(name=name,
+                           base_image='apache/nifi:' + self.nifi_version,
+                           nifi_root=self.nifi_root))
+
+        serializer = Nifi_flow_xml_serializer()
+        test_flow_xml = serializer.serialize(flow, self.nifi_version)
+        logging.info('Using generated flow config xml:\n%s', test_flow_xml)
+
+        conf_file_buffer = BytesIO()
+
+        try:
+            with gzip.GzipFile(mode='wb', fileobj=conf_file_buffer) as conf_gz_file_buffer:
+                conf_gz_file_buffer.write(test_flow_xml.encode())
+            conf_file_len = conf_file_buffer.tell()
+            conf_file_buffer.seek(0)
+
+            context_files = [
+                {
+                    'name': 'flow.xml.gz',
+                    'size': conf_file_len,
+                    'file_obj': conf_file_buffer
+                }
+            ]
+
+            configured_image = self.build_image(dockerfile, context_files)
+
+        finally:
+            conf_file_buffer.close()
+
+        logging.info('Creating and running docker container for flow...')
+
+        container = self.client.containers.run(
+                configured_image[0],
+                detach=True,
+                name=name,
+                hostname=name,
+                network=self.network.name,
+                volumes=vols)
+
+        logging.info('Started container \'%s\'', container.name)
+
+        self.containers[container.name] = container
+
+    def deploy_kafka_broker(self, name):
+        logging.info('Creating and running docker containers for kafka broker...')
+        zookeeper = self.client.containers.run(
+                    self.client.images.pull("wurstmeister/zookeeper:latest"),
+                    detach=True,
+                    name='zookeeper',
+                    network=self.network.name,
+                    ports={'2181/tcp': 2181},
+                    )
+        self.containers[zookeeper.name] = zookeeper
+
+        test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on DockerVerify.sh
+        broker_image = self.build_image_by_path(test_dir + "/resources/kafka_broker", 'minifi-kafka')
+        broker = self.client.containers.run(
+                    broker_image[0],
+                    detach=True,
+                    name='kafka-broker',
+                    network=self.network.name,
+                    ports={'9092/tcp': 9092},
+                    environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093", "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"],
+                    )
+        self.containers[broker.name] = broker
+
+        dockerfile = dedent("""FROM {base_image}
+                USER root
+                CMD $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka-broker:9092 --topic test > heaven_signal.txt
+                """.format(base_image='wurstmeister/kafka:2.12-2.5.0'))
+        configured_image = self.build_image(dockerfile, [])
+        consumer = self.client.containers.run(
+                    configured_image[0],
+                    detach=True,
+                    name='kafka-consumer',
+                    network=self.network.name,
+                    )
+        self.containers[consumer.name] = consumer
+
+    def deploy_http_proxy(self):
+        logging.info('Creating and running http-proxy docker container...')
+        dockerfile = dedent("""FROM {base_image}
+                RUN apt update && apt install -y apache2-utils
+                RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username} {proxy_password}
+                RUN echo 'auth_param basic program /usr/lib/squid3/basic_ncsa_auth /etc/squid/.squid_users'  > /etc/squid/squid.conf && \
+                    echo 'auth_param basic realm proxy' >> /etc/squid/squid.conf && \
+                    echo 'acl authenticated proxy_auth REQUIRED' >> /etc/squid/squid.conf && \
+                    echo 'http_access allow authenticated' >> /etc/squid/squid.conf && \
+                    echo 'http_port {proxy_port}' >> /etc/squid/squid.conf
+                ENTRYPOINT ["/sbin/entrypoint.sh"]
+                """.format(base_image='sameersbn/squid:3.5.27-2', proxy_username='admin', proxy_password='test101', proxy_port='3128'))
+        configured_image = self.build_image(dockerfile, [])
+        consumer = self.client.containers.run(
+                    configured_image[0],
+                    detach=True,
+                    name='http-proxy',
+                    network=self.network.name,
+                    ports={'3128/tcp': 3128},
+                    )
+        self.containers[consumer.name] = consumer
+
+    def deploy_s3_server(self):
+        consumer = self.client.containers.run(
+                    "adobe/s3mock:2.1.28",
+                    detach=True,
+                    name='s3-server',
+                    network=self.network.name,
+                    ports={'9090/tcp': 9090, '9191/tcp': 9191},
+                    environment=["initialBuckets=test_bucket"],
+                    )
+        self.containers[consumer.name] = consumer
+
+    def build_image(self, dockerfile, context_files):
+        conf_dockerfile_buffer = BytesIO()
+        docker_context_buffer = BytesIO()
+
+        try:
+            # Overlay conf onto base nifi image
+            conf_dockerfile_buffer.write(dockerfile.encode())
+            conf_dockerfile_buffer.seek(0)
+
+            with tarfile.open(mode='w', fileobj=docker_context_buffer) as 
docker_context:
+                dockerfile_info = tarfile.TarInfo('Dockerfile')
+                dockerfile_info.size = conf_dockerfile_buffer.getbuffer().nbytes
+                docker_context.addfile(dockerfile_info,
+                                       fileobj=conf_dockerfile_buffer)
+
+                for context_file in context_files:
+                    file_info = tarfile.TarInfo(context_file['name'])
+                    file_info.size = context_file['size']
+                    docker_context.addfile(file_info,
+                                           fileobj=context_file['file_obj'])
+            docker_context_buffer.seek(0)
+
+            logging.info('Creating configured image...')
+            configured_image = self.client.images.build(fileobj=docker_context_buffer,
+                                                        custom_context=True,
+                                                        rm=True,
+                                                        forcerm=True)
+            logging.info('Created image with id: %s', configured_image[0].id)
+            self.images.append(configured_image)
+
+        finally:
+            conf_dockerfile_buffer.close()
+            docker_context_buffer.close()
+
+        return configured_image
+
+    def build_image_by_path(self, dir, name=None):
+        try:
+            logging.info('Creating configured image...')
+            configured_image = self.client.images.build(path=dir,
+                                                        tag=name,
+                                                        rm=True,
+                                                        forcerm=True)
+            logging.info('Created image with id: %s', configured_image[0].id)
+            self.images.append(configured_image)
+            return configured_image
+        except Exception as e:
+            logging.info(e)
+            raise
+
+    def __enter__(self):
+        """
+        Allocate ephemeral cluster resources.
+        """
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """
+        Clean up ephemeral cluster resources
+        """
+
+        # Clean up containers
+        for container in self.containers.values():
+            logging.info('Cleaning up container: %s', container.name)
+            container.remove(v=True, force=True)
+
+        # Clean up images
+        for image in reversed(self.images):
+            logging.info('Cleaning up image: %s', image[0].id)
+            self.client.images.remove(image[0].id, force=True)
+
+        # Clean up network
+        if self.network is not None:
+            logging.info('Cleaning up network: %s', self.network.name)
+            self.network.remove()
+
+        # Clean up tmp files
+        for tmp_file in self.tmp_files:
+            os.remove(tmp_file)
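
A rough sketch of driving the cluster directly; it assumes a reachable docker daemon, and MINIFI_VERSION must be set before construction (the version below is only a placeholder). Engines that take no flow (kafka-broker, http-proxy, s3-server) ignore the first argument:

    import os
    os.environ.setdefault('MINIFI_VERSION', '0.8.0')  # placeholder version

    from minifi.core.SingleNodeDockerCluster import SingleNodeDockerCluster

    with SingleNodeDockerCluster() as cluster:
        cluster.deploy_flow(None, engine='s3-server')
        # a real test would also deploy a Connectable flow with engine='minifi-cpp'
    # __exit__ removes the containers, images, network and temp files
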
diff --git a/docker/test/integration/minifi/core/__init__.py 
b/docker/test/integration/minifi/core/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git 
a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
 
b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
new file mode 100644
index 0000000..c7a9e0d
--- /dev/null
+++ 
b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
@@ -0,0 +1,111 @@
+import uuid
+import yaml
+
+from ..core.Processor import Processor
+from ..core.InputPort import InputPort
+
+class Minifi_flow_yaml_serializer:
+    def serialize(self, connectable, root=None, visited=None):
+        if visited is None:
+            visited = []
+
+        if root is None:
+            res = {
+                'Flow Controller': {
+                    'name': 'MiNiFi Flow'
+                },
+                'Processors': [],
+                'Connections': [],
+                'Remote Processing Groups': [],
+                'Controller Services': []
+            }
+        else:
+            res = root
+
+        visited.append(connectable)
+
+        if hasattr(connectable, 'name'):
+            connectable_name = connectable.name
+        else:
+            connectable_name = str(connectable.uuid)
+
+        if isinstance(connectable, InputPort):
+            group = connectable.remote_process_group
+            res_group = None
+
+            for res_group_candidate in res['Remote Processing Groups']:
+                assert isinstance(res_group_candidate, dict)
+                if res_group_candidate['id'] == str(group.uuid):
+                    res_group = res_group_candidate
+
+            if res_group is None:
+                res_group = {
+                    'name': group.name,
+                    'id': str(group.uuid),
+                    'url': group.url,
+                    'timeout': '30 sec',
+                    'yield period': '3 sec',
+                    'Input Ports': []
+                }
+
+                res['Remote Processing Groups'].append(res_group)
+
+            res_group['Input Ports'].append({
+                'id': str(connectable.uuid),
+                'name': connectable.name,
+                'max concurrent tasks': 1,
+                'Properties': {}
+            })
+
+        if isinstance(connectable, Processor):
+            res['Processors'].append({
+                'name': connectable_name,
+                'id': str(connectable.uuid),
+                'class': 'org.apache.nifi.processors.standard.' + connectable.clazz,
+                'scheduling strategy': connectable.schedule['scheduling strategy'],
+                'scheduling period': connectable.schedule['scheduling period'],
+                'penalization period': connectable.schedule['penalization period'],
+                'yield period': connectable.schedule['yield period'],
+                'run duration nanos': connectable.schedule['run duration nanos'],
+                'Properties': connectable.properties,
+                'auto-terminated relationships list': connectable.auto_terminate
+            })
+
+            for svc in connectable.controller_services:
+                if svc in visited:
+                    continue
+
+                visited.append(svc)
+                res['Controller Services'].append({
+                    'name': svc.name,
+                    'id': svc.id,
+                    'class': svc.service_class,
+                    'Properties': svc.properties
+                })
+
+        for conn_name in connectable.connections:
+            conn_procs = connectable.connections[conn_name]
+
+            if isinstance(conn_procs, list):
+                for proc in conn_procs:
+                    res['Connections'].append({
+                        'name': str(uuid.uuid4()),
+                        'source id': str(connectable.uuid),
+                        'source relationship name': conn_name,
+                        'destination id': str(proc.uuid),
+                        'drop empty': ("true" if proc.drop_empty_flowfiles else "false")
+                    })
+                    if proc not in visited:
+                        self.serialize(proc, res, visited)
+            else:
+                res['Connections'].append({
+                    'name': str(uuid.uuid4()),
+                    'source id': str(connectable.uuid),
+                    'source relationship name': conn_name,
+                    'destination id': str(conn_procs.uuid)
+                })
+                if conn_procs not in visited:
+                    self.serialize(conn_procs, res, visited)
+
+        if root is None:
+            return yaml.dump(res, default_flow_style=False)
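
Usage sketch (assumed): the serializer walks connectable.connections recursively, so serializing the first processor of a flow yields the complete config; this is the text that deploy_minifi_cpp_flow() bakes into config.yml:

    from minifi.processors.GetFile import GetFile
    from minifi.flow_serialization.Minifi_flow_yaml_serializer import Minifi_flow_yaml_serializer

    yaml_config = Minifi_flow_yaml_serializer().serialize(GetFile('/tmp/input'))
    print(yaml_config)
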
diff --git 
a/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py 
b/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py
new file mode 100644
index 0000000..22fa098
--- /dev/null
+++ 
b/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py
@@ -0,0 +1,324 @@
+import uuid
+
+import xml.etree.cElementTree as elementTree
+from xml.etree.cElementTree import Element
+
+from ..core.Processor import Processor
+from ..core.InputPort import InputPort
+
+class Nifi_flow_xml_serializer:
+    def serialize(self, connectable, nifi_version=None, root=None, visited=None):
+        if visited is None:
+            visited = []
+
+        position = Element('position')
+        position.set('x', '0.0')
+        position.set('y', '0.0')
+
+        comment = Element('comment')
+        styles = Element('styles')
+        bend_points = Element('bendPoints')
+        label_index = Element('labelIndex')
+        label_index.text = '1'
+        z_index = Element('zIndex')
+        z_index.text = '0'
+
+        if root is None:
+            res = Element('flowController')
+            max_timer_driven_thread_count = Element('maxTimerDrivenThreadCount')
+            max_timer_driven_thread_count.text = '10'
+            res.append(max_timer_driven_thread_count)
+            max_event_driven_thread_count = Element('maxEventDrivenThreadCount')
+            max_event_driven_thread_count.text = '5'
+            res.append(max_event_driven_thread_count)
+            root_group = Element('rootGroup')
+            root_group_id = Element('id')
+            root_group_id_text = str(uuid.uuid4())
+            root_group_id.text = root_group_id_text
+            root_group.append(root_group_id)
+            root_group_name = Element('name')
+            root_group_name.text = root_group_id_text
+            root_group.append(root_group_name)
+            res.append(root_group)
+            root_group.append(position)
+            root_group.append(comment)
+            res.append(Element('controllerServices'))
+            res.append(Element('reportingTasks'))
+            res.set('encoding-version', '1.2')
+        else:
+            res = root
+
+        visited.append(connectable)
+
+        if hasattr(connectable, 'name'):
+            connectable_name_text = connectable.name
+        else:
+            connectable_name_text = str(connectable.uuid)
+
+        if isinstance(connectable, InputPort):
+            input_port = Element('inputPort')
+
+            input_port_id = Element('id')
+            input_port_id.text = str(connectable.uuid)
+            input_port.append(input_port_id)
+
+            input_port_name = Element('name')
+            input_port_name.text = connectable_name_text
+            input_port.append(input_port_name)
+
+            input_port.append(position)
+            input_port.append(comment)
+
+            input_port_scheduled_state = Element('scheduledState')
+            input_port_scheduled_state.text = 'RUNNING'
+            input_port.append(input_port_scheduled_state)
+
+            input_port_max_concurrent_tasks = Element('maxConcurrentTasks')
+            input_port_max_concurrent_tasks.text = '1'
+            input_port.append(input_port_max_concurrent_tasks)
+            next( res.iterfind('rootGroup') ).append(input_port)
+
+        if isinstance(connectable, Processor):
+            conn_destination = Element('processor')
+
+            proc_id = Element('id')
+            proc_id.text = str(connectable.uuid)
+            conn_destination.append(proc_id)
+
+            proc_name = Element('name')
+            proc_name.text = connectable_name_text
+            conn_destination.append(proc_name)
+
+            conn_destination.append(position)
+            conn_destination.append(styles)
+            conn_destination.append(comment)
+
+            proc_class = Element('class')
+            proc_class.text = 'org.apache.nifi.processors.standard.' + connectable.clazz
+            conn_destination.append(proc_class)
+
+            proc_bundle = Element('bundle')
+            proc_bundle_group = Element('group')
+            proc_bundle_group.text = 'org.apache.nifi'
+            proc_bundle.append(proc_bundle_group)
+            proc_bundle_artifact = Element('artifact')
+            proc_bundle_artifact.text = 'nifi-standard-nar'
+            proc_bundle.append(proc_bundle_artifact)
+            proc_bundle_version = Element('version')
+            proc_bundle_version.text = nifi_version
+            proc_bundle.append(proc_bundle_version)
+            conn_destination.append(proc_bundle)
+
+            proc_max_concurrent_tasks = Element('maxConcurrentTasks')
+            proc_max_concurrent_tasks.text = '1'
+            conn_destination.append(proc_max_concurrent_tasks)
+
+            proc_scheduling_period = Element('schedulingPeriod')
+            proc_scheduling_period.text = connectable.schedule['scheduling period']
+            conn_destination.append(proc_scheduling_period)
+
+            proc_penalization_period = Element('penalizationPeriod')
+            proc_penalization_period.text = connectable.schedule['penalization period']
+            conn_destination.append(proc_penalization_period)
+
+            proc_yield_period = Element('yieldPeriod')
+            proc_yield_period.text = connectable.schedule['yield period']
+            conn_destination.append(proc_yield_period)
+
+            proc_bulletin_level = Element('bulletinLevel')
+            proc_bulletin_level.text = 'WARN'
+            conn_destination.append(proc_bulletin_level)
+
+            proc_loss_tolerant = Element('lossTolerant')
+            proc_loss_tolerant.text = 'false'
+            conn_destination.append(proc_loss_tolerant)
+
+            proc_scheduled_state = Element('scheduledState')
+            proc_scheduled_state.text = 'RUNNING'
+            conn_destination.append(proc_scheduled_state)
+
+            proc_scheduling_strategy = Element('schedulingStrategy')
+            proc_scheduling_strategy.text = connectable.schedule['scheduling strategy']
+            conn_destination.append(proc_scheduling_strategy)
+
+            proc_execution_node = Element('executionNode')
+            proc_execution_node.text = 'ALL'
+            conn_destination.append(proc_execution_node)
+
+            proc_run_duration_nanos = Element('runDurationNanos')
+            proc_run_duration_nanos.text = str(connectable.schedule['run duration nanos'])
+            conn_destination.append(proc_run_duration_nanos)
+
+            for property_key, property_value in connectable.properties.items():
+                proc_property = Element('property')
+                proc_property_name = Element('name')
+                proc_property_name.text = connectable.nifi_property_key(property_key)
+                if not proc_property_name.text:
+                    continue
+                proc_property.append(proc_property_name)
+                proc_property_value = Element('value')
+                proc_property_value.text = property_value
+                proc_property.append(proc_property_value)
+                conn_destination.append(proc_property)
+
+            for auto_terminate_rel in connectable.auto_terminate:
+                proc_auto_terminated_relationship = Element('autoTerminatedRelationship')
+                proc_auto_terminated_relationship.text = auto_terminate_rel
+                conn_destination.append(proc_auto_terminated_relationship)
+            next( res.iterfind('rootGroup') ).append(conn_destination)
+            """ res.iterfind('rootGroup').next().append(conn_destination) """
+
+            for svc in connectable.controller_services:
+                if svc in visited:
+                    continue
+
+                visited.append(svc)
+                controller_service = Element('controllerService')
+
+                controller_service_id = Element('id')
+                controller_service_id.text = str(svc.id)
+                controller_service.append(controller_service_id)
+
+                controller_service_name = Element('name')
+                controller_service_name.text = svc.name
+                controller_service.append(controller_service_name)
+
+                controller_service.append(comment)
+
+                controller_service_class = Element('class')
+                controller_service_class.text = svc.service_class
+                controller_service.append(controller_service_class)
+
+                controller_service_bundle = Element('bundle')
+                controller_service_bundle_group = Element('group')
+                controller_service_bundle_group.text = svc.group
+                controller_service_bundle.append(controller_service_bundle_group)
+                controller_service_bundle_artifact = Element('artifact')
+                controller_service_bundle_artifact.text = svc.artifact
+                controller_service_bundle.append(controller_service_bundle_artifact)
+                controller_service_bundle_version = Element('version')
+                controller_service_bundle_version.text = nifi_version
+                controller_service_bundle.append(controller_service_bundle_version)
+                controller_service.append(controller_service_bundle)
+
+                controller_enabled = Element('enabled')
+                controller_enabled.text = 'true'
+                controller_service.append(controller_enabled)
+
+                for property_key, property_value in svc.properties.items():
+                    controller_service_property = Element('property')
+                    controller_service_property_name = Element('name')
+                    controller_service_property_name.text = property_key
+                    controller_service_property.append(controller_service_property_name)
+                    controller_service_property_value = Element('value')
+                    controller_service_property_value.text = property_value
+                    controller_service_property.append(controller_service_property_value)
+                    controller_service.append(controller_service_property)
+                next( res.iterfind('rootGroup') ).append(controller_service)
+                """ 
res.iterfind('rootGroup').next().append(controller_service)"""
+
+        for conn_name in connectable.connections:
+            conn_destinations = connectable.connections[conn_name]
+
+            if isinstance(conn_destinations, list):
+                for conn_destination in conn_destinations:
+                    connection = self.build_nifi_flow_xml_connection_element(res,
+                                                          bend_points,
+                                                          conn_name,
+                                                          connectable,
+                                                          label_index,
+                                                          conn_destination,
+                                                          z_index)
+                    next( res.iterfind('rootGroup') ).append(connection)
+                    """ res.iterfind('rootGroup').next().append(connection) """
+
+                    if conn_destination not in visited:
+                        self.serialize(conn_destination, nifi_version, res, visited)
+            else:
+                connection = self.build_nifi_flow_xml_connection_element(res,
+                                                      bend_points,
+                                                      conn_name,
+                                                      connectable,
+                                                      label_index,
+                                                      conn_destinations,
+                                                      z_index)
+                next( res.iterfind('rootGroup') ).append(connection)
+                """ res.iterfind('rootGroup').next().append(connection) """
+
+                if conn_destinations not in visited:
+                    self.serialize(conn_destinations, nifi_version, res, visited)
+
+        if root is None:
+            return ('<?xml version="1.0" encoding="UTF-8" standalone="no"?>'
+                    + "\n"
+                    + elementTree.tostring(res, encoding='utf-8').decode('utf-8'))
+
+    def build_nifi_flow_xml_connection_element(self, res, bend_points, conn_name, connectable, label_index, destination, z_index):
+        connection = Element('connection')
+
+        connection_id = Element('id')
+        connection_id.text = str(uuid.uuid4())
+        connection.append(connection_id)
+
+        connection_name = Element('name')
+        connection.append(connection_name)
+
+        connection.append(bend_points)
+        connection.append(label_index)
+        connection.append(z_index)
+
+        connection_source_id = Element('sourceId')
+        connection_source_id.text = str(connectable.uuid)
+        connection.append(connection_source_id)
+
+        connection_source_group_id = Element('sourceGroupId')
+        connection_source_group_id.text = next( res.iterfind('rootGroup/id') ).text
+        """connection_source_group_id.text = res.iterfind('rootGroup/id').next().text"""
+        connection.append(connection_source_group_id)
+
+        connection_source_type = Element('sourceType')
+        if isinstance(connectable, Processor):
+            connection_source_type.text = 'PROCESSOR'
+        elif isinstance(connectable, InputPort):
+            connection_source_type.text = 'INPUT_PORT'
+        else:
+            raise Exception('Unexpected source type: %s' % type(connectable))
+        connection.append(connection_source_type)
+
+        connection_destination_id = Element('destinationId')
+        connection_destination_id.text = str(destination.uuid)
+        connection.append(connection_destination_id)
+
+        connection_destination_group_id = Element('destinationGroupId')
+        connection_destination_group_id.text = next(res.iterfind('rootGroup/id')).text
+        """ connection_destination_group_id.text = res.iterfind('rootGroup/id').next().text """
+        connection.append(connection_destination_group_id)
+
+        connection_destination_type = Element('destinationType')
+        if isinstance(destination, Processor):
+            connection_destination_type.text = 'PROCESSOR'
+        elif isinstance(destination, InputPort):
+            connection_destination_type.text = 'INPUT_PORT'
+        else:
+            raise Exception('Unexpected destination type: %s' % type(destination))
+        connection.append(connection_destination_type)
+
+        connection_relationship = Element('relationship')
+        if not isinstance(connectable, InputPort):
+            connection_relationship.text = conn_name
+        connection.append(connection_relationship)
+
+        connection_max_work_queue_size = Element('maxWorkQueueSize')
+        connection_max_work_queue_size.text = '10000'
+        connection.append(connection_max_work_queue_size)
+
+        connection_max_work_queue_data_size = Element('maxWorkQueueDataSize')
+        connection_max_work_queue_data_size.text = '1 GB'
+        connection.append(connection_max_work_queue_data_size)
+
+        connection_flow_file_expiration = Element('flowFileExpiration')
+        connection_flow_file_expiration.text = '0 sec'
+        connection.append(connection_flow_file_expiration)
+
+        return connection
+
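Usage sketch (assumed), mirroring deploy_nifi_flow() above; the serializer also needs the target NiFi version so it can fill in the bundle <version> elements (the '1 kB' file size is just an example value):

    from minifi.processors.GenerateFlowFile import GenerateFlowFile
    from minifi.flow_serialization.Nifi_flow_xml_serializer import Nifi_flow_xml_serializer

    flow_xml = Nifi_flow_xml_serializer().serialize(GenerateFlowFile('1 kB'), '1.7.0')
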
diff --git a/docker/test/integration/minifi/flow_serialization/__init__.py 
b/docker/test/integration/minifi/flow_serialization/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/docker/test/integration/minifi/processors/DeleteS3Object.py 
b/docker/test/integration/minifi/processors/DeleteS3Object.py
new file mode 100644
index 0000000..003a9cd
--- /dev/null
+++ b/docker/test/integration/minifi/processors/DeleteS3Object.py
@@ -0,0 +1,22 @@
+from ..core.Processor import Processor
+
+class DeleteS3Object(Processor):
+    def __init__(self,
+        proxy_host = '',
+        proxy_port = '',
+        proxy_username = '',
+        proxy_password = ''):
+            super(DeleteS3Object, self).__init__('DeleteS3Object',
+            properties = {
+                'Object Key': 'test_object_key',
+                'Bucket': 'test_bucket',
+                'Access Key': 'test_access_key',
+                'Secret Key': 'test_secret',
+                'Endpoint Override URL': "http://s3-server:9090",
+                'Proxy Host': proxy_host,
+                'Proxy Port': proxy_port,
+                'Proxy Username': proxy_username,
+                'Proxy Password': proxy_password,
+            },
+            auto_terminate=['success'])
+
diff --git a/docker/test/integration/minifi/processors/GenerateFlowFile.py 
b/docker/test/integration/minifi/processors/GenerateFlowFile.py
new file mode 100644
index 0000000..93d42ca
--- /dev/null
+++ b/docker/test/integration/minifi/processors/GenerateFlowFile.py
@@ -0,0 +1,8 @@
+from ..core.Processor import Processor
+
+class GenerateFlowFile(Processor):
+    def __init__(self, file_size, schedule={'scheduling period': '0 sec'}):
+        super(GenerateFlowFile, self).__init__('GenerateFlowFile',
+                       properties={'File Size': file_size},
+                       schedule=schedule,
+                       auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/processors/GetFile.py 
b/docker/test/integration/minifi/processors/GetFile.py
new file mode 100644
index 0000000..29b8180
--- /dev/null
+++ b/docker/test/integration/minifi/processors/GetFile.py
@@ -0,0 +1,8 @@
+from ..core.Processor import Processor
+
+class GetFile(Processor):
+       def __init__(self, input_dir, schedule={'scheduling period': '2 sec'}):
+               super(GetFile, self).__init__('GetFile',
+                       properties={'Input Directory': input_dir, 'Keep Source File': 'true'},
+                       schedule=schedule,
+                       auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/processors/InvokeHTTP.py 
b/docker/test/integration/minifi/processors/InvokeHTTP.py
new file mode 100644
index 0000000..135c8d5
--- /dev/null
+++ b/docker/test/integration/minifi/processors/InvokeHTTP.py
@@ -0,0 +1,30 @@
+from ..core.Processor import Processor
+
+class InvokeHTTP(Processor):
+    def __init__(self, url,
+        method='GET',
+        proxy_host='',
+        proxy_port='',
+        proxy_username='',
+        proxy_password='',
+        ssl_context_service=None,
+        schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+            properties = {
+                'Remote URL': url,
+                'HTTP Method': method,
+                'Proxy Host': proxy_host,
+                'Proxy Port': proxy_port,
+                'invokehttp-proxy-username': proxy_username,
+                'invokehttp-proxy-password': proxy_password }
+
+            controller_services = []
+
+            if ssl_context_service is not None:
+                properties['SSL Context Service'] = ssl_context_service.name
+                controller_services.append(ssl_context_service)
+
+            super(InvokeHTTP, self).__init__('InvokeHTTP',
+                properties = properties,
+                controller_services = controller_services,
+                auto_terminate = ['success', 'response', 'retry', 'failure', 'no retry'],
+                schedule = schedule)
diff --git a/docker/test/integration/minifi/processors/ListenHTTP.py 
b/docker/test/integration/minifi/processors/ListenHTTP.py
new file mode 100644
index 0000000..6f5bca1
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ListenHTTP.py
@@ -0,0 +1,14 @@
+from ..core.Processor import Processor
+
+class ListenHTTP(Processor):
+    def __init__(self, port, cert=None, schedule=None):
+        properties = {'Listening Port': port}
+
+        if cert is not None:
+            properties['SSL Certificate'] = cert
+            properties['SSL Verify Peer'] = 'no'
+
+        super(ListenHTTP, self).__init__('ListenHTTP',
+                       properties=properties,
+                       auto_terminate=['success'],
+                       schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/LogAttribute.py 
b/docker/test/integration/minifi/processors/LogAttribute.py
new file mode 100644
index 0000000..1be4c38
--- /dev/null
+++ b/docker/test/integration/minifi/processors/LogAttribute.py
@@ -0,0 +1,7 @@
+from ..core.Processor import Processor
+
+class LogAttribute(Processor):
+    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        super(LogAttribute, self).__init__('LogAttribute',
+                       auto_terminate=['success'],
+                       schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PublishKafka.py 
b/docker/test/integration/minifi/processors/PublishKafka.py
new file mode 100644
index 0000000..698cd74
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PublishKafka.py
@@ -0,0 +1,10 @@
+from ..core.Processor import Processor
+
+class PublishKafka(Processor):
+    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        super(PublishKafka, self).__init__('PublishKafka',
+                       properties={'Client Name': 'nghiaxlee', 'Known Brokers': 'kafka-broker:9092', 'Topic Name': 'test',
+                               'Batch Size': '10', 'Compress Codec': 'none', 'Delivery Guarantee': '1',
+                               'Request Timeout': '10 sec', 'Message Timeout': '12 sec'},
+                       auto_terminate=['success'],
+                       schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PublishKafkaSSL.py 
b/docker/test/integration/minifi/processors/PublishKafkaSSL.py
new file mode 100644
index 0000000..82c33f6
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PublishKafkaSSL.py
@@ -0,0 +1,16 @@
+from ..core.Processor import Processor
+
+class PublishKafkaSSL(Processor):
+    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        super(PublishKafkaSSL, self).__init__('PublishKafka',
+            properties={'Client Name': 'LMN', 'Known Brokers': 'kafka-broker:9093',
+                'Topic Name': 'test', 'Batch Size': '10',
+                'Compress Codec': 'none', 'Delivery Guarantee': '1',
+                'Request Timeout': '10 sec', 'Message Timeout': '12 sec',
+                'Security CA': '/tmp/resources/certs/ca-cert',
+                'Security Cert': '/tmp/resources/certs/client_LMN_client.pem',
+                'Security Pass Phrase': 'abcdefgh',
+                'Security Private Key': '/tmp/resources/certs/client_LMN_client.key',
+                'Security Protocol': 'ssl'},
+            auto_terminate=['success'],
+            schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PutFile.py 
b/docker/test/integration/minifi/processors/PutFile.py
new file mode 100644
index 0000000..047d32d
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PutFile.py
@@ -0,0 +1,14 @@
+from ..core.Processor import Processor
+
+class PutFile(Processor):
+    def __init__(self, output_dir, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        super(PutFile, self).__init__('PutFile',
+            properties={'Directory': output_dir, 'Directory Permissions': '777', 'Permissions': '777'},
+            auto_terminate=['success', 'failure'],
+            schedule=schedule)
+
+    def nifi_property_key(self, key):
+        if key == 'Directory Permissions':
+            return None
+        else:
+            return key
diff --git a/docker/test/integration/minifi/processors/PutS3Object.py 
b/docker/test/integration/minifi/processors/PutS3Object.py
new file mode 100644
index 0000000..74fb4a8
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PutS3Object.py
@@ -0,0 +1,20 @@
+from ..core.Processor import Processor
+
+class PutS3Object(Processor):
+    def __init__(self,
+        proxy_host='',
+        proxy_port='',
+        proxy_username='',
+        proxy_password=''):
+            super(PutS3Object, self).__init__('PutS3Object',
+            properties = {
+                'Object Key': 'test_object_key',
+                'Bucket': 'test_bucket',
+                'Access Key': 'test_access_key',
+                'Secret Key': 'test_secret',
+                'Endpoint Override URL': "http://s3-server:9090",
+                'Proxy Host': proxy_host,
+                'Proxy Port': proxy_port,
+                'Proxy Username': proxy_username,
+                'Proxy Password': proxy_password },
+            auto_terminate = ['success'])
diff --git a/docker/test/integration/minifi/processors/__init__.py 
b/docker/test/integration/minifi/processors/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git 
a/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py 
b/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py
new file mode 100644
index 0000000..689cf7d
--- /dev/null
+++ b/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py
@@ -0,0 +1,27 @@
+import logging
+import os
+
+from os import listdir
+
+from .FileOutputValidator import FileOutputValidator
+
+class EmptyFilesOutPutValidator(FileOutputValidator):
+
+    """
+    Validates if all the files in the target directory are empty and at least one exists
+    """
+    def __init__(self):
+        self.valid = False
+
+    def validate(self, dir=''):
+
+        if self.valid:
+            return True
+
+        full_dir = self.output_dir + dir
+        logging.info("Output folder: %s", full_dir)
+        listing = listdir(full_dir)
+        if listing:
+            self.valid = all(os.path.getsize(os.path.join(full_dir,x)) == 0 for x in listing)
+
+        return self.valid
diff --git a/docker/test/integration/minifi/validators/FileOutputValidator.py 
b/docker/test/integration/minifi/validators/FileOutputValidator.py
new file mode 100644
index 0000000..d558c43
--- /dev/null
+++ b/docker/test/integration/minifi/validators/FileOutputValidator.py
@@ -0,0 +1,8 @@
+from .OutputValidator import OutputValidator
+
+class FileOutputValidator(OutputValidator):
+    def set_output_dir(self, output_dir):
+        self.output_dir = output_dir
+
+    def validate(self, dir=''):
+        pass
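
For illustration, a hypothetical validator built on this base class (not part of this commit); set_output_dir() is called by DockerTestCluster, and subclasses only need to implement validate():

    from os import listdir
    from minifi.validators.FileOutputValidator import FileOutputValidator

    class FileCountValidator(FileOutputValidator):
        """Passes once the output directory contains the expected number of files."""
        def __init__(self, expected_count):
            self.expected_count = expected_count

        def validate(self, dir=''):
            return len(listdir(self.output_dir + dir)) == self.expected_count
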
diff --git a/docker/test/integration/minifi/validators/NoFileOutPutValidator.py 
b/docker/test/integration/minifi/validators/NoFileOutPutValidator.py
new file mode 100644
index 0000000..f60a008
--- /dev/null
+++ b/docker/test/integration/minifi/validators/NoFileOutPutValidator.py
@@ -0,0 +1,25 @@
+import logging
+
+from os import listdir
+
+from .FileOutputValidator import FileOutputValidator
+
+class NoFileOutPutValidator(FileOutputValidator):
+    """
+    Validates if no flowfiles were transferred
+    """
+    def __init__(self):
+        self.valid = False
+
+    def validate(self, dir=''):
+
+        if self.valid:
+            return True
+
+        full_dir = self.output_dir + dir
+        logging.info("Output folder: %s", full_dir)
+        listing = listdir(full_dir)
+
+        self.valid = not bool(listing)
+
+        return self.valid
diff --git a/docker/test/integration/minifi/validators/OutputValidator.py 
b/docker/test/integration/minifi/validators/OutputValidator.py
new file mode 100644
index 0000000..c05d5fa
--- /dev/null
+++ b/docker/test/integration/minifi/validators/OutputValidator.py
@@ -0,0 +1,12 @@
+class OutputValidator(object):
+    """
+    Base output validator class. Validators must implement
+    method validate, which returns a boolean.
+    """
+
+    def validate(self):
+        """
+        Return True if output is valid; False otherwise.
+        """
+        raise NotImplementedError("validate function needs to be implemented for validators")
+
diff --git a/docker/test/integration/minifi/validators/SegfaultValidator.py 
b/docker/test/integration/minifi/validators/SegfaultValidator.py
new file mode 100644
index 0000000..ee0227d
--- /dev/null
+++ b/docker/test/integration/minifi/validators/SegfaultValidator.py
@@ -0,0 +1,8 @@
+from .OutputValidator import OutputValidator
+
+class SegfaultValidator(OutputValidator):
+    """
+    Validate that a file was received.
+    """
+    def validate(self):
+        return True
diff --git 
a/docker/test/integration/minifi/validators/SingleFileOutputValidator.py 
b/docker/test/integration/minifi/validators/SingleFileOutputValidator.py
new file mode 100644
index 0000000..7466b41
--- /dev/null
+++ b/docker/test/integration/minifi/validators/SingleFileOutputValidator.py
@@ -0,0 +1,44 @@
+import logging
+import os
+
+from os import listdir
+from os.path import join
+
+from .FileOutputValidator import FileOutputValidator
+
+class SingleFileOutputValidator(FileOutputValidator):
+    """
+    Validates the content of a single file in the given directory.
+    """
+
+    def __init__(self, expected_content, subdir=''):
+        self.valid = False
+        self.expected_content = expected_content
+        self.subdir = subdir
+
+    def validate(self):
+        self.valid = False
+        full_dir = os.path.join(self.output_dir, self.subdir)
+        logging.info("Output folder: %s", full_dir)
+
+        if not os.path.isdir(full_dir):
+            return self.valid
+
+        listing = listdir(full_dir)
+        if listing:
+            for l in listing:
+                logging.info("name:: %s", l)
+            out_file_name = listing[0]
+            full_path = join(full_dir, out_file_name)
+            if not os.path.isfile(full_path):
+                return self.valid
+
+            with open(full_path, 'r') as out_file:
+                contents = out_file.read()
+                logging.info("dir %s -- name %s", full_dir, out_file_name)
+                logging.info("expected %s -- content %s", 
self.expected_content, contents)
+
+                if self.expected_content in contents:
+                    self.valid = True
+
+        return self.valid
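
Standalone usage sketch (the output directory is a placeholder); inside the test frame, DockerTestCluster calls set_output_dir() and the watchdog handler drives validate():

    from minifi.validators.SingleFileOutputValidator import SingleFileOutputValidator

    validator = SingleFileOutputValidator('test payload')
    validator.set_output_dir('/tmp/output')
    if validator.validate():
        print('found a file containing the expected content')
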
diff --git a/docker/test/integration/minifi/validators/__init__.py 
b/docker/test/integration/minifi/validators/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/docker/test/integration/test_filesystem_ops.py 
b/docker/test/integration/test_filesystem_ops.py
index 3ae4ff5..5b48e6e 100644
--- a/docker/test/integration/test_filesystem_ops.py
+++ b/docker/test/integration/test_filesystem_ops.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
-
 
 def test_get_put():
     """
diff --git a/docker/test/integration/test_filter_zero_file.py 
b/docker/test/integration/test_filter_zero_file.py
index b56b23f..918582b 100644
--- a/docker/test/integration/test_filter_zero_file.py
+++ b/docker/test/integration/test_filter_zero_file.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
 
 def test_filter_zero_file():
     """
diff --git a/docker/test/integration/test_hash_content.py 
b/docker/test/integration/test_hash_content.py
index 1482014..e60d3a3 100644
--- a/docker/test/integration/test_hash_content.py
+++ b/docker/test/integration/test_hash_content.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
-
 
 def test_hash_invoke():
     """
diff --git a/docker/test/integration/test_http.py 
b/docker/test/integration/test_http.py
index facc45c..f9431d4 100644
--- a/docker/test/integration/test_http.py
+++ b/docker/test/integration/test_http.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
-
 
 def test_invoke_listen():
     """
diff --git a/docker/test/integration/test_rdkafka.py 
b/docker/test/integration/test_rdkafka.py
index 980e66f..bea36db 100644
--- a/docker/test/integration/test_rdkafka.py
+++ b/docker/test/integration/test_rdkafka.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
-
 
 def test_publish_kafka():
     """
diff --git a/docker/test/integration/test_s2s.py 
b/docker/test/integration/test_s2s.py
index 3125139..ff1a312 100644
--- a/docker/test/integration/test_s2s.py
+++ b/docker/test/integration/test_s2s.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
-
 
 def test_minifi_to_nifi():
     """
diff --git a/docker/test/integration/test_s3.py 
b/docker/test/integration/test_s3.py
index d961003..75b361a 100644
--- a/docker/test/integration/test_s3.py
+++ b/docker/test/integration/test_s3.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
-
 
 def test_put_s3_object():
     """
diff --git a/docker/test/integration/test_zero_file.py 
b/docker/test/integration/test_zero_file.py
index 3223329..1a0cf05 100644
--- a/docker/test/integration/test_zero_file.py
+++ b/docker/test/integration/test_zero_file.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 from minifi import *
-from minifi.test import *
 
 def test_zero_file():
     """
diff --git a/docker/test/test_https.py b/docker/test/test_https.py
index 79a565f..2ea1bed 100644
--- a/docker/test/test_https.py
+++ b/docker/test/test_https.py
@@ -18,8 +18,6 @@ import time
 from M2Crypto import X509, EVP, RSA, ASN1
 
 from minifi import *
-from minifi.test import *
-
 
 def callback():
     pass
