This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 3885493eb784a571a8f3f1e06dbf5aca7b221e2a
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Nov 27 13:05:21 2024 +0100

    MINIFICPP-2470 Add SSL and mTLS authentication support to 
CouchbaseClusterService
    
    - Add mTLS authentication option with linked SSLContextService of 
CouchbaseClusterService
    - Add support for adding CA certificate with linked SSLContextService to 
use SSL communication when using username/password authentication
    - Fix enabling linked services of a controller service before enabling the 
controller service itself
    
    Closes #1882
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 .../cluster/DockerTestDirectoryBindings.py         | 10 ++-
 .../cluster/containers/CouchbaseServerContainer.py | 88 +++++++++++++++++++---
 docker/test/integration/features/couchbase.feature | 57 ++++++++++++++
 docker/test/integration/features/steps/steps.py    | 30 +++++++-
 .../minifi/controllers/CouchbaseClusterService.py  |  9 ++-
 .../integration/minifi/core/ControllerService.py   |  1 +
 .../Minifi_flow_json_serializer.py                 | 13 ++--
 .../Minifi_flow_yaml_serializer.py                 | 38 +++++-----
 examples/couchbase_mtls_authentication.json        | 74 ++++++++++++++++++
 examples/couchbase_mtls_authentication.yml         | 45 +++++++++++
 .../controllerservices/CouchbaseClusterService.cpp | 66 ++++++++++++++--
 .../controllerservices/CouchbaseClusterService.h   | 12 +--
 .../controller/StandardControllerServiceNode.cpp   | 15 +++-
 13 files changed, 407 insertions(+), 51 deletions(-)

diff --git a/docker/test/integration/cluster/DockerTestDirectoryBindings.py 
b/docker/test/integration/cluster/DockerTestDirectoryBindings.py
index 87cf2b4ec..25add90ae 100644
--- a/docker/test/integration/cluster/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/cluster/DockerTestDirectoryBindings.py
@@ -18,7 +18,7 @@ import shutil
 import hashlib
 import subprocess
 import OpenSSL.crypto
-from ssl_utils.SSL_cert_utils import make_self_signed_cert, 
make_cert_without_extended_usage, make_server_cert
+from ssl_utils.SSL_cert_utils import make_self_signed_cert, 
make_cert_without_extended_usage, make_server_cert, make_client_cert
 
 
 class DockerTestDirectoryBindings:
@@ -214,3 +214,11 @@ class DockerTestDirectoryBindings:
             os.path.join(base, "root_ca.crt"),
         ]
         subprocess.run(cmd, check=True)
+
+        clientuser_cert, clientuser_key = make_client_cert("clientuser", 
ca_cert=self.root_ca_cert, ca_key=self.root_ca_key)
+        self.put_test_resource('clientuser.crt',
+                               
OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
+                                                               
cert=clientuser_cert))
+        self.put_test_resource('clientuser.key',
+                               
OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM,
+                                                              
pkey=clientuser_key))
diff --git 
a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py 
b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py
index 9baf01f08..b2f13b732 100644
--- a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py
+++ b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py
@@ -12,19 +12,67 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os
+import OpenSSL.crypto
+import tempfile
+import docker
+import requests
+import logging
+from requests.auth import HTTPBasicAuth
 from .Container import Container
 from utils import retry_check
+from ssl_utils.SSL_cert_utils import make_server_cert
 
 
 class CouchbaseServerContainer(Container):
     def __init__(self, feature_context, name, vols, network, image_store, 
command=None):
-        super().__init__(feature_context, name, 'couchbase-server', vols, 
network, image_store, command)
+        super().__init__(feature_context, name, "couchbase-server", vols, 
network, image_store, command)
+        couchbase_cert, couchbase_key = 
make_server_cert(f"couchbase-server-{feature_context.id}", 
feature_context.root_ca_cert, feature_context.root_ca_key)
+
+        self.root_ca_file = tempfile.NamedTemporaryFile(delete=False)
+        
self.root_ca_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
 cert=feature_context.root_ca_cert))
+        self.root_ca_file.close()
+        os.chmod(self.root_ca_file.name, 0o666)
+
+        self.couchbase_cert_file = tempfile.NamedTemporaryFile(delete=False)
+        
self.couchbase_cert_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
 cert=couchbase_cert))
