This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit ea16935f9cbb30864a36bc78e2a9ce47ba039d5b
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Nov 27 12:56:07 2024 +0100

    MINIFICPP-2469 Create GetCouchbaseKey processor
    
    Closes #1881
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 PROCESSORS.md                                      |  40 +++++
 README.md                                          |   2 +-
 .../cluster/containers/CouchbaseServerContainer.py |   4 +-
 docker/test/integration/features/couchbase.feature | 108 +++++++++++++
 .../{PutCouchbaseKey.py => GetCouchbaseKey.py}     |   8 +-
 .../minifi/processors/PutCouchbaseKey.py           |   2 +-
 .../controllerservices/CouchbaseClusterService.cpp |  76 ++++++---
 .../controllerservices/CouchbaseClusterService.h   |  17 ++-
 .../couchbase/processors/GetCouchbaseKey.cpp       | 111 ++++++++++++++
 .../{PutCouchbaseKey.h => GetCouchbaseKey.h}       |  96 +++---------
 extensions/couchbase/processors/PutCouchbaseKey.h  |   2 +-
 .../couchbase/tests/GetCouchbaseKeyTests.cpp       | 170 +++++++++++++++++++++
 .../couchbase/tests/MockCouchbaseClusterService.h  |  35 ++++-
 libminifi/include/core/Processor.h                 |   4 +-
 14 files changed, 573 insertions(+), 102 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index eec26bbac..667130ab4 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -48,6 +48,7 @@ limitations under the License.
 - [FetchSmb](#FetchSmb)
 - [FocusArchiveEntry](#FocusArchiveEntry)
 - [GenerateFlowFile](#GenerateFlowFile)
+- [GetCouchbaseKey](#GetCouchbaseKey)
 - [GetFile](#GetFile)
 - [GetTCP](#GetTCP)
 - [HashContent](#HashContent)
@@ -1096,6 +1097,45 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | success | success operational on the flow record |
 
 
+## GetCouchbaseKey
+
+### Description
+
+Get a document from Couchbase Server via Key/Value access. The ID of the 
document to fetch may be supplied by setting the <Document Id> property. NOTE: 
if the Document Id property is not set, the contents of the FlowFile will be 
read to determine the Document Id, which means that the contents of the entire 
FlowFile will be buffered in memory.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name                                     | Default Value | Allowable Values  
         | Description                                                          
                                                                                
                                                                                
          |
+|------------------------------------------|---------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Couchbase Cluster Controller Service** |               |                   
         | A Couchbase Cluster Controller Service which manages connections to 
a Couchbase cluster.                                                            
                                                                                
           |
+| **Bucket Name**                          | default       |                   
         | The name of bucket to access.<br/>**Supports Expression Language: 
true**                                                                          
                                                                                
             |
+| Scope Name                               |               |                   
         | Scope to use inside the bucket. If not specified, the _default scope 
is used.<br/>**Supports Expression Language: true**                             
                                                                                
          |
+| Collection Name                          |               |                   
         | Collection to use inside the bucket scope. If not specified, the 
_default collection is used.<br/>**Supports Expression Language: true**         
                                                                                
              |
+| **Document Type**                        | Json          | 
Json<br/>Binary<br/>String | Content type of the retrieved value.               
                                                                                
                                                                                
                            |
+| Document Id                              |               |                   
         | A static, fixed Couchbase document id, or an expression to construct 
the Couchbase document id.<br/>**Supports Expression Language: true**           
                                                                                
          |
+| Put Value to Attribute                   |               |                            | If set, the retrieved value will be put into an attribute of the FlowFile instead of the content of the FlowFile. The attribute key to put to is determined by evaluating the value of this property.<br/>**Supports Expression Language: true** |
+
+### Relationships
+
+| Name     | Description                                                       
                                                                                
                  |
+|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| success  | Values retrieved from Couchbase Server are written as the content of outgoing FlowFiles or put into an attribute of the incoming FlowFile and routed to this relationship. |
+| failure  | All FlowFiles that failed to be fetched from Couchbase Server and are not retry-able are routed to this relationship. |
+| retry    | All FlowFiles that failed to be fetched from Couchbase Server but can be retried are routed to this relationship. |
+| original | The original input FlowFile is routed to this relationship when 
the value is retrieved from Couchbase Server and routed to 'success'.           
                    |
+
+### Output Attributes
+
+| Attribute            | Relationship | Description                           |
+|----------------------|--------------|---------------------------------------|
+| couchbase.bucket     | success      | Bucket where the document was stored. |
+| couchbase.doc.id     | success      | Id of the document.                   |
+| couchbase.doc.cas    | success      | CAS of the document.                  |
+| couchbase.doc.expiry | success      | Expiration of the document.           |
+
+
 ## GetFile
 
 ### Description
diff --git a/README.md b/README.md
index 069d142ad..3c659f335 100644
--- a/README.md
+++ b/README.md
@@ -75,7 +75,7 @@ The next table outlines CMAKE flags that correspond with 
MiNiFi extensions. Exte
 | AWS                              | 
[AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3)
                                                                                
                                                                                
                                                       [...]
 | Azure                            | 
[AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[DeleteAzureBlobStorage](PROCESSORS.md#deleteazureblobstorage)<br/>[FetchAzureBlobStorage](PROCESSORS.md#fetchazureblobstorage)<br/>[ListAzureBlobStorage](PROCESSORS.md#listazureblobstorage)<br/>[PutAzureDataLakeStorage](PROCESSORS.md#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](PROCESSORS.md#del
 [...]
 | CivetWeb                         | [ListenHTTP](PROCESSORS.md#listenhttp)    
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| Couchbase                        | 
[CouchbaseClusterService](CONTROLLERS.md#couchbaseclusterservice)<br/>[PutCouchbaseKey](PROCESSORS.md#putcouchbasekey)
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| Couchbase                        | 
[CouchbaseClusterService](CONTROLLERS.md#couchbaseclusterservice)<br/>[PutCouchbaseKey](PROCESSORS.md#putcouchbasekey)<br/>[GetCouchbaseKey](PROCESSORS.md#getcouchbasekey)
                                                                                
                                                                                
                                                                                
                                             [...]
 | Elasticsearch                    | 
[ElasticsearchCredentialsControllerService](CONTROLLERS.md#elasticsearchcredentialscontrollerservice)<br/>[PostElasticsearch](PROCESSORS.md#postelasticsearch)
                                                                                
                                                                                
                                                                                
                                                          [...]
 | ExecuteProcess (Linux and macOS) | 
[ExecuteProcess](PROCESSORS.md#executeprocess)                                  
                                                                                
                                                                                
                                                                                
                                                                                
                                                        [...]
 | Google Cloud Platform            | 
[DeleteGCSObject](PROCESSORS.md#deletegcsobject)<br>[FetchGCSObject](PROCESSORS.md#fetchgcsobject)<br>[GCPCredentialsControllerService](CONTROLLERS.md#gcpcredentialscontrollerservice)<br>[ListGCSBucket](PROCESSORS.md#listgcsbucket)<br>[PutGCSObject](PROCESSORS.md#putgcsobject)
                                                                                
                                                                                
                   [...]
diff --git 
a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py 
b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py
index 1c4353046..9baf01f08 100644
--- a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py
+++ b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py
@@ -33,7 +33,7 @@ class CouchbaseServerContainer(Container):
             ["couchbase-cli", "cluster-init", "-c", "localhost", 
"--cluster-username", "Administrator", "--cluster-password", "password123", 
"--services", "data,index,query",
              "--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"],
             ["couchbase-cli", "bucket-create", "-c", "localhost", 
"--username", "Administrator", "--password", "password123", "--bucket", 
"test_bucket", "--bucket-type", "couchbase",
-             "--bucket-ramsize", "1024"]
+             "--bucket-ramsize", "1024", "--max-ttl", "36000"]
         ]
         for command in commands:
             (code, _) = self.client.containers.get(self.name).exec_run(command)
@@ -47,7 +47,7 @@ class CouchbaseServerContainer(Container):
             return
 
         self.docker_container = self.client.containers.run(
-            "couchbase:community-7.6.2",
+            "couchbase:enterprise-7.2.5",
             detach=True,
             name=self.name,
             network=self.network.name,
diff --git a/docker/test/integration/features/couchbase.feature 
b/docker/test/integration/features/couchbase.feature
index 446821f25..646eaacd9 100644
--- a/docker/test/integration/features/couchbase.feature
+++ b/docker/test/integration/features/couchbase.feature
@@ -65,3 +65,111 @@ Feature: Executing Couchbase operations from MiNiFi-C++
     And the Minifi logs match the following regex: 
"key:couchbase.partition.uuid value:[1-9][0-9]*" in less than 1 seconds
     And the Minifi logs match the following regex: "key:couchbase.partition.id 
value:[1-9][0-9]*" in less than 1 seconds
     And a document with id "test_doc_id" in bucket "test_bucket" is present 
with data '{"field1": "value1"}' of type "Binary" in Couchbase
+
+  Scenario: A MiNiFi instance can get data from test bucket with 
GetCouchbaseKey processor
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content '{"field1": "value1", "field2": "value2"}' is 
present in '/tmp/input'
+    And a PutCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And the "Document Id" property of the PutCouchbaseKey processor is set to 
"test_doc_id"
+    And the "Couchbase Cluster Controller Service" property of the 
PutCouchbaseKey processor is set to "CouchbaseClusterService"
+    And a GetCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And the "Document Id" property of the GetCouchbaseKey processor is set to 
"test_doc_id"
+    And the "Couchbase Cluster Controller Service" property of the 
GetCouchbaseKey processor is set to "CouchbaseClusterService"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
+    And a CouchbaseClusterService is setup up with the name 
"CouchbaseClusterService"
+
+    And the "success" relationship of the GetFile processor is connected to 
the PutCouchbaseKey
+    And the "success" relationship of the PutCouchbaseKey processor is 
connected to the GetCouchbaseKey
+    And the "success" relationship of the GetCouchbaseKey processor is 
connected to the PutFile
+    And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
+
+    When a Couchbase server is started
+    And all instances start up
+
+    Then a flowfile with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the monitored directory in less than 60 seconds
+    And the Minifi logs contain the following message: "key:couchbase.bucket 
value:test_bucket" in less than 10 seconds
+    And the Minifi logs contain the following message: "key:couchbase.doc.id 
value:test_doc_id" in less than 1 seconds
+    And the Minifi logs match the following regex: "key:couchbase.doc.cas 
value:[1-9][0-9]*" in less than 1 seconds
+    And the Minifi logs match the following regex: "key:couchbase.doc.expiry 
value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds
+
+  Scenario: A MiNiFi instance can get data from test bucket with 
GetCouchbaseKey processor using binary storage
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content '{"field1": "value1", "field2": "value2"}' is 
present in '/tmp/input'
+    And a PutCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And the "Document Id" property of the PutCouchbaseKey processor is set to 
"test_doc_id"
+    And the "Document Type" property of the PutCouchbaseKey processor is set 
to "Binary"
+    And the "Couchbase Cluster Controller Service" property of the 
PutCouchbaseKey processor is set to "CouchbaseClusterService"
+    And a GetCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And the "Document Id" property of the GetCouchbaseKey processor is set to 
"test_doc_id"
+    And the "Document Type" property of the GetCouchbaseKey processor is set 
to "Binary"
+    And the "Couchbase Cluster Controller Service" property of the 
GetCouchbaseKey processor is set to "CouchbaseClusterService"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
+    And a CouchbaseClusterService is setup up with the name 
"CouchbaseClusterService"
+
+    And the "success" relationship of the GetFile processor is connected to 
the PutCouchbaseKey
+    And the "success" relationship of the PutCouchbaseKey processor is 
connected to the GetCouchbaseKey
+    And the "success" relationship of the GetCouchbaseKey processor is 
connected to the PutFile
+    And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
+
+    When a Couchbase server is started
+    And all instances start up
+
+    Then a flowfile with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the monitored directory in less than 60 seconds
+    And the Minifi logs contain the following message: "key:couchbase.bucket 
value:test_bucket" in less than 10 seconds
+    And the Minifi logs contain the following message: "key:couchbase.doc.id 
value:test_doc_id" in less than 1 seconds
+    And the Minifi logs match the following regex: "key:couchbase.doc.cas 
value:[1-9][0-9]*" in less than 1 seconds
+    And the Minifi logs match the following regex: "key:couchbase.doc.expiry 
value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds
+
+  Scenario: A MiNiFi instance can get data from test bucket with 
GetCouchbaseKey processor and put the result in an attribute
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content '{"field1": "value1", "field2": "value2"}' is 
present in '/tmp/input'
+    And a PutCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And the "Document Id" property of the PutCouchbaseKey processor is set to 
"test_doc_id"
+    And the "Document Type" property of the PutCouchbaseKey processor is set 
to "String"
+    And the "Couchbase Cluster Controller Service" property of the 
PutCouchbaseKey processor is set to "CouchbaseClusterService"
+    And a GetCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And the "Document Id" property of the GetCouchbaseKey processor is set to 
"test_doc_id"
+    And the "Document Type" property of the GetCouchbaseKey processor is set 
to "String"
+    And the "Couchbase Cluster Controller Service" property of the 
GetCouchbaseKey processor is set to "CouchbaseClusterService"
+    And the "Put Value to Attribute" property of the GetCouchbaseKey processor 
is set to "get_couchbase_result"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And a LogAttribute processor with the "FlowFiles To Log" property set to 
"0"
+    And a CouchbaseClusterService is setup up with the name 
"CouchbaseClusterService"
+
+    And the "success" relationship of the GetFile processor is connected to 
the PutCouchbaseKey
+    And the "success" relationship of the PutCouchbaseKey processor is 
connected to the GetCouchbaseKey
+    And the "success" relationship of the GetCouchbaseKey processor is 
connected to the PutFile
+    And the "success" relationship of the PutFile processor is connected to 
the LogAttribute
+
+    When a Couchbase server is started
+    And all instances start up
+
+    Then a flowfile with the JSON content '{"field1": "value1", "field2": 
"value2"}' is placed in the monitored directory in less than 60 seconds
+    And the Minifi logs contain the following message: "key:couchbase.bucket 
value:test_bucket" in less than 10 seconds
+    And the Minifi logs contain the following message: "key:couchbase.doc.id 
value:test_doc_id" in less than 1 seconds
+    And the Minifi logs match the following regex: "key:couchbase.doc.cas 
value:[1-9][0-9]*" in less than 1 seconds
+    And the Minifi logs match the following regex: "key:couchbase.doc.expiry 
value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds
+    And the Minifi logs contain the following message: 
'key:get_couchbase_result value:{"field1": "value1", "field2": "value2"}' in 
less than 1 seconds
+
+  Scenario: GetCouchbaseKey transfers FlowFile to failure relationship on 
Couchbase value type mismatch
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content '{"field1": "value1", "field2": "value2"}' is 
present in '/tmp/input'
+    And a PutCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And the "Document Id" property of the PutCouchbaseKey processor is set to 
"test_doc_id"
+    And the "Document Type" property of the PutCouchbaseKey processor is set 
to "String"
+    And the "Couchbase Cluster Controller Service" property of the 
PutCouchbaseKey processor is set to "CouchbaseClusterService"
+    And a GetCouchbaseKey processor with the "Bucket Name" property set to 
"test_bucket"
+    And the "Document Id" property of the GetCouchbaseKey processor is set to 
"test_doc_id"
+    And the "Document Type" property of the GetCouchbaseKey processor is set 
to "Binary"
+    And the "Couchbase Cluster Controller Service" property of the 
GetCouchbaseKey processor is set to "CouchbaseClusterService"
+    And a CouchbaseClusterService is setup up with the name 
"CouchbaseClusterService"
+
+    And the "success" relationship of the GetFile processor is connected to 
the PutCouchbaseKey
+    And the "success" relationship of the PutCouchbaseKey processor is 
connected to the GetCouchbaseKey
+
+    When a Couchbase server is started
+    And all instances start up
+
+    Then the Minifi logs contain the following message: "Failed to get content 
for document 'test_doc_id' from collection 'test_bucket._default._default' with 
the following exception: 'raw_binary_transcoder expects document to have BINARY 
common flags" in less than 60 seconds
diff --git a/docker/test/integration/minifi/processors/PutCouchbaseKey.py 
b/docker/test/integration/minifi/processors/GetCouchbaseKey.py
similarity index 84%
copy from docker/test/integration/minifi/processors/PutCouchbaseKey.py
copy to docker/test/integration/minifi/processors/GetCouchbaseKey.py
index 5e94aaa07..0b48dcbdd 100644
--- a/docker/test/integration/minifi/processors/PutCouchbaseKey.py
+++ b/docker/test/integration/minifi/processors/GetCouchbaseKey.py
@@ -15,10 +15,10 @@
 from ..core.Processor import Processor
 
 
-class PutCouchbaseKey(Processor):
+class GetCouchbaseKey(Processor):
     def __init__(self, context, schedule={'scheduling strategy': 
'EVENT_DRIVEN'}):
-        super(PutCouchbaseKey, self).__init__(
+        super(GetCouchbaseKey, self).__init__(
             context=context,
-            clazz='PutCouchbaseKey',
-            auto_terminate=['success', 'failure'],
+            clazz='GetCouchbaseKey',
+            auto_terminate=['success', 'failure', 'retry'],
             schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PutCouchbaseKey.py 
b/docker/test/integration/minifi/processors/PutCouchbaseKey.py
index 5e94aaa07..341338771 100644
--- a/docker/test/integration/minifi/processors/PutCouchbaseKey.py
+++ b/docker/test/integration/minifi/processors/PutCouchbaseKey.py
@@ -20,5 +20,5 @@ class PutCouchbaseKey(Processor):
         super(PutCouchbaseKey, self).__init__(
             context=context,
             clazz='PutCouchbaseKey',
-            auto_terminate=['success', 'failure'],
+            auto_terminate=['success', 'failure', 'retry'],
             schedule=schedule)
diff --git 
a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp 
b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp
index 5207657f2..c40697cf7 100644
--- a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp
+++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp
@@ -17,11 +17,12 @@
  */
 
 #include "CouchbaseClusterService.h"
-#include "couchbase/codec/raw_binary_transcoder.hxx"
-#include "couchbase/codec/raw_string_transcoder.hxx"
-#include "couchbase/codec/raw_json_transcoder.hxx"
 
 #include "core/Resource.h"
+#include "couchbase/codec/raw_binary_transcoder.hxx"
+#include "couchbase/codec/raw_json_transcoder.hxx"
+#include "couchbase/codec/raw_string_transcoder.hxx"
+#include "utils/TimeUtil.h"
 
 namespace org::apache::nifi::minifi::couchbase {
 
@@ -58,8 +59,8 @@ nonstd::expected<::couchbase::collection, CouchbaseErrorType> 
CouchbaseClient::g
   return 
cluster_->bucket(collection.bucket_name).scope(collection.scope_name).collection(collection.collection_name);
 }
 
-nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> 
CouchbaseClient::upsert(const CouchbaseCollection& collection,
-    CouchbaseValueType document_type, const std::string& document_id, const 
std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) {
+nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> 
CouchbaseClient::upsert(
+    const CouchbaseCollection& collection, CouchbaseValueType document_type, 
const std::string& document_id, const std::vector<std::byte>& buffer, const 
::couchbase::upsert_options& options) {
   auto collection_result = getCollection(collection);
   if (!collection_result.has_value()) {
     return nonstd::make_unexpected(collection_result.error());
@@ -78,27 +79,68 @@ nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> 
CouchbaseClient::ups
   if (upsert_err.ec()) {
     // ambiguous_timeout should not be retried as we do not know if the insert 
was successful or not
     if (getErrorType(upsert_err.ec()) == CouchbaseErrorType::TEMPORARY && 
upsert_err.ec().value() != 
static_cast<int>(::couchbase::errc::common::ambiguous_timeout)) {
-      logger_->log_error("Failed to upsert document '{}' to collection 
'{}.{}.{}' due to temporary issue, error code: '{}', message: '{}'",
-        document_id, collection.bucket_name, collection.scope_name, 
collection.collection_name, upsert_err.ec(), upsert_err.message());
+      logger_->log_error("Failed to upsert document '{}' to collection 
'{}.{}.{}' due to temporary issue, error code: '{}', message: '{}'", 
document_id, collection.bucket_name, collection.scope_name,
+          collection.collection_name, upsert_err.ec(), upsert_err.message());
       return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY);
     }
-    logger_->log_error("Failed to upsert document '{}' to collection 
'{}.{}.{}' with error code: '{}', message: '{}'",
-      document_id, collection.bucket_name, collection.scope_name, 
collection.collection_name, upsert_err.ec(), upsert_err.message());
+    logger_->log_error("Failed to upsert document '{}' to collection 
'{}.{}.{}' with error code: '{}', message: '{}'", document_id, 
collection.bucket_name, collection.scope_name,
+        collection.collection_name, upsert_err.ec(), upsert_err.message());
     return nonstd::make_unexpected(CouchbaseErrorType::FATAL);
   } else {
-    const uint64_t partition_uuid = (upsert_resp.mutation_token().has_value() 
? upsert_resp.mutation_token()->partition_uuid() : 0);
-    const uint64_t sequence_number = (upsert_resp.mutation_token().has_value() 
? upsert_resp.mutation_token()->sequence_number() : 0);
-    const uint16_t partition_id = (upsert_resp.mutation_token().has_value() ? 
upsert_resp.mutation_token()->partition_id() : 0);
     return CouchbaseUpsertResult {
-      collection.bucket_name,
-      upsert_resp.cas().value(),
-      partition_uuid,
-      sequence_number,
-      partition_id
+      {
+        collection.bucket_name,
+        upsert_resp.cas().value(),
+      },
+      (upsert_resp.mutation_token().has_value() ? 
upsert_resp.mutation_token()->sequence_number() : 0),
+      (upsert_resp.mutation_token().has_value() ? 
upsert_resp.mutation_token()->partition_uuid() : 0),
+      gsl::narrow<uint16_t>(upsert_resp.mutation_token().has_value() ? 
upsert_resp.mutation_token()->partition_id() : 0)
     };
   }
 }
 
+// Fetches document `document_id` from `collection` and decodes its content with the transcoder selected by `return_type`:
+// Json -> raw_json_transcoder (bytes), String -> raw_string_transcoder (string), anything else -> raw_binary_transcoder (bytes).
+// Returns the collection lookup error unchanged if the collection cannot be obtained, TEMPORARY for errors that
+// getErrorType() classifies as temporary, and FATAL for any other get error or when the content cannot be decoded.
+nonstd::expected<CouchbaseGetResult, CouchbaseErrorType> CouchbaseClient::get(const CouchbaseCollection& collection, const std::string& document_id, CouchbaseValueType return_type) {
+  auto collection_result = getCollection(collection);
+  if (!collection_result.has_value()) {
+    return nonstd::make_unexpected(collection_result.error());
+  }
+
+  ::couchbase::get_options options;
+  options.with_expiry(true);  // requested so that resp.expiry_time() can be reported below
+  auto [get_err, resp] = collection_result->get(document_id, options).get();
+  if (get_err.ec()) {
+    // getErrorType() may classify errors other than timeouts as TEMPORARY, so log the actual error code and message
+    // like upsert() does. Unlike upsert(), ambiguous_timeout is not excluded here: a get is presumably safe to repeat.
+    if (getErrorType(get_err.ec()) == CouchbaseErrorType::TEMPORARY) {
+      logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' due to temporary issue, error code: '{}', message: '{}'", document_id, collection.bucket_name,
+          collection.scope_name, collection.collection_name, get_err.ec(), get_err.message());
+      return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY);
+    }
+    logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' with error code: '{}', message: '{}'", document_id, collection.bucket_name, collection.scope_name,
+        collection.collection_name, get_err.ec(), get_err.message());
+    return nonstd::make_unexpected(CouchbaseErrorType::FATAL);
+  }
+
+  try {
+    CouchbaseGetResult result;
+    result.bucket_name = collection.bucket_name;
+    result.cas = resp.cas().value();
+    if (return_type == CouchbaseValueType::Json) {
+      result.value = resp.content_as<::couchbase::codec::binary, ::couchbase::codec::raw_json_transcoder>();
+    } else if (return_type == CouchbaseValueType::String) {
+      result.value = resp.content_as<::couchbase::codec::raw_string_transcoder>();
+    } else {
+      result.value = resp.content_as<::couchbase::codec::raw_binary_transcoder>();
+    }
+    if (resp.expiry_time().has_value()) {
+      result.expiry = utils::timeutils::getTimeStr(*resp.expiry_time());
+    }
+    return result;
+  } catch (const std::exception& ex) {
+    // content_as<> throws e.g. when the stored document's flags do not match the requested transcoder (value type mismatch)
+    logger_->log_error("Failed to get content for document '{}' from collection '{}.{}.{}' with the following exception: '{}'", document_id, collection.bucket_name, collection.scope_name,
+        collection.collection_name, ex.what());
+    return nonstd::make_unexpected(CouchbaseErrorType::FATAL);
+  }
+}
+
 void CouchbaseClient::close() {
   std::lock_guard<std::mutex> lock(cluster_mutex_);
   if (cluster_) {
diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.h 
b/extensions/couchbase/controllerservices/CouchbaseClusterService.h
index b848b741d..28c6390dd 100644
--- a/extensions/couchbase/controllerservices/CouchbaseClusterService.h
+++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.h
@@ -22,6 +22,7 @@
 #include <string>
 #include <utility>
 #include <mutex>
+#include <variant>
 
 #include "core/controller/ControllerService.h"
 #include "core/PropertyDefinition.h"
@@ -39,9 +40,17 @@ struct CouchbaseCollection {
   std::string collection_name;
 };
 
-struct CouchbaseUpsertResult {
+/// Fields common to the results of Couchbase key/value calls (shared by get and upsert results).
+struct CouchbaseCallResult {
   std::string bucket_name;
   std::uint64_t cas{0};
+};
+
+/// Result of a key/value get: the inherited bucket name and CAS, plus the document's expiry and value.
+struct CouchbaseGetResult : public CouchbaseCallResult {
+  // Formatted expiry time; left empty when the server response carries no expiry.
+  std::string expiry;
+  // Holds std::string for String documents and raw bytes for Json and Binary documents.
+  std::variant<std::vector<std::byte>, std::string> value;
+};
+
+struct CouchbaseUpsertResult : public CouchbaseCallResult {
   std::uint64_t sequence_number{0};
   std::uint64_t partition_uuid{0};
   std::uint16_t partition_id{0};
@@ -75,6 +84,7 @@ class CouchbaseClient {
 
   nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const 
CouchbaseCollection& collection, CouchbaseValueType document_type, const 
std::string& document_id,
     const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& 
options);
+  nonstd::expected<CouchbaseGetResult, CouchbaseErrorType> get(const 
CouchbaseCollection& collection, const std::string& document_id, 
CouchbaseValueType return_type);
   nonstd::expected<void, CouchbaseErrorType> establishConnection();
   void close();
 
@@ -151,6 +161,11 @@ class CouchbaseClusterService : public 
core::controller::ControllerService {
     return client_->upsert(collection, document_type, document_id, buffer, 
options);
   }
 
+  // Fetches a single document by id from the given collection through the connected client.
+  // return_type selects how the client decodes the stored value (see CouchbaseGetResult::value).
+  // Virtual so that tests can substitute a mock implementation. Requires an established client.
+  virtual nonstd::expected<CouchbaseGetResult, CouchbaseErrorType> get(const CouchbaseCollection& collection, const std::string& document_id, CouchbaseValueType return_type) {
+    gsl_Expects(client_);
+    return client_->get(collection, document_id, return_type);
+  }
+
   static gsl::not_null<std::shared_ptr<CouchbaseClusterService>> 
getFromProperty(const core::ProcessContext& context, const 
core::PropertyReference& property);
 
  private:
diff --git a/extensions/couchbase/processors/GetCouchbaseKey.cpp 
b/extensions/couchbase/processors/GetCouchbaseKey.cpp
new file mode 100644
index 000000000..89cf160f6
--- /dev/null
+++ b/extensions/couchbase/processors/GetCouchbaseKey.cpp
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GetCouchbaseKey.h"
+
+#include <span>
+#include <string>
+#include <variant>
+
+#include "utils/gsl.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::couchbase::processors {
+
+void GetCouchbaseKey::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& /*session_factory*/) {
+  // Resolve the Couchbase cluster controller service configured on this processor.
+  couchbase_cluster_service_ = controllers::CouchbaseClusterService::getFromProperty(context, CouchbaseClusterControllerService);
+  // The document type is fixed for the lifetime of the schedule and passed to every get() call.
+  document_type_ = utils::parseEnumProperty<CouchbaseValueType>(context, DocumentType);
+}
+
+/**
+ * Fetches the document named by the Document Id property (or, when that is unset or empty, by the whole
+ * FlowFile content) from the configured bucket/scope/collection.
+ *
+ * On success the value is either stored in the attribute named by "Put Value to Attribute" on the incoming
+ * FlowFile (which is then routed to success), or written as the content of a new child FlowFile routed to
+ * success while the incoming FlowFile is routed to the 'original' relationship.
+ * Errors reported as TEMPORARY route the incoming FlowFile to retry; all other errors route it to failure.
+ */
+void GetCouchbaseKey::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+  gsl_Expects(couchbase_cluster_service_);
+
+  auto flow_file = session.get();
+  if (!flow_file) {
+    context.yield();
+    return;
+  }
+
+  CouchbaseCollection collection;
+  if (!context.getProperty(BucketName, collection.bucket_name, flow_file.get()) || collection.bucket_name.empty()) {
+    logger_->log_error("Bucket '{}' is invalid or empty!", collection.bucket_name);
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  if (!context.getProperty(ScopeName, collection.scope_name, flow_file.get()) || collection.scope_name.empty()) {
+    collection.scope_name = ::couchbase::scope::default_name;
+  }
+
+  if (!context.getProperty(CollectionName, collection.collection_name, flow_file.get()) || collection.collection_name.empty()) {
+    collection.collection_name = ::couchbase::collection::default_name;
+  }
+
+  // Without an explicit Document Id the entire FlowFile content is buffered and used as the id.
+  std::string document_id;
+  if (!context.getProperty(DocumentId, document_id, flow_file.get()) || document_id.empty()) {
+    auto ff_content = session.readBuffer(flow_file).buffer;
+    document_id = std::string(reinterpret_cast<const char*>(ff_content.data()), ff_content.size());
+  }
+
+  if (document_id.empty()) {
+    logger_->log_error("Document ID is empty, transferring FlowFile to failure relationship");
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  std::string attribute_to_put_result_to;
+  context.getProperty(PutValueToAttribute, attribute_to_put_result_to, flow_file.get());
+
+  auto get_result = couchbase_cluster_service_->get(collection, document_id, document_type_);
+  if (!get_result) {
+    if (get_result.error() == CouchbaseErrorType::TEMPORARY) {
+      logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' due to timeout, transferring to retry relationship",
+        document_id, collection.bucket_name, collection.scope_name, collection.collection_name);
+      session.transfer(flow_file, Retry);
+    } else {
+      logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}', transferring to failure relationship",
+        document_id, collection.bucket_name, collection.scope_name, collection.collection_name);
+      session.transfer(flow_file, Failure);
+    }
+    return;
+  }
+
+  // View whichever alternative the result holds as raw bytes. Dispatching on the held alternative (rather than on
+  // document_type_) cannot throw bad_variant_access. Writing a byte span also keeps the payload verbatim.
+  // NOTE(review): OutputStream's std::string overload appears to be the length-prefixed string serializer.
+  const std::span<const std::byte> value_bytes = std::visit([](const auto& value) {
+    return std::as_bytes(std::span(value));
+  }, get_result->value);
+
+  const auto put_result_attributes = [&session, &get_result, &document_id](core::FlowFile& target) {
+    session.putAttribute(target, "couchbase.bucket", get_result->bucket_name);
+    session.putAttribute(target, "couchbase.doc.id", document_id);
+    session.putAttribute(target, "couchbase.doc.cas", std::to_string(get_result->cas));
+    session.putAttribute(target, "couchbase.doc.expiry", get_result->expiry);
+  };
+
+  if (!attribute_to_put_result_to.empty()) {
+    // Attribute mode: the incoming FlowFile keeps its content, gains the value as an attribute, and goes to success.
+    session.putAttribute(*flow_file, attribute_to_put_result_to,
+        std::string(reinterpret_cast<const char*>(value_bytes.data()), value_bytes.size()));
+    put_result_attributes(*flow_file);
+    session.transfer(flow_file, Success);
+    return;
+  }
+
+  // Content mode: the value becomes the content of a new child FlowFile routed to success, and the
+  // incoming FlowFile is routed to 'original', as that relationship's description promises.
+  auto result_flow_file = session.create(flow_file.get());
+  session.write(result_flow_file, [value_bytes](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
+    stream->write(value_bytes);
+    return gsl::narrow<int64_t>(value_bytes.size());
+  });
+  put_result_attributes(*result_flow_file);
+  session.transfer(result_flow_file, Success);
+  session.transfer(flow_file, Original);
+}
+
+REGISTER_RESOURCE(GetCouchbaseKey, Processor);
+
+}  // namespace org::apache::nifi::minifi::couchbase::processors
diff --git a/extensions/couchbase/processors/PutCouchbaseKey.h 
b/extensions/couchbase/processors/GetCouchbaseKey.h
similarity index 56%
copy from extensions/couchbase/processors/PutCouchbaseKey.h
copy to extensions/couchbase/processors/GetCouchbaseKey.h
index 2b4f420af..4951c18ef 100644
--- a/extensions/couchbase/processors/PutCouchbaseKey.h
+++ b/extensions/couchbase/processors/GetCouchbaseKey.h
@@ -23,56 +23,18 @@
 
 #include "core/AbstractProcessor.h"
 #include "core/ProcessSession.h"
-#include "utils/Enum.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "CouchbaseClusterService.h"
-#include "couchbase/persist_to.hxx"
-#include "couchbase/replicate_to.hxx"
-
-namespace magic_enum::customize {
-
-template <>
-constexpr customize_t 
enum_name<::couchbase::persist_to>(::couchbase::persist_to value) noexcept {
-  switch (value) {
-    case ::couchbase::persist_to::none:
-      return "NONE";
-    case ::couchbase::persist_to::active:
-      return "ACTIVE";
-    case ::couchbase::persist_to::one:
-      return "ONE";
-    case ::couchbase::persist_to::two:
-      return "TWO";
-    case ::couchbase::persist_to::three:
-      return "THREE";
-    case ::couchbase::persist_to::four:
-      return "FOUR";
-  }
-  return invalid_tag;
-}
-
-template <>
-constexpr customize_t 
enum_name<::couchbase::replicate_to>(::couchbase::replicate_to value) noexcept {
-  switch (value) {
-    case ::couchbase::replicate_to::none:
-      return "NONE";
-    case ::couchbase::replicate_to::one:
-      return "ONE";
-    case ::couchbase::replicate_to::two:
-      return "TWO";
-    case ::couchbase::replicate_to::three:
-      return "THREE";
-  }
-  return invalid_tag;
-}
-}  // namespace magic_enum::customize
 
 namespace org::apache::nifi::minifi::couchbase::processors {
 
-class PutCouchbaseKey final : public core::AbstractProcessor<PutCouchbaseKey> {
+class GetCouchbaseKey final : public core::AbstractProcessor<GetCouchbaseKey> {
  public:
-  using core::AbstractProcessor<PutCouchbaseKey>::AbstractProcessor;
+  using core::AbstractProcessor<GetCouchbaseKey>::AbstractProcessor;
 
-  EXTENSIONAPI static constexpr const char* Description = "Put a document to 
Couchbase Server via Key/Value access.";
+  EXTENSIONAPI static constexpr const char* Description = "Get a document from 
Couchbase Server via Key/Value access. The ID of the document to fetch may be 
supplied by setting the "
+    "<Document Id> property. NOTE: if the Document Id property is not set, the 
contents of the FlowFile will be read to determine the Document Id, which means 
that the contents of "
+    "the entire FlowFile will be buffered in memory.";
 
   EXTENSIONAPI static constexpr auto CouchbaseClusterControllerService = 
core::PropertyDefinitionBuilder<>::createProperty("Couchbase Cluster Controller 
Service")
       .withDescription("A Couchbase Cluster Controller Service which manages 
connections to a Couchbase cluster.")
@@ -93,28 +55,20 @@ class PutCouchbaseKey final : public 
core::AbstractProcessor<PutCouchbaseKey> {
       .withDescription("Collection to use inside the bucket scope. If not 
specified, the _default collection is used.")
       .supportsExpressionLanguage(true)
       .build();
-  EXTENSIONAPI static constexpr auto DocumentType = 
core::PropertyDefinitionBuilder<magic_enum::enum_count<CouchbaseValueType>()>::createProperty("Document
 Type")
-      .withDescription("Content type to store data as.")
+  // Allowed values are the names of CouchbaseValueType. The builder capacity is derived from the enum
+  // (as PutCouchbaseKey does) so that it cannot drift when a value type is added.
+  EXTENSIONAPI static constexpr auto DocumentType = core::PropertyDefinitionBuilder<magic_enum::enum_count<CouchbaseValueType>()>::createProperty("Document Type")
+      .withDescription("Content type of the retrieved value.")
       .isRequired(true)
       .withDefaultValue(magic_enum::enum_name(CouchbaseValueType::Json))
       .withAllowedValues(magic_enum::enum_names<CouchbaseValueType>())
       .build();
   EXTENSIONAPI static constexpr auto DocumentId = 
core::PropertyDefinitionBuilder<>::createProperty("Document Id")
-      .withDescription("A static, fixed Couchbase document id, or an 
expression to construct the Couchbase document id. "
-                       "If not specified, either the FlowFile uuid attribute 
or if that's not found a generated uuid will be used.")
+      .withDescription("A static, fixed Couchbase document id, or an 
expression to construct the Couchbase document id.")
       .supportsExpressionLanguage(true)
       .build();
-  EXTENSIONAPI static constexpr auto PersistTo = 
core::PropertyDefinitionBuilder<6>::createProperty("Persist To")
-      .withDescription("Durability constraint about disk persistence.")
-      .isRequired(true)
-      .withDefaultValue(magic_enum::enum_name(::couchbase::persist_to::none))
-      .withAllowedValues(magic_enum::enum_names<::couchbase::persist_to>())
-      .build();
-  EXTENSIONAPI static constexpr auto ReplicateTo = 
core::PropertyDefinitionBuilder<4>::createProperty("Replicate To")
-      .withDescription("Durability constraint about replication.")
-      .isRequired(true)
-      .withDefaultValue(magic_enum::enum_name(::couchbase::replicate_to::none))
-      .withAllowedValues(magic_enum::enum_names<::couchbase::replicate_to>())
+  // Evaluated with Expression Language against each incoming FlowFile. When the result is non-empty, the
+  // retrieved value is stored in that attribute instead of being written as FlowFile content.
+  // NOTE(review): the description reads "instead of a the content"; fix the wording together with PROCESSORS.md.
+  EXTENSIONAPI static constexpr auto PutValueToAttribute = core::PropertyDefinitionBuilder<>::createProperty("Put Value to Attribute")
+      .withDescription("If set, the retrieved value will be put into an attribute of the FlowFile instead of a the content of the FlowFile. "
+                       "The attribute key to put to is determined by evaluating value of this property.")
+      .supportsExpressionLanguage(true)
       .build();
 
   EXTENSIONAPI static constexpr auto Properties = 
std::to_array<core::PropertyReference>({
@@ -124,23 +78,23 @@ class PutCouchbaseKey final : public 
core::AbstractProcessor<PutCouchbaseKey> {
     CollectionName,
     DocumentType,
     DocumentId,
-    PersistTo,
-    ReplicateTo
+    PutValueToAttribute
   });
 
-  EXTENSIONAPI static constexpr auto Success = 
core::RelationshipDefinition{"success", "All FlowFiles that are written to 
Couchbase Server are routed to this relationship."};
-  EXTENSIONAPI static constexpr auto Failure = 
core::RelationshipDefinition{"failure", "All FlowFiles failed to be written to 
Couchbase Server and not retry-able are routed to this relationship."};
-  EXTENSIONAPI static constexpr auto Retry = 
core::RelationshipDefinition{"retry", "All FlowFiles failed to be written to 
Couchbase Server but can be retried are routed to this relationship."};
-  EXTENSIONAPI static constexpr auto Relationships = std::array{Success, 
Failure, Retry};
+  EXTENSIONAPI static constexpr auto Success = 
core::RelationshipDefinition{"success",
+      "Values retrieved from Couchbase Server are written as outgoing 
FlowFiles content or put into an attribute of the incoming FlowFile and routed 
to this relationship."};
+  EXTENSIONAPI static constexpr auto Failure = 
core::RelationshipDefinition{"failure", "All FlowFiles failed to fetch from 
Couchbase Server and not retry-able are routed to this relationship."};
+  EXTENSIONAPI static constexpr auto Retry = 
core::RelationshipDefinition{"retry", "All FlowFiles failed to fetch from 
Couchbase Server but can be retried are routed to this relationship."};
+  EXTENSIONAPI static constexpr auto Original = 
core::RelationshipDefinition{"original",
+      "The original input FlowFile is routed to this relationship when the 
value is retrieved from Couchbase Server and routed to 'success'."};
+  EXTENSIONAPI static constexpr auto Relationships = std::array{Success, 
Failure, Retry, Original};
 
   EXTENSIONAPI static constexpr auto CouchbaseBucket = 
core::OutputAttributeDefinition<>{"couchbase.bucket", {Success}, "Bucket where 
the document was stored."};
   EXTENSIONAPI static constexpr auto CouchbaseDocId = 
core::OutputAttributeDefinition<>{"couchbase.doc.id", {Success}, "Id of the 
document."};
   EXTENSIONAPI static constexpr auto CouchbaseDocCas = 
core::OutputAttributeDefinition<>{"couchbase.doc.cas", {Success}, "CAS of the 
document."};
-  EXTENSIONAPI static constexpr auto CouchbaseDocSequenceNumber = 
core::OutputAttributeDefinition<>{"couchbase.doc.sequence.number", {Success}, 
"Sequence number associated with the document."};
-  EXTENSIONAPI static constexpr auto CouchbasePartitionUUID = 
core::OutputAttributeDefinition<>{"couchbase.partition.uuid", {Success}, "UUID 
of partition."};
-  EXTENSIONAPI static constexpr auto CouchbasePartitionId = 
core::OutputAttributeDefinition<>{"couchbase.partition.id", {Success}, "ID of 
partition (also known as vBucket)."};
-  EXTENSIONAPI static constexpr auto OutputAttributes = 
std::array<core::OutputAttributeReference, 6>{
-      CouchbaseBucket, CouchbaseDocId, CouchbaseDocCas, 
CouchbaseDocSequenceNumber, CouchbasePartitionUUID, CouchbasePartitionId};
+  EXTENSIONAPI static constexpr auto CouchbaseDocExpiry = 
core::OutputAttributeDefinition<>{"couchbase.doc.expiry", {Success}, 
"Expiration of the document."};
+  EXTENSIONAPI static constexpr auto OutputAttributes = 
std::array<core::OutputAttributeReference, 4>{
+      CouchbaseBucket, CouchbaseDocId, CouchbaseDocCas, CouchbaseDocExpiry};
 
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
@@ -152,10 +106,8 @@ class PutCouchbaseKey final : public 
core::AbstractProcessor<PutCouchbaseKey> {
 
  private:
   std::shared_ptr<controllers::CouchbaseClusterService> 
couchbase_cluster_service_;
-  std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<PutCouchbaseKey>::getLogger(uuid_);
   CouchbaseValueType document_type_ = CouchbaseValueType::Json;
-  ::couchbase::persist_to persist_to_ = ::couchbase::persist_to::none;
-  ::couchbase::replicate_to replicate_to_ = ::couchbase::replicate_to::none;
+  std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<GetCouchbaseKey>::getLogger(uuid_);
 };
 
 }  // namespace org::apache::nifi::minifi::couchbase::processors
diff --git a/extensions/couchbase/processors/PutCouchbaseKey.h 
b/extensions/couchbase/processors/PutCouchbaseKey.h
index 2b4f420af..1991488ed 100644
--- a/extensions/couchbase/processors/PutCouchbaseKey.h
+++ b/extensions/couchbase/processors/PutCouchbaseKey.h
@@ -152,10 +152,10 @@ class PutCouchbaseKey final : public 
core::AbstractProcessor<PutCouchbaseKey> {
 
  private:
   std::shared_ptr<controllers::CouchbaseClusterService> 
couchbase_cluster_service_;
-  std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<PutCouchbaseKey>::getLogger(uuid_);
   CouchbaseValueType document_type_ = CouchbaseValueType::Json;
   ::couchbase::persist_to persist_to_ = ::couchbase::persist_to::none;
   ::couchbase::replicate_to replicate_to_ = ::couchbase::replicate_to::none;
+  std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<PutCouchbaseKey>::getLogger(uuid_);
 };
 
 }  // namespace org::apache::nifi::minifi::couchbase::processors
diff --git a/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp 
b/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp
new file mode 100644
index 000000000..0d2e5b910
--- /dev/null
+++ b/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp
@@ -0,0 +1,170 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "processors/GetCouchbaseKey.h"
+#include "MockCouchbaseClusterService.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::couchbase::test {
+
+REGISTER_RESOURCE(MockCouchbaseClusterService, ControllerService);
+
+struct ExpectedCallOptions {
+  std::string bucket_name;
+  std::string scope_name;
+  std::string collection_name;
+  std::string doc_id;
+  couchbase::CouchbaseValueType document_type;
+};
+
+// Test fixture wiring a GetCouchbaseKey processor to a MockCouchbaseClusterService controller service.
+class GetCouchbaseKeyTestController : public TestController {
+ public:
+  // Creates the processor, enables debug/trace logging for the involved components, registers the mock
+  // controller service as "MockCouchbaseClusterService" and points the processor at it.
+  GetCouchbaseKeyTestController()
+      : controller_(std::make_unique<processors::GetCouchbaseKey>("GetCouchbaseKey")),
+        proc_(controller_.getProcessor()) {
+    LogTestController::getInstance().setDebug<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setDebug<controllers::CouchbaseClusterService>();
+    LogTestController::getInstance().setDebug<processors::GetCouchbaseKey>();
+    auto controller_service_node = controller_.plan->addController("MockCouchbaseClusterService", "MockCouchbaseClusterService");
+    mock_couchbase_cluster_service_ = std::static_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation());
+    proc_->setProperty(processors::GetCouchbaseKey::CouchbaseClusterControllerService, "MockCouchbaseClusterService");
+  }
+
+  // Checks the outcome of a single trigger:
+  //  - exactly one FlowFile on expected_result and none on the other two of success/failure/retry
+  //    (for failure, a "Failed to get document" log line is also required);
+  //  - the collection and get() parameters recorded by the mock match expected_call_options;
+  //  - for success only: the couchbase.* attributes and the payload. `input` is the original content,
+  //    which must be unchanged when the value was put into an attribute.
+  void verifyResults(const minifi::test::ProcessorTriggerResult& results, const minifi::core::Relationship& expected_result, const ExpectedCallOptions& expected_call_options,
+      const std::string& input) const {
+    std::shared_ptr<core::FlowFile> flow_file;
+    if (expected_result == processors::GetCouchbaseKey::Success) {
+      REQUIRE(results.at(processors::GetCouchbaseKey::Success).size() == 1);
+      REQUIRE(results.at(processors::GetCouchbaseKey::Failure).empty());
+      REQUIRE(results.at(processors::GetCouchbaseKey::Retry).empty());
+      flow_file = results.at(processors::GetCouchbaseKey::Success)[0];
+    } else if (expected_result == processors::GetCouchbaseKey::Failure) {
+      REQUIRE(results.at(processors::GetCouchbaseKey::Success).empty());
+      REQUIRE(results.at(processors::GetCouchbaseKey::Failure).size() == 1);
+      REQUIRE(results.at(processors::GetCouchbaseKey::Retry).empty());
+      flow_file = results.at(processors::GetCouchbaseKey::Failure)[0];
+      REQUIRE(LogTestController::getInstance().contains("Failed to get document", 1s));
+    } else {
+      REQUIRE(results.at(processors::GetCouchbaseKey::Success).empty());
+      REQUIRE(results.at(processors::GetCouchbaseKey::Failure).empty());
+      REQUIRE(results.at(processors::GetCouchbaseKey::Retry).size() == 1);
+      flow_file = results.at(processors::GetCouchbaseKey::Retry)[0];
+    }
+
+    auto get_collection_parameters = mock_couchbase_cluster_service_->getCollectionParameter();
+    CHECK(get_collection_parameters.bucket_name == expected_call_options.bucket_name);
+    CHECK(get_collection_parameters.collection_name == expected_call_options.collection_name);
+    CHECK(get_collection_parameters.scope_name == expected_call_options.scope_name);
+
+    auto get_parameters = mock_couchbase_cluster_service_->getGetParameters();
+    CHECK(get_parameters.document_id == expected_call_options.doc_id);
+    CHECK(get_parameters.document_type == expected_call_options.document_type);
+
+    // Output attributes and payload are only meaningful for the success relationship.
+    if (expected_result != processors::GetCouchbaseKey::Success) {
+      return;
+    }
+
+    CHECK(flow_file->getAttribute("couchbase.bucket").value() == expected_call_options.bucket_name);
+    CHECK(flow_file->getAttribute("couchbase.doc.id").value() == expected_call_options.doc_id);
+    CHECK(flow_file->getAttribute("couchbase.doc.cas").value() == std::to_string(COUCHBASE_GET_RESULT_CAS));
+    CHECK(flow_file->getAttribute("couchbase.doc.expiry").value() == COUCHBASE_GET_RESULT_EXPIRY);
+    std::string value;
+    proc_->getProperty(processors::GetCouchbaseKey::PutValueToAttribute, value);
+    if (!value.empty()) {
+      CHECK(flow_file->getAttribute(value).value() == COUCHBASE_GET_RESULT_CONTENT);
+      CHECK(controller_.plan->getContent(flow_file) == input);
+    } else {
+      CHECK(controller_.plan->getContent(flow_file) == COUCHBASE_GET_RESULT_CONTENT);
+    }
+  }
+
+ protected:
+  minifi::test::SingleProcessorTestController controller_;
+  // Non-owning; the processor was handed to controller_ as a unique_ptr.
+  core::Processor* proc_ = nullptr;
+  std::shared_ptr<MockCouchbaseClusterService> mock_couchbase_cluster_service_;
+};
+
+// A controller service name that cannot be resolved makes triggering the processor throw.
+TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Invalid Couchbase cluster controller service", "[getcouchbasekey]") {
+  proc_->setProperty(processors::GetCouchbaseKey::CouchbaseClusterControllerService, "invalid");
+  REQUIRE_THROWS_AS(controller_.trigger({minifi::test::InputFlowFileData{"couchbase_id"}}), minifi::Exception);
+}
+
+// An empty Bucket Name routes the FlowFile to failure with an explanatory log line.
+TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Invalid bucket name", "[getcouchbasekey]") {
+  proc_->setProperty(processors::GetCouchbaseKey::BucketName, "");
+  auto results = controller_.trigger({minifi::test::InputFlowFileData{"couchbase_id"}});
+  REQUIRE(results[processors::GetCouchbaseKey::Failure].size() == 1);
+  REQUIRE(LogTestController::getInstance().contains("Bucket '' is invalid or empty!", 1s));
+}
+
+// With no Document Id property and empty content there is no id to fetch, so the FlowFile fails.
+TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Document ID is empty and no content is present to use", "[getcouchbasekey]") {
+  proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket");
+  auto results = controller_.trigger({minifi::test::InputFlowFileData{""}});
+  REQUIRE(results.at(processors::GetCouchbaseKey::Success).empty());
+  REQUIRE(results.at(processors::GetCouchbaseKey::Failure).size() == 1);
+  REQUIRE(results.at(processors::GetCouchbaseKey::Retry).empty());
+  REQUIRE(LogTestController::getInstance().contains("Document ID is empty, transferring FlowFile to failure relationship", 1s));
+}
+
+// With only the bucket set, the FlowFile content is used as the document id and the default scope/collection are used.
+TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Get succeeds with default properties", "[getcouchbasekey]") {
+  proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket");
+  const std::string input = "couchbase_id";
+  auto results = controller_.trigger({minifi::test::InputFlowFileData{input}});
+  verifyResults(results, processors::GetCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "_default", "_default", input, couchbase::CouchbaseValueType::Json}, input);
+}
+
+// Explicit scope, collection, document id and document type are forwarded to the service unchanged.
+TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Get succeeds with optional properties", "[getcouchbasekey]") {
+  proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket");
+  proc_->setProperty(processors::GetCouchbaseKey::ScopeName, "scope1");
+  proc_->setProperty(processors::GetCouchbaseKey::CollectionName, "collection1");
+  proc_->setProperty(processors::GetCouchbaseKey::DocumentId, "important_doc");
+  proc_->setProperty(processors::GetCouchbaseKey::DocumentType, "Binary");
+  auto results = controller_.trigger({minifi::test::InputFlowFileData{""}});
+  verifyResults(results, processors::GetCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "scope1", "collection1", "important_doc", couchbase::CouchbaseValueType::Binary}, "");
+}
+
+// A FATAL error from the service routes the FlowFile to failure.
+TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Get fails with default properties", "[getcouchbasekey]") {
+  proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket");
+  mock_couchbase_cluster_service_->setGetError(CouchbaseErrorType::FATAL);
+  const std::string input = "couchbase_id";
+  auto results = controller_.trigger({minifi::test::InputFlowFileData{input}});
+  verifyResults(results, processors::GetCouchbaseKey::Failure, ExpectedCallOptions{"mybucket", "_default", "_default", input, couchbase::CouchbaseValueType::Json}, input);
+}
+
+// A TEMPORARY error from the service routes the FlowFile to retry.
+TEST_CASE_METHOD(GetCouchbaseKeyTestController, "FlowFile is transferred to retry relationship when temporary error is returned", "[getcouchbasekey]") {
+  proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket");
+  mock_couchbase_cluster_service_->setGetError(CouchbaseErrorType::TEMPORARY);
+  const std::string input = "couchbase_id";
+  auto results = controller_.trigger({minifi::test::InputFlowFileData{input}});
+  verifyResults(results, processors::GetCouchbaseKey::Retry, ExpectedCallOptions{"mybucket", "_default", "_default", input, couchbase::CouchbaseValueType::Json}, input);
+}
+
+// With Put Value to Attribute set, the String value lands in "myattribute" and the content stays the input.
+TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Get result is written to attribute", "[getcouchbasekey]") {
+  proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket");
+  proc_->setProperty(processors::GetCouchbaseKey::DocumentType, "String");
+  proc_->setProperty(processors::GetCouchbaseKey::PutValueToAttribute, "myattribute");
+  const std::string input = "couchbase_id";
+  auto results = controller_.trigger({minifi::test::InputFlowFileData{input}});
+  verifyResults(results, processors::GetCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "_default", "_default", input, couchbase::CouchbaseValueType::String}, input);
+}
+
+}  // namespace org::apache::nifi::minifi::couchbase::test
diff --git a/extensions/couchbase/tests/MockCouchbaseClusterService.h 
b/extensions/couchbase/tests/MockCouchbaseClusterService.h
index a2757f16c..6566e85d4 100644
--- a/extensions/couchbase/tests/MockCouchbaseClusterService.h
+++ b/extensions/couchbase/tests/MockCouchbaseClusterService.h
@@ -29,6 +29,9 @@ const std::uint64_t COUCHBASE_PUT_RESULT_CAS = 9876;
 const std::uint64_t COUCHBASE_PUT_RESULT_SEQUENCE_NUMBER = 345;
 const std::uint64_t COUCHBASE_PUT_RESULT_PARTITION_UUID = 7890123456;
 const std::uint16_t COUCHBASE_PUT_RESULT_PARTITION_ID = 1234;
+const std::string COUCHBASE_GET_RESULT_EXPIRY = "2024/10/14 09:37:43.000Z";
+const std::string COUCHBASE_GET_RESULT_CONTENT = "abc";
+const uint64_t COUCHBASE_GET_RESULT_CAS = 1234567;
 
 struct UpsertParameters {
   CouchbaseValueType document_type;
@@ -36,6 +39,10 @@ struct UpsertParameters {
   std::vector<std::byte> buffer;
   ::couchbase::upsert_options options;
 };
+struct GetParameters {
+  std::string document_id;
+  CouchbaseValueType document_type;
+};
 
 class MockCouchbaseClusterService : public 
controllers::CouchbaseClusterService {
  public:
@@ -57,7 +64,23 @@ class MockCouchbaseClusterService : public 
controllers::CouchbaseClusterService
     if (upsert_error_) {
       return nonstd::make_unexpected(*upsert_error_);
     } else {
-      return CouchbaseUpsertResult{collection_.bucket_name, 
COUCHBASE_PUT_RESULT_CAS, COUCHBASE_PUT_RESULT_SEQUENCE_NUMBER, 
COUCHBASE_PUT_RESULT_PARTITION_UUID, COUCHBASE_PUT_RESULT_PARTITION_ID};
+      return CouchbaseUpsertResult{{collection_.bucket_name, 
COUCHBASE_PUT_RESULT_CAS}, COUCHBASE_PUT_RESULT_SEQUENCE_NUMBER, 
COUCHBASE_PUT_RESULT_PARTITION_UUID, COUCHBASE_PUT_RESULT_PARTITION_ID};
+    }
+  }
+
+  nonstd::expected<CouchbaseGetResult, CouchbaseErrorType> get(const 
CouchbaseCollection& collection, const std::string& document_id, 
CouchbaseValueType document_type) override {
+    collection_ = collection;
+    get_parameters_.document_id = document_id;
+    get_parameters_.document_type = document_type;
+
+    if (get_error_) {
+      return nonstd::make_unexpected(*get_error_);
+    } else {
+      if (document_type == CouchbaseValueType::String) {
+        return CouchbaseGetResult{{collection_.bucket_name, 
COUCHBASE_GET_RESULT_CAS}, COUCHBASE_GET_RESULT_EXPIRY, 
COUCHBASE_GET_RESULT_CONTENT};
+      }
+      return CouchbaseGetResult{{collection_.bucket_name, 
COUCHBASE_GET_RESULT_CAS}, COUCHBASE_GET_RESULT_EXPIRY,
+        std::vector<std::byte>{static_cast<std::byte>('a'), 
static_cast<std::byte>('b'), static_cast<std::byte>('c')}};
     }
   }
 
@@ -65,6 +88,10 @@ class MockCouchbaseClusterService : public 
controllers::CouchbaseClusterService
     return upsert_parameters_;
   }
 
+  GetParameters getGetParameters() const {
+    return get_parameters_;
+  }
+
   CouchbaseCollection getCollectionParameter() const {
     return collection_;
   }
@@ -73,9 +100,15 @@ class MockCouchbaseClusterService : public 
controllers::CouchbaseClusterService
     upsert_error_ = upsert_error;
   }
 
+  void setGetError(const CouchbaseErrorType get_error) {
+    get_error_ = get_error;
+  }
+
  private:
   CouchbaseCollection collection_;
   UpsertParameters upsert_parameters_;
+  GetParameters get_parameters_;
   std::optional<CouchbaseErrorType> upsert_error_;
+  std::optional<CouchbaseErrorType> get_error_;
 };
 }  // namespace org::apache::nifi::minifi::couchbase::test
diff --git a/libminifi/include/core/Processor.h 
b/libminifi/include/core/Processor.h
index f720733e8..6f25c176c 100644
--- a/libminifi/include/core/Processor.h
+++ b/libminifi/include/core/Processor.h
@@ -238,6 +238,8 @@ class Processor : public Connectable, public 
ConfigurableComponent, public state
   std::string cron_period_;
   gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_;
 
+  std::shared_ptr<logging::Logger> logger_;
+
  private:
   mutable std::mutex mutex_;
   std::atomic<std::chrono::steady_clock::time_point> yield_expiration_{};
@@ -254,8 +256,6 @@ class Processor : public Connectable, public 
ConfigurableComponent, public state
 
   // an outgoing connection allows us to reach these nodes
   std::unordered_map<Connection*, std::unordered_set<Processor*>> 
reachable_processors_;
-
-  std::shared_ptr<logging::Logger> logger_;
 };
 
 }  // namespace core

Reply via email to