+        self.couchbase_cert_file.close()
+        os.chmod(self.couchbase_cert_file.name, 0o666)
+
+        self.couchbase_key_file = tempfile.NamedTemporaryFile(delete=False)
+        
self.couchbase_key_file.write(OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM,
 pkey=couchbase_key))
+        self.couchbase_key_file.close()
+        os.chmod(self.couchbase_key_file.name, 0o666)
 
     def get_startup_finished_log_entry(self):
         # after startup the logs are only available in the container, only 
this message is shown
         return "logs available in"
 
-    @retry_check(15, 2)
+    @retry_check(max_tries=12, retry_interval=5)
+    def _run_couchbase_cli_command(self, command):
+        (code, _) = self.client.containers.get(self.name).exec_run(command)
+        if code != 0:
+            logging.error(f"Failed to run command '{command}', returned error 
code: {code}")
+            return False
+        return True
+
+    def _run_couchbase_cli_commands(self, commands):
+        return all(self._run_couchbase_cli_command(command) for command in 
commands)
+
+    @retry_check(max_tries=15, retry_interval=2)
+    def _load_couchbase_certs(self):
+        response = 
requests.post("http://localhost:8091/node/controller/loadTrustedCAs", 
auth=HTTPBasicAuth("Administrator", "password123"))
+        if response.status_code != 200:
+            logging.error(f"Failed to load CA certificates, with status code: 
{response.status_code}")
+            return False
+
+        response = 
requests.post("http://localhost:8091/node/controller/reloadCertificate", 
auth=HTTPBasicAuth("Administrator", "password123"))
+        if response.status_code != 200:
+            logging.error(f"Failed to reload certificates, with status code: 
{response.status_code}")
+            return False
+
+        return True
+
     def run_post_startup_commands(self):
         if self.post_startup_commands_finished:
             return True
@@ -33,12 +81,18 @@ class CouchbaseServerContainer(Container):
             ["couchbase-cli", "cluster-init", "-c", "localhost", 
"--cluster-username", "Administrator", "--cluster-password", "password123", 
"--services", "data,index,query",
              "--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"],
             ["couchbase-cli", "bucket-create", "-c", "localhost", 
"--username", "Administrator", "--password", "password123", "--bucket", 
"test_bucket", "--bucket-type", "couchbase",
-             "--bucket-ramsize", "1024", "--max-ttl", "36000"]
+             "--bucket-ramsize", "1024", "--max-ttl", "36000"],
+            ["couchbase-cli", "user-manage", "-c", "localhost", "-u", 
"Administrator", "-p", "password123", "--set", "--rbac-username", "clientuser", 
"--rbac-password", "password123",
+             "--roles", "data_reader[test_bucket],data_writer[test_bucket]", 
"--auth-domain", "local"],
+            ["bash", "-c", 'tee /tmp/auth.json <<< \'{"state": "enable", 
"prefixes": [ {"path": "subject.cn", "prefix": "", "delimiter": "."}]}\''],
+            ['couchbase-cli', 'ssl-manage', '-c', 'localhost', '-u', 
'Administrator', '-p', 'password123', '--set-client-auth', '/tmp/auth.json']
         ]
-        for command in commands:
-            (code, _) = self.client.containers.get(self.name).exec_run(command)
-            if code != 0:
-                return False
+        if not self._run_couchbase_cli_commands(commands):
+            return False
+
+        if not self._load_couchbase_certs():
+            return False
+
         self.post_startup_commands_finished = True
         return True
 
@@ -46,10 +100,26 @@ class CouchbaseServerContainer(Container):
         if not self.set_deployed():
             return
 
+        mounts = [
+            docker.types.Mount(
+                type='bind',
+                source=self.couchbase_key_file.name,
+                target='/opt/couchbase/var/lib/couchbase/inbox/pkey.key'),
+            docker.types.Mount(
+                type='bind',
+                source=self.couchbase_cert_file.name,
+                target='/opt/couchbase/var/lib/couchbase/inbox/chain.pem'),
+            docker.types.Mount(
+                type='bind',
+                source=self.root_ca_file.name,
+                target='/opt/couchbase/var/lib/couchbase/inbox/CA/root_ca.crt')
+        ]
+
         self.docker_container = self.client.containers.run(
             "couchbase:enterprise-7.2.5",
             detach=True,
             name=self.name,
             network=self.network.name,
-            ports={'11210/tcp': 11210},
-            entrypoint=self.command)
+            ports={'8091/tcp': 8091, '11210/tcp': 11210},
+            entrypoint=self.command,
+            mounts=mounts)
diff --git a/docker/test/integration/features/couchbase.feature 
b/docker/test/integration/features/couchbase.feature
index 646eaacd9..7b74a6c3d 100644
--- a/docker/test/integration/features/couchbase.feature
+++ b/docker/test/integration/features/couchbase.feature
@@ -173,3 +173,60 @@ Feature: Executing Couchbase operations from MiNiFi-C++
     And all instances start up
 
     Then the Minifi logs contain the following message: "Failed to get content 
for document 'test_doc_id' from collection 'test_bucket._default._default' with 
the following exception: 'raw_binary_transcoder expects document to have BINARY 
common flags" in less than 60 seconds
+
+  Scenario: A MiNiFi instance can get data from test bucket with 
GetCouchbaseKey processor using SSL connection
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And the "Keep Source File" property of the GetFile processor is set to 
"true"
+    And the scheduling period of the GetFile processor is set to "20 seconds"
+    And a file with the content '{"field1": "value1", "field2": "value2"}' is 
present in '/tmp/input'
+    And a PutCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And the "Document Id" property of the PutCouchbaseKey processor is set to 
"test_doc_id"
+    And the "Couchbase Cluster Controller Service" property of the 
PutCouchbaseKey processor is set to "CouchbaseClusterService"
+    And a GetCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And the "Document Id" property of the GetCouchbaseKey processor is set to 
"test_doc_id"
+    And the "Couchbase Cluster Controller Service" property of the 
GetCouchbaseKey processor is set to "CouchbaseClusterService"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
+    And a CouchbaseClusterService is set up up with SSL connection with the 
name "CouchbaseClusterService"
+
+    And the "success" relationship of the GetFile processor is connected to 
the PutCouchbaseKey
+    And the "success" relationship of the PutCouchbaseKey processor is 
connected to the GetCouchbaseKey
+    And the "success" relationship of the GetCouchbaseKey processor is 
connected to the PutFile
+    And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
+
+    When a Couchbase server is started
+    And all instances start up
+
+    Then a flowfile with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the monitored directory in less than 6000 seconds
+    And the Minifi logs contain the following message: "key:couchbase.bucket 
value:test_bucket" in less than 10 seconds
+    And the Minifi logs contain the following message: "key:couchbase.doc.id 
value:test_doc_id" in less than 1 seconds
+    And the Minifi logs match the following regex: "key:couchbase.doc.cas 
value:[1-9][0-9]*" in less than 1 seconds
+    And the Minifi logs match the following regex: "key:couchbase.doc.expiry 
value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds
+
+  Scenario: A MiNiFi instance can get data from test bucket with 
GetCouchbaseKey processor using mTLS authentication
+    Given a MiNiFi CPP server with yaml config
+    And a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content '{"field1": "value1", "field2": "value2"}' is 
present in '/tmp/input'
+    And a PutCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And the "Document Id" property of the PutCouchbaseKey processor is set to 
"test_doc_id"
+    And the "Couchbase Cluster Controller Service" property of the 
PutCouchbaseKey processor is set to "CouchbaseClusterService"
+    And a GetCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And the "Document Id" property of the GetCouchbaseKey processor is set to 
"test_doc_id"
+    And the "Couchbase Cluster Controller Service" property of the 
GetCouchbaseKey processor is set to "CouchbaseClusterService"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
+    And a CouchbaseClusterService is setup up using mTLS authentication with 
the name "CouchbaseClusterService"
+
+    And the "success" relationship of the GetFile processor is connected to 
the PutCouchbaseKey
+    And the "success" relationship of the PutCouchbaseKey processor is 
connected to the GetCouchbaseKey
+    And the "success" relationship of the GetCouchbaseKey processor is 
connected to the PutFile
+    And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
+
+    When a Couchbase server is started
+    And all instances start up
+
+    Then a flowfile with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the monitored directory in less than 6000 seconds
+    And the Minifi logs contain the following message: "key:couchbase.bucket 
value:test_bucket" in less than 10 seconds
+    And the Minifi logs contain the following message: "key:couchbase.doc.id 
value:test_doc_id" in less than 1 seconds
+    And the Minifi logs match the following regex: "key:couchbase.doc.cas 
value:[1-9][0-9]*" in less than 1 seconds
+    And the Minifi logs match the following regex: "key:couchbase.doc.expiry 
value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds
diff --git a/docker/test/integration/features/steps/steps.py 
b/docker/test/integration/features/steps/steps.py
index 6d4e464e9..70a193888 100644
--- a/docker/test/integration/features/steps/steps.py
+++ b/docker/test/integration/features/steps/steps.py
@@ -573,7 +573,7 @@ def step_impl(context):
     minifi_crt_file = '/tmp/resources/minifi_client.crt'
     minifi_key_file = '/tmp/resources/minifi_client.key'
     root_ca_crt_file = '/tmp/resources/root_ca.crt'
-    ssl_context_service = SSLContextService(cert=minifi_crt_file, 
ca_cert=root_ca_crt_file, key=minifi_key_file)
+    ssl_context_service = SSLContextService(name='SSLContextService', 
cert=minifi_crt_file, ca_cert=root_ca_crt_file, key=minifi_key_file)
 
     splunk_cert, splunk_key = 
make_server_cert(context.test.get_container_name_with_postfix("splunk"), 
context.root_ca_cert, context.root_ca_key)
     put_splunk_http = context.test.get_node_by_name("PutSplunkHTTP")
@@ -1384,6 +1384,34 @@ def step_impl(context, service_name):
     container.add_controller(couchbase_cluster_controller_service)
 
 
+@given("a CouchbaseClusterService is set up up with SSL connection with the 
name \"{service_name}\"")
+def step_impl(context, service_name):
+    ssl_context_service = SSLContextService(name="SSLContextService",
+                                            
ca_cert='/tmp/resources/root_ca.crt')
+    container = context.test.acquire_container(context=context, 
name="minifi-cpp-flow")
+    container.add_controller(ssl_context_service)
+    couchbase_cluster_controller_service = CouchbaseClusterService(
+        name=service_name,
+        
connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")),
+        ssl_context_service=ssl_context_service)
+    container.add_controller(couchbase_cluster_controller_service)
+
+
 @then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present 
with data '{data}' of type \"{data_type}\" in Couchbase")
 def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: 
str):
     context.test.check_is_data_present_on_couchbase(doc_id, bucket_name, data, 
data_type)
+
+
+@given("a CouchbaseClusterService is setup up using mTLS authentication with 
the name \"{service_name}\"")
+def step_impl(context, service_name):
+    ssl_context_service = SSLContextService(name="SSLContextService",
+                                            
cert='/tmp/resources/clientuser.crt',
+                                            
key='/tmp/resources/clientuser.key',
+                                            
ca_cert='/tmp/resources/root_ca.crt')
+    container = context.test.acquire_container(context=context, 
name="minifi-cpp-flow")
+    container.add_controller(ssl_context_service)
+    couchbase_cluster_controller_service = CouchbaseClusterService(
+        name=service_name,
+        
connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")),
+        ssl_context_service=ssl_context_service)
+    container.add_controller(couchbase_cluster_controller_service)
diff --git 
a/docker/test/integration/minifi/controllers/CouchbaseClusterService.py 
b/docker/test/integration/minifi/controllers/CouchbaseClusterService.py
index e06c1e6ff..94494fe17 100644
--- a/docker/test/integration/minifi/controllers/CouchbaseClusterService.py
+++ b/docker/test/integration/minifi/controllers/CouchbaseClusterService.py
@@ -18,10 +18,13 @@ from ..core.ControllerService import ControllerService
 
 
 class CouchbaseClusterService(ControllerService):
-    def __init__(self, name, connection_string):
+    def __init__(self, name, connection_string, ssl_context_service=None):
         super(CouchbaseClusterService, self).__init__(name=name)
 
         self.service_class = 'CouchbaseClusterService'
         self.properties['Connection String'] = connection_string
-        self.properties['User Name'] = "Administrator"
-        self.properties['User Password'] = "password123"
+        if ssl_context_service:
+            self.linked_services.append(ssl_context_service)
+        if not ssl_context_service or ssl_context_service and 'Client 
Certificate' not in ssl_context_service.properties:
+            self.properties['User Name'] = "Administrator"
+            self.properties['User Password'] = "password123"
diff --git a/docker/test/integration/minifi/core/ControllerService.py 
b/docker/test/integration/minifi/core/ControllerService.py
index f29513139..02d32e4a7 100644
--- a/docker/test/integration/minifi/core/ControllerService.py
+++ b/docker/test/integration/minifi/core/ControllerService.py
@@ -34,3 +34,4 @@ class ControllerService(object):
             properties = {}
 
         self.properties = properties
+        self.linked_services = []
diff --git 
a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py
 
b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py
index c1043d506..a100ae038 100644
--- 
a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py
+++ 
b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py
@@ -118,12 +118,7 @@ class Minifi_flow_json_serializer:
                     continue
 
                 visited.append(svc)
-                root['controllerServices'].append({
-                    'name': svc.name,
-                    'identifier': svc.id,
-                    'type': svc.service_class,
-                    'properties': svc.properties
-                })
+                self.serialize_controller(svc, root)
 
         if isinstance(connectable, Funnel):
             root['funnels'].append({
@@ -159,3 +154,9 @@ class Minifi_flow_json_serializer:
             'type': controller.service_class,
             'properties': controller.properties
         })
+
+        if controller.linked_services:
+            if len(controller.linked_services) == 1:
+                root['controllerServices'][-1]['properties']['Linked 
Services'] = controller.linked_services[0].name
+            else:
+                root['controllerServices'][-1]['properties']['Linked 
Services'] = [{"value": service.name} for service in controller.linked_services]
diff --git 
a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
 
b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
index dc912b122..14cbabc98 100644
--- 
a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
+++ 
b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
@@ -119,12 +119,7 @@ class Minifi_flow_yaml_serializer:
                     continue
 
                 visited.append(svc)
-                res['Controller Services'].append({
-                    'name': svc.name,
-                    'id': svc.id,
-                    'class': svc.service_class,
-                    'Properties': svc.properties
-                })
+                self._add_controller_service_node(svc, res)
 
         if isinstance(connectable, Funnel):
             res['Funnels'].append({
@@ -160,6 +155,25 @@ class Minifi_flow_yaml_serializer:
 
         return (res, visited)
 
+    def _add_controller_service_node(self, controller, parent):
+        if hasattr(controller, 'name'):
+            connectable_name = controller.name
+        else:
+            connectable_name = str(controller.uuid)
+
+        parent['Controller Services'].append({
+            'name': connectable_name,
+            'id': controller.id,
+            'class': controller.service_class,
+            'Properties': controller.properties
+        })
+
+        if controller.linked_services:
+            if len(controller.linked_services) == 1:
+                parent['Controller Services'][-1]['Properties']['Linked 
Services'] = controller.linked_services[0].name
+            else:
+                parent['Controller Services'][-1]['Properties']['Linked 
Services'] = [{"value": service.name} for service in controller.linked_services]
+
     def serialize_controller(self, controller, root=None):
         if root is None:
             res = {
@@ -175,16 +189,6 @@ class Minifi_flow_yaml_serializer:
         else:
             res = root
 
-        if hasattr(controller, 'name'):
-            connectable_name = controller.name
-        else:
-            connectable_name = str(controller.uuid)
-
-        res['Controller Services'].append({
-            'name': connectable_name,
-            'id': controller.id,
-            'class': controller.service_class,
-            'Properties': controller.properties
-        })
+        self._add_controller_service_node(controller, res)
 
         return res
diff --git a/examples/couchbase_mtls_authentication.json 
b/examples/couchbase_mtls_authentication.json
new file mode 100644
index 000000000..7ac14df25
--- /dev/null
+++ b/examples/couchbase_mtls_authentication.json
@@ -0,0 +1,74 @@
+{
+    "parameterContexts": [],
+    "rootGroup": {
+        "name": "MiNiFi Flow",
+        "processors": [
+            {
+                "name": "Get Couchbase document file from local directory",
+                "identifier": "21b1e56e-e8d5-4543-9f6b-be148f91fb02",
+                "type": "org.apache.nifi.processors.standard.GetFile",
+                "schedulingStrategy": "TIMER_DRIVEN",
+                "schedulingPeriod": "2 sec",
+                "penaltyDuration": "30 sec",
+                "properties": {
+                    "Input Directory": "/home/user/couchbase/input"
+                },
+                "autoTerminatedRelationships": [],
+                "concurrentlySchedulableTaskCount": 1
+            },
+            {
+                "name": "Insert Couchbase document",
+                "identifier": "df762d53-0f94-4611-be01-e689b8992573",
+                "type": "org.apache.nifi.processors.standard.PutCouchbaseKey",
+                "schedulingStrategy": "EVENT_DRIVEN",
+                "penaltyDuration": "30 sec",
+                "properties": {
+                    "Bucket Name": "test_bucket",
+                    "Document Id": "test_doc_id",
+                    "Couchbase Cluster Controller Service": 
"CouchbaseClusterService for mTLS authentication"
+                },
+                "autoTerminatedRelationships": [
+                    "success",
+                    "failure",
+                    "retry"
+                ]
+            }
+        ],
+        "controllerServices": [
+            {
+                "name": "SSLContextService for Couchbase",
+                "identifier": "33e03d54-9917-494e-8ba0-8caeb3fdf4de",
+                "type": "SSLContextService",
+                "properties": {
+                    "Client Certificate": 
"/home/user/couchbase/certs/clientuser.crt",
+                    "Private Key": "/home/user/couchbase/certs/clientuser.key",
+                    "CA Certificate": "/home/user/couchbase/certs/root_ca.crt"
+                }
+            },
+            {
+                "name": "CouchbaseClusterService for mTLS authentication",
+                "identifier": "747bae3c-e68e-40af-8933-02179bd6cf85",
+                "type": "CouchbaseClusterService",
+                "properties": {
+                    "Connection String": 
"couchbases://couchbase-server-hLkYUYq55djwrW5A26XNJD",
+                    "Linked Services": "SSLContextService for Couchbase"
+                }
+            }
+        ],
+        "connections": [
+            {
+                "identifier": "94fdd7b1-7857-44c3-8cf2-d373a5578420",
+                "name": "GetFile/success/PutCouchbaseKey",
+                "source": {
+                    "id": "21b1e56e-e8d5-4543-9f6b-be148f91fb02"
+                },
+                "destination": {
+                    "id": "df762d53-0f94-4611-be01-e689b8992573"
+                },
+                "selectedRelationships": [
+                    "success"
+                ]
+            }
+        ]
+    }
+}
diff --git a/examples/couchbase_mtls_authentication.yml 
b/examples/couchbase_mtls_authentication.yml
new file mode 100644
index 000000000..7ef1033e7
--- /dev/null
+++ b/examples/couchbase_mtls_authentication.yml
@@ -0,0 +1,45 @@
+Flow Controller:
+  name: MiNiFi Flow
+Processors:
+- id: 21b1e56e-e8d5-4543-9f6b-be148f91fb02
+  name: Get Couchbase document file from local directory
+  class: org.apache.nifi.processors.standard.GetFile
+  scheduling strategy: TIMER_DRIVEN
+  scheduling period: 2 sec
+  penalization period: 30 sec
+  Properties:
+    Input Directory: /tmp/input
+  auto-terminated relationships list: []
+- id: df762d53-0f94-4611-be01-e689b8992573
+  name: Insert Couchbase document
+  class: org.apache.nifi.processors.standard.PutCouchbaseKey
+  scheduling strategy: EVENT_DRIVEN
+  penalization period: 30 sec
+  Properties:
+    Bucket Name: test_bucket
+    Couchbase Cluster Controller Service: CouchbaseClusterService for mTLS 
authentication
+    Document Id: test_doc_id
+  auto-terminated relationships list:
+  - success
+  - failure
+  - retry
+Controller Services:
+- id: 33e03d54-9917-494e-8ba0-8caeb3fdf4de
+  name: SSLContextService for Couchbase
+  class: SSLContextService
+  Properties:
+    CA Certificate: /tmp/resources/root_ca.crt
+    Client Certificate: /tmp/resources/clientuser.crt
+    Private Key: /tmp/resources/clientuser.key
+- id: 747bae3c-e68e-40af-8933-02179bd6cf85
+  name: CouchbaseClusterService for mTLS authentication
+  class: CouchbaseClusterService
+  Properties:
+    Connection String: couchbases://couchbase-server-VPQDsPD2pj35q5WzHNt9ER
+    Linked Services: SSLContextService for Couchbase
+Connections:
+- id: 94fdd7b1-7857-44c3-8cf2-d373a5578420
+  destination id: df762d53-0f94-4611-be01-e689b8992573
+  name: GetFile/success/PutCouchbaseKey
+  source id: 21b1e56e-e8d5-4543-9f6b-be148f91fb02
+  source relationship name: success
diff --git 
a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp 
b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp
index c40697cf7..ac340f864 100644
--- a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp
+++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp
@@ -50,6 +50,50 @@ CouchbaseErrorType getErrorType(const std::error_code& 
error_code) {
 
 }  // namespace
 
+CouchbaseClient::CouchbaseClient(std::string connection_string, std::string 
username, std::string password, minifi::controllers::SSLContextService* 
ssl_context_service,
+  const std::shared_ptr<core::logging::Logger>& logger)
+    : connection_string_(std::move(connection_string)), logger_(logger), 
cluster_options_(buildClusterOptions(std::move(username), std::move(password), 
ssl_context_service)) {
+}
+
+::couchbase::cluster_options CouchbaseClient::buildClusterOptions(std::string 
username, std::string password, minifi::controllers::SSLContextService* 
ssl_context_service) {
+  if (username.empty() && (!ssl_context_service || (ssl_context_service && 
ssl_context_service->getCertificateFile().empty()))) {
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
"Neither username and password nor SSLContextService is provided for Couchbase 
authentication");
+  }
+
+  if (!username.empty() && ssl_context_service && 
!ssl_context_service->getCertificateFile().empty()) {
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Either 
username and password or mTLS certificate authentication should be used in the 
SSLContextService for Couchbase, "
+      "but not both");
+  }
+
+  if (!username.empty()) {
+    logger_->log_debug("Using username and password authentication for 
Couchbase server");
+    if (password.empty()) {
+      throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
"Password missing for Couchbase server authentication");
+    }
+    ::couchbase::cluster_options cluster_options(std::move(username), 
std::move(password));
+    if (ssl_context_service && 
!ssl_context_service->getCACertificate().empty()) {
+      logger_->log_debug("Setting Couchbase client CA certificate path to 
'{}'", ssl_context_service->getCACertificate().string());
+      
cluster_options.security().trust_certificate(ssl_context_service->getCACertificate().string());
+    }
+    return cluster_options;
+  }
+
+  logger_->log_debug("Using mTLS authentication for Couchbase server");
+  logger_->log_debug("Setting Couchbase client SSL key file path to '{}'", 
ssl_context_service->getPrivateKeyFile().string());
+  logger_->log_debug("Setting Couchbase client certificate file path to '{}'", 
ssl_context_service->getCertificateFile().string());
+  if (ssl_context_service->getPrivateKeyFile().empty() || 
ssl_context_service->getCertificateFile().empty()) {
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
"Couchbase client private key path or client certificate path is empty");
+  }
+
+  ::couchbase::cluster_options 
cluster_options(::couchbase::certificate_authenticator(ssl_context_service->getCertificateFile().string(),
 ssl_context_service->getPrivateKeyFile().string()));
+  if (!ssl_context_service->getCACertificate().empty()) {
+    logger_->log_debug("Setting Couchbase client CA certificate path to '{}'", 
ssl_context_service->getCACertificate().string());
+    
cluster_options.security().trust_certificate(ssl_context_service->getCACertificate().string());
+  }
+  cluster_options.security().tls_verify(::couchbase::tls_verify_mode::peer);
+  return cluster_options;
+}
+
 nonstd::expected<::couchbase::collection, CouchbaseErrorType> 
CouchbaseClient::getCollection(const CouchbaseCollection& collection) {
   auto connection_result = establishConnection();
   if (!connection_result) {
@@ -155,8 +199,7 @@ nonstd::expected<void, CouchbaseErrorType> 
CouchbaseClient::establishConnection(
     return {};
   }
 
-  auto options = ::couchbase::cluster_options(username_, password_);
-  auto [connect_err, cluster] = 
::couchbase::cluster::connect(connection_string_, options).get();
+  auto [connect_err, cluster] = 
::couchbase::cluster::connect(connection_string_, cluster_options_).get();
   if (connect_err.ec()) {
     logger_->log_error("Failed to connect to Couchbase cluster with error 
code: '{}' and message: '{}'", connect_err.ec(), connect_err.message());
     return nonstd::make_unexpected(getErrorType(connect_err.ec()));
@@ -179,11 +222,24 @@ void CouchbaseClusterService::onEnable() {
   getProperty(UserName, username);
   std::string password;
   getProperty(UserPassword, password);
-  if (connection_string.empty() || username.empty() || password.empty()) {
-    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
"Missing connection string, username or password");
+  if (connection_string.empty()) {
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
"Missing connection string");
+  }
+
+  if ((username.empty() || password.empty()) && linked_services_.empty()) {
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
"Missing username and password or SSLContextService as a linked service");
+  }
+
+  minifi::controllers::SSLContextService* ssl_context_service_ptr = nullptr;
+  if (!linked_services_.empty()) {
+    auto ssl_context_service = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(linked_services_[0]);
+    if (!ssl_context_service) {
+      throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, 
"Linked service is not an SSLContextService");
+    }
+    ssl_context_service_ptr = ssl_context_service.get();
   }
+  client_ = std::make_unique<CouchbaseClient>(connection_string, username, 
password, ssl_context_service_ptr, logger_);
 
-  client_ = std::make_unique<CouchbaseClient>(connection_string, username, 
password, logger_);
   auto result = client_->establishConnection();
   if (!result) {
     if (result.error() == CouchbaseErrorType::FATAL) {
diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.h 
b/extensions/couchbase/controllerservices/CouchbaseClusterService.h
index 28c6390dd..217449723 100644
--- a/extensions/couchbase/controllerservices/CouchbaseClusterService.h
+++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.h
@@ -31,6 +31,7 @@
 #include "couchbase/cluster.hxx"
 #include "core/ProcessContext.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "controllers/SSLContextService.h"
 
 namespace org::apache::nifi::minifi::couchbase {
 
@@ -69,9 +70,8 @@ enum class CouchbaseErrorType {
 
 class CouchbaseClient {
  public:
-  CouchbaseClient(std::string connection_string, std::string username, 
std::string password, const std::shared_ptr<core::logging::Logger>& logger)
-    : connection_string_(std::move(connection_string)), 
username_(std::move(username)), password_(std::move(password)), logger_(logger) 
{
-  }
+  CouchbaseClient(std::string connection_string, std::string username, 
std::string password, controllers::SSLContextService* ssl_context_service,
+    const std::shared_ptr<core::logging::Logger>& logger);
 
   ~CouchbaseClient() {
     close();
@@ -89,14 +89,14 @@ class CouchbaseClient {
   void close();
 
  private:
+  ::couchbase::cluster_options buildClusterOptions(std::string username, 
std::string password, minifi::controllers::SSLContextService* 
ssl_context_service);
   nonstd::expected<::couchbase::collection, CouchbaseErrorType> 
getCollection(const CouchbaseCollection& collection);
 
   std::string connection_string_;
-  std::string username_;
-  std::string password_;
+  std::shared_ptr<core::logging::Logger> logger_;
+  ::couchbase::cluster_options cluster_options_;
   std::mutex cluster_mutex_;
   std::optional<::couchbase::cluster> cluster_;
-  std::shared_ptr<core::logging::Logger> logger_;
 };
 
 namespace controllers {
diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp 
b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
index 6bc423b15..0d63420d0 100644
--- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp
+++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp
@@ -23,11 +23,13 @@
 namespace org::apache::nifi::minifi::core::controller {
 
 bool StandardControllerServiceNode::enable() {
-  Property property("Linked Services", "Referenced Controller Services");
-  controller_service_->setState(ENABLED);
   logger_->log_trace("Enabling CSN {}", getName());
+  if (active) {
+    logger_->log_debug("CSN {} is already enabled", getName());
+    return true;
+  }
+  Property property("Linked Services", "Referenced Controller Services");
   if (getProperty(property.getName(), property)) {
-    active = true;
     for (const auto& linked_service : property.getValues()) {
       ControllerServiceNode* csNode = 
provider->getControllerServiceNode(linked_service, 
controller_service_->getUUID());
       if (nullptr != csNode) {
@@ -40,13 +42,20 @@ bool StandardControllerServiceNode::enable() {
   if (nullptr != impl) {
     std::lock_guard<std::mutex> lock(mutex_);
     std::vector<std::shared_ptr<ControllerService>> services;
+    std::vector<ControllerServiceNode*> service_nodes;
     services.reserve(linked_controller_services_.size());
     for (const auto& service : linked_controller_services_) {
       services.push_back(service->getControllerServiceImplementation());
+      if (!service->enable()) {
+        logger_->log_debug("Linked Service '{}' could not be enabled", 
service->getName());
+        return false;
+      }
     }
     impl->setLinkedControllerServices(services);
     impl->onEnable();
   }
+  active = true;
+  controller_service_->setState(ENABLED);
   return true;
 }
 


Reply via email to