This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit ea16935f9cbb30864a36bc78e2a9ce47ba039d5b Author: Gabor Gyimesi <[email protected]> AuthorDate: Wed Nov 27 12:56:07 2024 +0100 MINIFICPP-2469 Create GetCouchbaseKey processor Closes #1881 Signed-off-by: Marton Szasz <[email protected]> --- PROCESSORS.md | 40 +++++ README.md | 2 +- .../cluster/containers/CouchbaseServerContainer.py | 4 +- docker/test/integration/features/couchbase.feature | 108 +++++++++++++ .../{PutCouchbaseKey.py => GetCouchbaseKey.py} | 8 +- .../minifi/processors/PutCouchbaseKey.py | 2 +- .../controllerservices/CouchbaseClusterService.cpp | 76 ++++++--- .../controllerservices/CouchbaseClusterService.h | 17 ++- .../couchbase/processors/GetCouchbaseKey.cpp | 111 ++++++++++++++ .../{PutCouchbaseKey.h => GetCouchbaseKey.h} | 96 +++--------- extensions/couchbase/processors/PutCouchbaseKey.h | 2 +- .../couchbase/tests/GetCouchbaseKeyTests.cpp | 170 +++++++++++++++++++++ .../couchbase/tests/MockCouchbaseClusterService.h | 35 ++++- libminifi/include/core/Processor.h | 4 +- 14 files changed, 573 insertions(+), 102 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index eec26bbac..667130ab4 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -48,6 +48,7 @@ limitations under the License. - [FetchSmb](#FetchSmb) - [FocusArchiveEntry](#FocusArchiveEntry) - [GenerateFlowFile](#GenerateFlowFile) +- [GetCouchbaseKey](#GetCouchbaseKey) - [GetFile](#GetFile) - [GetTCP](#GetTCP) - [HashContent](#HashContent) @@ -1096,6 +1097,45 @@ In the list below, the names of required properties appear in bold. Any other pr | success | success operational on the flow record | +## GetCouchbaseKey + +### Description + +Get a document from Couchbase Server via Key/Value access. The ID of the document to fetch may be supplied by setting the <Document Id> property. NOTE: if the Document Id property is not set, the contents of the FlowFile will be read to determine the Document Id, which means that the contents of the entire FlowFile will be buffered in memory. 
+ +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +|------------------------------------------|---------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Couchbase Cluster Controller Service** | | | A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster. | +| **Bucket Name** | default | | The name of the bucket to access.<br/>**Supports Expression Language: true** | +| Scope Name | | | Scope to use inside the bucket. If not specified, the _default scope is used.<br/>**Supports Expression Language: true** | +| Collection Name | | | Collection to use inside the bucket scope. If not specified, the _default collection is used.<br/>**Supports Expression Language: true** | +| **Document Type** | Json | Json<br/>Binary<br/>String | Content type of the retrieved value. | +| Document Id | | | A static, fixed Couchbase document id, or an expression to construct the Couchbase document id.<br/>**Supports Expression Language: true** | +| Put Value to Attribute | | | If set, the retrieved value will be put into an attribute of the FlowFile instead of the content of the FlowFile. 
The attribute key to put to is determined by evaluating the value of this property.<br/>**Supports Expression Language: true** | + +### Relationships + +| Name | Description | +|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| success | Values retrieved from Couchbase Server are written as outgoing FlowFiles content or put into an attribute of the incoming FlowFile and routed to this relationship. | +| failure | All FlowFiles that failed to be fetched from Couchbase Server and are not retryable are routed to this relationship. | +| retry | All FlowFiles that failed to be fetched from Couchbase Server but can be retried are routed to this relationship. | +| original | The original input FlowFile is routed to this relationship when the value is retrieved from Couchbase Server and routed to 'success'. | + +### Output Attributes + +| Attribute | Relationship | Description | +|----------------------|--------------|---------------------------------------| +| couchbase.bucket | success | Bucket where the document was stored. | +| couchbase.doc.id | success | Id of the document. | +| couchbase.doc.cas | success | CAS of the document. | +| couchbase.doc.expiry | success | Expiration of the document. | + + ## GetFile ### Description diff --git a/README.md b/README.md index 069d142ad..3c659f335 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. Exte | AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3) [...] 
| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[DeleteAzureBlobStorage](PROCESSORS.md#deleteazureblobstorage)<br/>[FetchAzureBlobStorage](PROCESSORS.md#fetchazureblobstorage)<br/>[ListAzureBlobStorage](PROCESSORS.md#listazureblobstorage)<br/>[PutAzureDataLakeStorage](PROCESSORS.md#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](PROCESSORS.md#del [...] | CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp) [...] -| Couchbase | [CouchbaseClusterService](CONTROLLERS.md#couchbaseclusterservice)<br/>[PutCouchbaseKey](PROCESSORS.md#putcouchbasekey) [...] +| Couchbase | [CouchbaseClusterService](CONTROLLERS.md#couchbaseclusterservice)<br/>[PutCouchbaseKey](PROCESSORS.md#putcouchbasekey)<br/>[GetCouchbaseKey](PROCESSORS.md#getcouchbasekey) [...] | Elasticsearch | [ElasticsearchCredentialsControllerService](CONTROLLERS.md#elasticsearchcredentialscontrollerservice)<br/>[PostElasticsearch](PROCESSORS.md#postelasticsearch) [...] | ExecuteProcess (Linux and macOS) | [ExecuteProcess](PROCESSORS.md#executeprocess) [...] | Google Cloud Platform | [DeleteGCSObject](PROCESSORS.md#deletegcsobject)<br>[FetchGCSObject](PROCESSORS.md#fetchgcsobject)<br>[GCPCredentialsControllerService](CONTROLLERS.md#gcpcredentialscontrollerservice)<br>[ListGCSBucket](PROCESSORS.md#listgcsbucket)<br>[PutGCSObject](PROCESSORS.md#putgcsobject) [...] 
diff --git a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py index 1c4353046..9baf01f08 100644 --- a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py +++ b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py @@ -33,7 +33,7 @@ class CouchbaseServerContainer(Container): ["couchbase-cli", "cluster-init", "-c", "localhost", "--cluster-username", "Administrator", "--cluster-password", "password123", "--services", "data,index,query", "--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"], ["couchbase-cli", "bucket-create", "-c", "localhost", "--username", "Administrator", "--password", "password123", "--bucket", "test_bucket", "--bucket-type", "couchbase", - "--bucket-ramsize", "1024"] + "--bucket-ramsize", "1024", "--max-ttl", "36000"] ] for command in commands: (code, _) = self.client.containers.get(self.name).exec_run(command) @@ -47,7 +47,7 @@ class CouchbaseServerContainer(Container): return self.docker_container = self.client.containers.run( - "couchbase:community-7.6.2", + "couchbase:enterprise-7.2.5", detach=True, name=self.name, network=self.network.name, diff --git a/docker/test/integration/features/couchbase.feature b/docker/test/integration/features/couchbase.feature index 446821f25..646eaacd9 100644 --- a/docker/test/integration/features/couchbase.feature +++ b/docker/test/integration/features/couchbase.feature @@ -65,3 +65,111 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And the Minifi logs match the following regex: "key:couchbase.partition.uuid value:[1-9][0-9]*" in less than 1 seconds And the Minifi logs match the following regex: "key:couchbase.partition.id value:[1-9][0-9]*" in less than 1 seconds And a document with id "test_doc_id" in bucket "test_bucket" is present with data '{"field1": "value1"}' of type "Binary" in Couchbase + + Scenario: A MiNiFi instance can get data from test bucket with 
GetCouchbaseKey processor + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" + And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" + And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" + And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + + And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey + And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey + And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + + When a Couchbase server is started + And all instances start up + + Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 60 seconds + And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds + And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 
seconds + And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds + + Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using binary storage + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" + And the "Document Type" property of the PutCouchbaseKey processor is set to "Binary" + And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" + And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" + And the "Document Type" property of the GetCouchbaseKey processor is set to "Binary" + And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + + And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey + And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey + And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + + When a Couchbase server is started + And all instances start up + + Then a flowfile with the JSON content '{"field1": "value1", 
"field2": "value2"}' is placed in the monitored directory in less than 60 seconds + And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds + And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds + + Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor and put the result in an attribute + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" + And the "Document Type" property of the PutCouchbaseKey processor is set to "String" + And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" + And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" + And the "Document Type" property of the GetCouchbaseKey processor is set to "String" + And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" + And the "Put Value to Attribute" property of the GetCouchbaseKey processor is set to "get_couchbase_result" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And a CouchbaseClusterService is setup up with the name 
"CouchbaseClusterService" + + And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey + And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey + And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + + When a Couchbase server is started + And all instances start up + + Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 60 seconds + And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds + And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds + And the Minifi logs contain the following message: 'key:get_couchbase_result value:{"field1": "value1", "field2": "value2"}' in less than 1 seconds + + Scenario: GetCouchbaseKey transfers FlowFile to failure relationship on Couchbase value type mismatch + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" + And the "Document Type" property of the PutCouchbaseKey processor is set to "String" + And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" + And a GetCouchbaseKey processor with the 
"Bucket Name" property set to "test_bucket" + And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" + And the "Document Type" property of the GetCouchbaseKey processor is set to "Binary" + And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" + And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + + And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey + And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey + + When a Couchbase server is started + And all instances start up + + Then the Minifi logs contain the following message: "Failed to get content for document 'test_doc_id' from collection 'test_bucket._default._default' with the following exception: 'raw_binary_transcoder expects document to have BINARY common flags" in less than 60 seconds diff --git a/docker/test/integration/minifi/processors/PutCouchbaseKey.py b/docker/test/integration/minifi/processors/GetCouchbaseKey.py similarity index 84% copy from docker/test/integration/minifi/processors/PutCouchbaseKey.py copy to docker/test/integration/minifi/processors/GetCouchbaseKey.py index 5e94aaa07..0b48dcbdd 100644 --- a/docker/test/integration/minifi/processors/PutCouchbaseKey.py +++ b/docker/test/integration/minifi/processors/GetCouchbaseKey.py @@ -15,10 +15,10 @@ from ..core.Processor import Processor -class PutCouchbaseKey(Processor): +class GetCouchbaseKey(Processor): def __init__(self, context, schedule={'scheduling strategy': 'EVENT_DRIVEN'}): - super(PutCouchbaseKey, self).__init__( + super(GetCouchbaseKey, self).__init__( context=context, - clazz='PutCouchbaseKey', - auto_terminate=['success', 'failure'], + clazz='GetCouchbaseKey', + auto_terminate=['success', 'failure', 'retry'], schedule=schedule) diff --git a/docker/test/integration/minifi/processors/PutCouchbaseKey.py 
b/docker/test/integration/minifi/processors/PutCouchbaseKey.py index 5e94aaa07..341338771 100644 --- a/docker/test/integration/minifi/processors/PutCouchbaseKey.py +++ b/docker/test/integration/minifi/processors/PutCouchbaseKey.py @@ -20,5 +20,5 @@ class PutCouchbaseKey(Processor): super(PutCouchbaseKey, self).__init__( context=context, clazz='PutCouchbaseKey', - auto_terminate=['success', 'failure'], + auto_terminate=['success', 'failure', 'retry'], schedule=schedule) diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp index 5207657f2..c40697cf7 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp @@ -17,11 +17,12 @@ */ #include "CouchbaseClusterService.h" -#include "couchbase/codec/raw_binary_transcoder.hxx" -#include "couchbase/codec/raw_string_transcoder.hxx" -#include "couchbase/codec/raw_json_transcoder.hxx" #include "core/Resource.h" +#include "couchbase/codec/raw_binary_transcoder.hxx" +#include "couchbase/codec/raw_json_transcoder.hxx" +#include "couchbase/codec/raw_string_transcoder.hxx" +#include "utils/TimeUtil.h" namespace org::apache::nifi::minifi::couchbase { @@ -58,8 +59,8 @@ nonstd::expected<::couchbase::collection, CouchbaseErrorType> CouchbaseClient::g return cluster_->bucket(collection.bucket_name).scope(collection.scope_name).collection(collection.collection_name); } -nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> CouchbaseClient::upsert(const CouchbaseCollection& collection, - CouchbaseValueType document_type, const std::string& document_id, const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) { +nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> CouchbaseClient::upsert( + const CouchbaseCollection& collection, CouchbaseValueType document_type, const std::string& document_id, const 
std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) { auto collection_result = getCollection(collection); if (!collection_result.has_value()) { return nonstd::make_unexpected(collection_result.error()); @@ -78,27 +79,68 @@ nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> CouchbaseClient::ups if (upsert_err.ec()) { // ambiguous_timeout should not be retried as we do not know if the insert was successful or not if (getErrorType(upsert_err.ec()) == CouchbaseErrorType::TEMPORARY && upsert_err.ec().value() != static_cast<int>(::couchbase::errc::common::ambiguous_timeout)) { - logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' due to temporary issue, error code: '{}', message: '{}'", - document_id, collection.bucket_name, collection.scope_name, collection.collection_name, upsert_err.ec(), upsert_err.message()); + logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' due to temporary issue, error code: '{}', message: '{}'", document_id, collection.bucket_name, collection.scope_name, + collection.collection_name, upsert_err.ec(), upsert_err.message()); return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY); } - logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' with error code: '{}', message: '{}'", - document_id, collection.bucket_name, collection.scope_name, collection.collection_name, upsert_err.ec(), upsert_err.message()); + logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' with error code: '{}', message: '{}'", document_id, collection.bucket_name, collection.scope_name, + collection.collection_name, upsert_err.ec(), upsert_err.message()); return nonstd::make_unexpected(CouchbaseErrorType::FATAL); } else { - const uint64_t partition_uuid = (upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->partition_uuid() : 0); - const uint64_t sequence_number = (upsert_resp.mutation_token().has_value() ? 
upsert_resp.mutation_token()->sequence_number() : 0); - const uint16_t partition_id = (upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->partition_id() : 0); return CouchbaseUpsertResult { - collection.bucket_name, - upsert_resp.cas().value(), - partition_uuid, - sequence_number, - partition_id + { + collection.bucket_name, + upsert_resp.cas().value(), + }, + (upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->sequence_number() : 0), + (upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->partition_uuid() : 0), + gsl::narrow<uint16_t>(upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->partition_id() : 0) }; } } +nonstd::expected<CouchbaseGetResult, CouchbaseErrorType> CouchbaseClient::get(const CouchbaseCollection& collection, const std::string& document_id, CouchbaseValueType return_type) { + auto collection_result = getCollection(collection); + if (!collection_result.has_value()) { + return nonstd::make_unexpected(collection_result.error()); + } + + ::couchbase::get_options options; + options.with_expiry(true); + auto [get_err, resp] = collection_result->get(document_id, options).get(); + if (get_err.ec()) { + if (getErrorType(get_err.ec()) == CouchbaseErrorType::TEMPORARY) { + logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' due to timeout", document_id, collection.bucket_name, collection.scope_name, collection.collection_name); + return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY); + } + std::string cause = get_err.cause() ? 
get_err.cause()->message() : ""; + logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' with error code: '{}', message: '{}'", document_id, collection.bucket_name, collection.scope_name, + collection.collection_name, get_err.ec(), get_err.message()); + return nonstd::make_unexpected(CouchbaseErrorType::FATAL); + } else { + try { + CouchbaseGetResult result; + result.bucket_name = collection.bucket_name; + result.cas = resp.cas().value(); + if (return_type == CouchbaseValueType::Json) { + result.value = resp.content_as<::couchbase::codec::binary, ::couchbase::codec::raw_json_transcoder>(); + } else if (return_type == CouchbaseValueType::String) { + result.value = resp.content_as<::couchbase::codec::raw_string_transcoder>(); + } else { + result.value = resp.content_as<::couchbase::codec::raw_binary_transcoder>(); + } + if (resp.expiry_time().has_value()) { + result.expiry = utils::timeutils::getTimeStr(*resp.expiry_time()); + } + return result; + } catch (const std::exception& ex) { + logger_->log_error("Failed to get content for document '{}' from collection '{}.{}.{}' with the following exception: '{}'", document_id, collection.bucket_name, collection.scope_name, + collection.collection_name, ex.what()); + return nonstd::make_unexpected(CouchbaseErrorType::FATAL); + } + } +} + void CouchbaseClient::close() { std::lock_guard<std::mutex> lock(cluster_mutex_); if (cluster_) { diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.h b/extensions/couchbase/controllerservices/CouchbaseClusterService.h index b848b741d..28c6390dd 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.h +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.h @@ -22,6 +22,7 @@ #include <string> #include <utility> #include <mutex> +#include <variant> #include "core/controller/ControllerService.h" #include "core/PropertyDefinition.h" @@ -39,9 +40,17 @@ struct CouchbaseCollection { std::string collection_name; }; 
-struct CouchbaseUpsertResult { +struct CouchbaseCallResult { std::string bucket_name; std::uint64_t cas{0}; +}; + +struct CouchbaseGetResult : public CouchbaseCallResult { + std::string expiry; + std::variant<std::vector<std::byte>, std::string> value; +}; + +struct CouchbaseUpsertResult : public CouchbaseCallResult { std::uint64_t sequence_number{0}; std::uint64_t partition_uuid{0}; std::uint16_t partition_id{0}; @@ -75,6 +84,7 @@ class CouchbaseClient { nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const CouchbaseCollection& collection, CouchbaseValueType document_type, const std::string& document_id, const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options); + nonstd::expected<CouchbaseGetResult, CouchbaseErrorType> get(const CouchbaseCollection& collection, const std::string& document_id, CouchbaseValueType return_type); nonstd::expected<void, CouchbaseErrorType> establishConnection(); void close(); @@ -151,6 +161,11 @@ class CouchbaseClusterService : public core::controller::ControllerService { return client_->upsert(collection, document_type, document_id, buffer, options); } + virtual nonstd::expected<CouchbaseGetResult, CouchbaseErrorType> get(const CouchbaseCollection& collection, const std::string& document_id, CouchbaseValueType return_type) { + gsl_Expects(client_); + return client_->get(collection, document_id, return_type); + } + static gsl::not_null<std::shared_ptr<CouchbaseClusterService>> getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property); private: diff --git a/extensions/couchbase/processors/GetCouchbaseKey.cpp b/extensions/couchbase/processors/GetCouchbaseKey.cpp new file mode 100644 index 000000000..89cf160f6 --- /dev/null +++ b/extensions/couchbase/processors/GetCouchbaseKey.cpp @@ -0,0 +1,111 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "GetCouchbaseKey.h" +#include "utils/gsl.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" + +namespace org::apache::nifi::minifi::couchbase::processors { + +void GetCouchbaseKey::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + couchbase_cluster_service_ = controllers::CouchbaseClusterService::getFromProperty(context, GetCouchbaseKey::CouchbaseClusterControllerService); + document_type_ = utils::parseEnumProperty<CouchbaseValueType>(context, GetCouchbaseKey::DocumentType); +} + +void GetCouchbaseKey::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + gsl_Expects(couchbase_cluster_service_); + + auto flow_file = session.get(); + if (!flow_file) { + context.yield(); + return; + } + + CouchbaseCollection collection; + if (!context.getProperty(BucketName, collection.bucket_name, flow_file.get()) || collection.bucket_name.empty()) { + logger_->log_error("Bucket '{}' is invalid or empty!", collection.bucket_name); + session.transfer(flow_file, Failure); + return; + } + + if (!context.getProperty(ScopeName, collection.scope_name, flow_file.get()) || collection.scope_name.empty()) { + collection.scope_name = ::couchbase::scope::default_name; + } + + if (!context.getProperty(CollectionName, 
collection.collection_name, flow_file.get()) || collection.collection_name.empty()) { + collection.collection_name = ::couchbase::collection::default_name; + } + + std::string document_id; + if (!context.getProperty(DocumentId, document_id, flow_file.get()) || document_id.empty()) { + auto ff_content = session.readBuffer(flow_file).buffer; + document_id = std::string(reinterpret_cast<const char*>(ff_content.data()), ff_content.size()); + } + + if (document_id.empty()) { + logger_->log_error("Document ID is empty, transferring FlowFile to failure relationship"); + session.transfer(flow_file, Failure); + return; + } + + std::string attribute_to_put_result_to; + context.getProperty(PutValueToAttribute, attribute_to_put_result_to, flow_file.get()); + + if (auto get_result = couchbase_cluster_service_->get(collection, document_id, document_type_)) { + if (!attribute_to_put_result_to.empty()) { + if (document_type_ == CouchbaseValueType::String) { + session.putAttribute(*flow_file, attribute_to_put_result_to, std::get<std::string>(get_result->value)); + } else { + auto& binary_data = std::get<std::vector<std::byte>>(get_result->value); + std::string str_value{reinterpret_cast<const char*>(binary_data.data()), binary_data.size()}; + session.putAttribute(*flow_file, attribute_to_put_result_to, str_value); + } + } else { + session.write(flow_file, [&, this](const std::shared_ptr<io::OutputStream>& stream) -> int64_t { + if (document_type_ == CouchbaseValueType::String) { + auto& value = std::get<std::string>(get_result->value); + stream->write(value); + return gsl::narrow<int64_t>(value.size()); + } else { + auto& value = std::get<std::vector<std::byte>>(get_result->value); + stream->write(value); + return gsl::narrow<int64_t>(value.size()); + } + }); + } + + session.putAttribute(*flow_file, "couchbase.bucket", get_result->bucket_name); + session.putAttribute(*flow_file, "couchbase.doc.id", document_id); + session.putAttribute(*flow_file, "couchbase.doc.cas", 
std::to_string(get_result->cas)); + session.putAttribute(*flow_file, "couchbase.doc.expiry", get_result->expiry); + session.transfer(flow_file, Success); + } else if (get_result.error() == CouchbaseErrorType::TEMPORARY) { + logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' due to timeout, transferring to retry relationship", + document_id, collection.bucket_name, collection.scope_name, collection.collection_name); + session.transfer(flow_file, Retry); + } else { + logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}', transferring to failure relationship", + document_id, collection.bucket_name, collection.scope_name, collection.collection_name); + session.transfer(flow_file, Failure); + } +} + +REGISTER_RESOURCE(GetCouchbaseKey, Processor); + +} // namespace org::apache::nifi::minifi::couchbase::processors diff --git a/extensions/couchbase/processors/PutCouchbaseKey.h b/extensions/couchbase/processors/GetCouchbaseKey.h similarity index 56% copy from extensions/couchbase/processors/PutCouchbaseKey.h copy to extensions/couchbase/processors/GetCouchbaseKey.h index 2b4f420af..4951c18ef 100644 --- a/extensions/couchbase/processors/PutCouchbaseKey.h +++ b/extensions/couchbase/processors/GetCouchbaseKey.h @@ -23,56 +23,18 @@ #include "core/AbstractProcessor.h" #include "core/ProcessSession.h" -#include "utils/Enum.h" #include "core/logging/LoggerConfiguration.h" #include "CouchbaseClusterService.h" -#include "couchbase/persist_to.hxx" -#include "couchbase/replicate_to.hxx" - -namespace magic_enum::customize { - -template <> -constexpr customize_t enum_name<::couchbase::persist_to>(::couchbase::persist_to value) noexcept { - switch (value) { - case ::couchbase::persist_to::none: - return "NONE"; - case ::couchbase::persist_to::active: - return "ACTIVE"; - case ::couchbase::persist_to::one: - return "ONE"; - case ::couchbase::persist_to::two: - return "TWO"; - case ::couchbase::persist_to::three: - return "THREE"; - case 
::couchbase::persist_to::four: - return "FOUR"; - } - return invalid_tag; -} - -template <> -constexpr customize_t enum_name<::couchbase::replicate_to>(::couchbase::replicate_to value) noexcept { - switch (value) { - case ::couchbase::replicate_to::none: - return "NONE"; - case ::couchbase::replicate_to::one: - return "ONE"; - case ::couchbase::replicate_to::two: - return "TWO"; - case ::couchbase::replicate_to::three: - return "THREE"; - } - return invalid_tag; -} -} // namespace magic_enum::customize namespace org::apache::nifi::minifi::couchbase::processors { -class PutCouchbaseKey final : public core::AbstractProcessor<PutCouchbaseKey> { +class GetCouchbaseKey final : public core::AbstractProcessor<GetCouchbaseKey> { public: - using core::AbstractProcessor<PutCouchbaseKey>::AbstractProcessor; + using core::AbstractProcessor<GetCouchbaseKey>::AbstractProcessor; - EXTENSIONAPI static constexpr const char* Description = "Put a document to Couchbase Server via Key/Value access."; + EXTENSIONAPI static constexpr const char* Description = "Get a document from Couchbase Server via Key/Value access. The ID of the document to fetch may be supplied by setting the " + "<Document Id> property. NOTE: if the Document Id property is not set, the contents of the FlowFile will be read to determine the Document Id, which means that the contents of " + "the entire FlowFile will be buffered in memory."; EXTENSIONAPI static constexpr auto CouchbaseClusterControllerService = core::PropertyDefinitionBuilder<>::createProperty("Couchbase Cluster Controller Service") .withDescription("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.") @@ -93,28 +55,20 @@ class PutCouchbaseKey final : public core::AbstractProcessor<PutCouchbaseKey> { .withDescription("Collection to use inside the bucket scope. 
If not specified, the _default collection is used.") .supportsExpressionLanguage(true) .build(); - EXTENSIONAPI static constexpr auto DocumentType = core::PropertyDefinitionBuilder<magic_enum::enum_count<CouchbaseValueType>()>::createProperty("Document Type") - .withDescription("Content type to store data as.") + EXTENSIONAPI static constexpr auto DocumentType = core::PropertyDefinitionBuilder<3>::createProperty("Document Type") + .withDescription("Content type of the retrieved value.") .isRequired(true) .withDefaultValue(magic_enum::enum_name(CouchbaseValueType::Json)) .withAllowedValues(magic_enum::enum_names<CouchbaseValueType>()) .build(); EXTENSIONAPI static constexpr auto DocumentId = core::PropertyDefinitionBuilder<>::createProperty("Document Id") - .withDescription("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id. " - "If not specified, either the FlowFile uuid attribute or if that's not found a generated uuid will be used.") + .withDescription("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id.") .supportsExpressionLanguage(true) .build(); - EXTENSIONAPI static constexpr auto PersistTo = core::PropertyDefinitionBuilder<6>::createProperty("Persist To") - .withDescription("Durability constraint about disk persistence.") - .isRequired(true) - .withDefaultValue(magic_enum::enum_name(::couchbase::persist_to::none)) - .withAllowedValues(magic_enum::enum_names<::couchbase::persist_to>()) - .build(); - EXTENSIONAPI static constexpr auto ReplicateTo = core::PropertyDefinitionBuilder<4>::createProperty("Replicate To") - .withDescription("Durability constraint about replication.") - .isRequired(true) - .withDefaultValue(magic_enum::enum_name(::couchbase::replicate_to::none)) - .withAllowedValues(magic_enum::enum_names<::couchbase::replicate_to>()) + EXTENSIONAPI static constexpr auto PutValueToAttribute = core::PropertyDefinitionBuilder<>::createProperty("Put Value to 
Attribute") + .withDescription("If set, the retrieved value will be put into an attribute of the FlowFile instead of a the content of the FlowFile. " + "The attribute key to put to is determined by evaluating value of this property.") + .supportsExpressionLanguage(true) .build(); EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ @@ -124,23 +78,23 @@ class PutCouchbaseKey final : public core::AbstractProcessor<PutCouchbaseKey> { CollectionName, DocumentType, DocumentId, - PersistTo, - ReplicateTo + PutValueToAttribute }); - EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "All FlowFiles that are written to Couchbase Server are routed to this relationship."}; - EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "All FlowFiles failed to be written to Couchbase Server and not retry-able are routed to this relationship."}; - EXTENSIONAPI static constexpr auto Retry = core::RelationshipDefinition{"retry", "All FlowFiles failed to be written to Couchbase Server but can be retried are routed to this relationship."}; - EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure, Retry}; + EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", + "Values retrieved from Couchbase Server are written as outgoing FlowFiles content or put into an attribute of the incoming FlowFile and routed to this relationship."}; + EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "All FlowFiles failed to fetch from Couchbase Server and not retry-able are routed to this relationship."}; + EXTENSIONAPI static constexpr auto Retry = core::RelationshipDefinition{"retry", "All FlowFiles failed to fetch from Couchbase Server but can be retried are routed to this relationship."}; + EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original", + "The original input FlowFile is routed to 
this relationship when the value is retrieved from Couchbase Server and routed to 'success'."}; + EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure, Retry, Original}; EXTENSIONAPI static constexpr auto CouchbaseBucket = core::OutputAttributeDefinition<>{"couchbase.bucket", {Success}, "Bucket where the document was stored."}; EXTENSIONAPI static constexpr auto CouchbaseDocId = core::OutputAttributeDefinition<>{"couchbase.doc.id", {Success}, "Id of the document."}; EXTENSIONAPI static constexpr auto CouchbaseDocCas = core::OutputAttributeDefinition<>{"couchbase.doc.cas", {Success}, "CAS of the document."}; - EXTENSIONAPI static constexpr auto CouchbaseDocSequenceNumber = core::OutputAttributeDefinition<>{"couchbase.doc.sequence.number", {Success}, "Sequence number associated with the document."}; - EXTENSIONAPI static constexpr auto CouchbasePartitionUUID = core::OutputAttributeDefinition<>{"couchbase.partition.uuid", {Success}, "UUID of partition."}; - EXTENSIONAPI static constexpr auto CouchbasePartitionId = core::OutputAttributeDefinition<>{"couchbase.partition.id", {Success}, "ID of partition (also known as vBucket)."}; - EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 6>{ - CouchbaseBucket, CouchbaseDocId, CouchbaseDocCas, CouchbaseDocSequenceNumber, CouchbasePartitionUUID, CouchbasePartitionId}; + EXTENSIONAPI static constexpr auto CouchbaseDocExpiry = core::OutputAttributeDefinition<>{"couchbase.doc.expiry", {Success}, "Expiration of the document."}; + EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 4>{ + CouchbaseBucket, CouchbaseDocId, CouchbaseDocCas, CouchbaseDocExpiry}; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; @@ -152,10 +106,8 @@ class PutCouchbaseKey final : public core::AbstractProcessor<PutCouchbaseKey> { private: 
std::shared_ptr<controllers::CouchbaseClusterService> couchbase_cluster_service_; - std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutCouchbaseKey>::getLogger(uuid_); CouchbaseValueType document_type_ = CouchbaseValueType::Json; - ::couchbase::persist_to persist_to_ = ::couchbase::persist_to::none; - ::couchbase::replicate_to replicate_to_ = ::couchbase::replicate_to::none; + std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<GetCouchbaseKey>::getLogger(uuid_); }; } // namespace org::apache::nifi::minifi::couchbase::processors diff --git a/extensions/couchbase/processors/PutCouchbaseKey.h b/extensions/couchbase/processors/PutCouchbaseKey.h index 2b4f420af..1991488ed 100644 --- a/extensions/couchbase/processors/PutCouchbaseKey.h +++ b/extensions/couchbase/processors/PutCouchbaseKey.h @@ -152,10 +152,10 @@ class PutCouchbaseKey final : public core::AbstractProcessor<PutCouchbaseKey> { private: std::shared_ptr<controllers::CouchbaseClusterService> couchbase_cluster_service_; - std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutCouchbaseKey>::getLogger(uuid_); CouchbaseValueType document_type_ = CouchbaseValueType::Json; ::couchbase::persist_to persist_to_ = ::couchbase::persist_to::none; ::couchbase::replicate_to replicate_to_ = ::couchbase::replicate_to::none; + std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutCouchbaseKey>::getLogger(uuid_); }; } // namespace org::apache::nifi::minifi::couchbase::processors diff --git a/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp b/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp new file mode 100644 index 000000000..0d2e5b910 --- /dev/null +++ b/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp @@ -0,0 +1,170 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "unit/SingleProcessorTestController.h" +#include "processors/GetCouchbaseKey.h" +#include "MockCouchbaseClusterService.h" +#include "core/Resource.h" + +namespace org::apache::nifi::minifi::couchbase::test { + +REGISTER_RESOURCE(MockCouchbaseClusterService, ControllerService); + +struct ExpectedCallOptions { + std::string bucket_name; + std::string scope_name; + std::string collection_name; + std::string doc_id; + couchbase::CouchbaseValueType document_type; +}; + +class GetCouchbaseKeyTestController : public TestController { + public: + GetCouchbaseKeyTestController() + : controller_(std::make_unique<processors::GetCouchbaseKey>("GetCouchbaseKey")), + proc_(controller_.getProcessor()) { + LogTestController::getInstance().setDebug<TestPlan>(); + LogTestController::getInstance().setDebug<minifi::core::Processor>(); + LogTestController::getInstance().setTrace<minifi::core::ProcessSession>(); + LogTestController::getInstance().setDebug<controllers::CouchbaseClusterService>(); + LogTestController::getInstance().setDebug<processors::GetCouchbaseKey>(); + auto controller_service_node = controller_.plan->addController("MockCouchbaseClusterService", "MockCouchbaseClusterService"); + mock_couchbase_cluster_service_ = 
std::static_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation()); + proc_->setProperty(processors::GetCouchbaseKey::CouchbaseClusterControllerService, "MockCouchbaseClusterService"); + } + + void verifyResults(const minifi::test::ProcessorTriggerResult& results, const minifi::core::Relationship& expected_result, const ExpectedCallOptions& expected_call_options, + const std::string& input) const { + std::shared_ptr<core::FlowFile> flow_file; + if (expected_result == processors::GetCouchbaseKey::Success) { + REQUIRE(results.at(processors::GetCouchbaseKey::Success).size() == 1); + REQUIRE(results.at(processors::GetCouchbaseKey::Failure).empty()); + REQUIRE(results.at(processors::GetCouchbaseKey::Retry).empty()); + flow_file = results.at(processors::GetCouchbaseKey::Success)[0]; + } else if (expected_result == processors::GetCouchbaseKey::Failure) { + REQUIRE(results.at(processors::GetCouchbaseKey::Success).empty()); + REQUIRE(results.at(processors::GetCouchbaseKey::Failure).size() == 1); + REQUIRE(results.at(processors::GetCouchbaseKey::Retry).empty()); + flow_file = results.at(processors::GetCouchbaseKey::Failure)[0]; + REQUIRE(LogTestController::getInstance().contains("Failed to get document", 1s)); + } else { + REQUIRE(results.at(processors::GetCouchbaseKey::Success).empty()); + REQUIRE(results.at(processors::GetCouchbaseKey::Failure).empty()); + REQUIRE(results.at(processors::GetCouchbaseKey::Retry).size() == 1); + flow_file = results.at(processors::GetCouchbaseKey::Retry)[0]; + } + + auto get_collection_parameters = mock_couchbase_cluster_service_->getCollectionParameter(); + CHECK(get_collection_parameters.bucket_name == expected_call_options.bucket_name); + CHECK(get_collection_parameters.collection_name == expected_call_options.collection_name); + CHECK(get_collection_parameters.scope_name == expected_call_options.scope_name); + + auto get_parameters = mock_couchbase_cluster_service_->getGetParameters(); + 
CHECK(get_parameters.document_id == expected_call_options.doc_id); + CHECK(get_parameters.document_type == expected_call_options.document_type); + + if (expected_result != processors::GetCouchbaseKey::Success) { + return; + } + + CHECK(flow_file->getAttribute("couchbase.bucket").value() == expected_call_options.bucket_name); + CHECK(flow_file->getAttribute("couchbase.doc.id").value() == expected_call_options.doc_id); + CHECK(flow_file->getAttribute("couchbase.doc.cas").value() == std::to_string(COUCHBASE_GET_RESULT_CAS)); + CHECK(flow_file->getAttribute("couchbase.doc.expiry").value() == COUCHBASE_GET_RESULT_EXPIRY); + std::string value; + proc_->getProperty(processors::GetCouchbaseKey::PutValueToAttribute, value); + if (!value.empty()) { + CHECK(flow_file->getAttribute(value).value() == COUCHBASE_GET_RESULT_CONTENT); + CHECK(controller_.plan->getContent(flow_file) == input); + } else { + CHECK(controller_.plan->getContent(flow_file) == COUCHBASE_GET_RESULT_CONTENT); + } + } + + protected: + minifi::test::SingleProcessorTestController controller_; + core::Processor* proc_ = nullptr; + std::shared_ptr<MockCouchbaseClusterService> mock_couchbase_cluster_service_; +}; + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Invalid Couchbase cluster controller service", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::CouchbaseClusterControllerService, "invalid"); + REQUIRE_THROWS_AS(controller_.trigger({minifi::test::InputFlowFileData{"couchbase_id"}}), minifi::Exception); +} + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Invalid bucket name", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::BucketName, ""); + auto results = controller_.trigger({minifi::test::InputFlowFileData{"couchbase_id"}}); + REQUIRE(results[processors::GetCouchbaseKey::Failure].size() == 1); + REQUIRE(LogTestController::getInstance().contains("Bucket '' is invalid or empty!", 1s)); +} + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Document ID 
is empty and no content is present to use", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket"); + auto results = controller_.trigger({minifi::test::InputFlowFileData{""}}); + REQUIRE(results.at(processors::GetCouchbaseKey::Success).empty()); + REQUIRE(results.at(processors::GetCouchbaseKey::Failure).size() == 1); + REQUIRE(results.at(processors::GetCouchbaseKey::Retry).empty()); + REQUIRE(LogTestController::getInstance().contains("Document ID is empty, transferring FlowFile to failure relationship", 1s)); +} + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Get succeeeds with default properties", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket"); + const std::string input = "couchbase_id"; + auto results = controller_.trigger({minifi::test::InputFlowFileData{input}}); + verifyResults(results, processors::GetCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "_default", "_default", input, couchbase::CouchbaseValueType::Json}, input); +} + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Get succeeeds with optional properties", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket"); + proc_->setProperty(processors::GetCouchbaseKey::ScopeName, "scope1"); + proc_->setProperty(processors::GetCouchbaseKey::CollectionName, "collection1"); + proc_->setProperty(processors::GetCouchbaseKey::DocumentId, "important_doc"); + proc_->setProperty(processors::GetCouchbaseKey::DocumentType, "Binary"); + auto results = controller_.trigger({minifi::test::InputFlowFileData{""}}); + verifyResults(results, processors::GetCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "scope1", "collection1", "important_doc", couchbase::CouchbaseValueType::Binary}, ""); +} + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Get fails with default properties", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket"); + 
mock_couchbase_cluster_service_->setGetError(CouchbaseErrorType::FATAL); + const std::string input = "couchbase_id"; + auto results = controller_.trigger({minifi::test::InputFlowFileData{input}}); + verifyResults(results, processors::GetCouchbaseKey::Failure, ExpectedCallOptions{"mybucket", "_default", "_default", input, couchbase::CouchbaseValueType::Json}, input); +} + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "FlowFile is transferred to retry relationship when temporary error is returned", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket"); + mock_couchbase_cluster_service_->setGetError(CouchbaseErrorType::TEMPORARY); + const std::string input = "couchbase_id"; + auto results = controller_.trigger({minifi::test::InputFlowFileData{input}}); + verifyResults(results, processors::GetCouchbaseKey::Retry, ExpectedCallOptions{"mybucket", "_default", "_default", input, couchbase::CouchbaseValueType::Json}, input); +} + +TEST_CASE_METHOD(GetCouchbaseKeyTestController, "Get result is written to attribute", "[getcouchbasekey]") { + proc_->setProperty(processors::GetCouchbaseKey::BucketName, "mybucket"); + proc_->setProperty(processors::GetCouchbaseKey::DocumentType, "String"); + proc_->setProperty(processors::GetCouchbaseKey::PutValueToAttribute, "myattribute"); + const std::string input = "couchbase_id"; + auto results = controller_.trigger({minifi::test::InputFlowFileData{input}}); + verifyResults(results, processors::GetCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "_default", "_default", input, couchbase::CouchbaseValueType::String}, input); +} + +} // namespace org::apache::nifi::minifi::couchbase::test diff --git a/extensions/couchbase/tests/MockCouchbaseClusterService.h b/extensions/couchbase/tests/MockCouchbaseClusterService.h index a2757f16c..6566e85d4 100644 --- a/extensions/couchbase/tests/MockCouchbaseClusterService.h +++ b/extensions/couchbase/tests/MockCouchbaseClusterService.h @@ -29,6 +29,9 @@ 
const std::uint64_t COUCHBASE_PUT_RESULT_CAS = 9876; const std::uint64_t COUCHBASE_PUT_RESULT_SEQUENCE_NUMBER = 345; const std::uint64_t COUCHBASE_PUT_RESULT_PARTITION_UUID = 7890123456; const std::uint16_t COUCHBASE_PUT_RESULT_PARTITION_ID = 1234; +const std::string COUCHBASE_GET_RESULT_EXPIRY = "2024/10/14 09:37:43.000Z"; +const std::string COUCHBASE_GET_RESULT_CONTENT = "abc"; +const uint64_t COUCHBASE_GET_RESULT_CAS = 1234567; struct UpsertParameters { CouchbaseValueType document_type; @@ -36,6 +39,10 @@ struct UpsertParameters { std::vector<std::byte> buffer; ::couchbase::upsert_options options; }; +struct GetParameters { + std::string document_id; + CouchbaseValueType document_type; +}; class MockCouchbaseClusterService : public controllers::CouchbaseClusterService { public: @@ -57,7 +64,23 @@ class MockCouchbaseClusterService : public controllers::CouchbaseClusterService if (upsert_error_) { return nonstd::make_unexpected(*upsert_error_); } else { - return CouchbaseUpsertResult{collection_.bucket_name, COUCHBASE_PUT_RESULT_CAS, COUCHBASE_PUT_RESULT_SEQUENCE_NUMBER, COUCHBASE_PUT_RESULT_PARTITION_UUID, COUCHBASE_PUT_RESULT_PARTITION_ID}; + return CouchbaseUpsertResult{{collection_.bucket_name, COUCHBASE_PUT_RESULT_CAS}, COUCHBASE_PUT_RESULT_SEQUENCE_NUMBER, COUCHBASE_PUT_RESULT_PARTITION_UUID, COUCHBASE_PUT_RESULT_PARTITION_ID}; + } + } + + nonstd::expected<CouchbaseGetResult, CouchbaseErrorType> get(const CouchbaseCollection& collection, const std::string& document_id, CouchbaseValueType document_type) override { + collection_ = collection; + get_parameters_.document_id = document_id; + get_parameters_.document_type = document_type; + + if (get_error_) { + return nonstd::make_unexpected(*get_error_); + } else { + if (document_type == CouchbaseValueType::String) { + return CouchbaseGetResult{{collection_.bucket_name, COUCHBASE_GET_RESULT_CAS}, COUCHBASE_GET_RESULT_EXPIRY, COUCHBASE_GET_RESULT_CONTENT}; + } + return CouchbaseGetResult{{collection_.bucket_name, 
COUCHBASE_GET_RESULT_CAS}, COUCHBASE_GET_RESULT_EXPIRY, + std::vector<std::byte>{static_cast<std::byte>('a'), static_cast<std::byte>('b'), static_cast<std::byte>('c')}}; } } @@ -65,6 +88,10 @@ class MockCouchbaseClusterService : public controllers::CouchbaseClusterService return upsert_parameters_; } + GetParameters getGetParameters() const { + return get_parameters_; + } + CouchbaseCollection getCollectionParameter() const { return collection_; } @@ -73,9 +100,15 @@ class MockCouchbaseClusterService : public controllers::CouchbaseClusterService upsert_error_ = upsert_error; } + void setGetError(const CouchbaseErrorType get_error) { + get_error_ = get_error; + } + private: CouchbaseCollection collection_; UpsertParameters upsert_parameters_; + GetParameters get_parameters_; std::optional<CouchbaseErrorType> upsert_error_; + std::optional<CouchbaseErrorType> get_error_; }; } // namespace org::apache::nifi::minifi::couchbase::test diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index f720733e8..6f25c176c 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -238,6 +238,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state std::string cron_period_; gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_; + std::shared_ptr<logging::Logger> logger_; + private: mutable std::mutex mutex_; std::atomic<std::chrono::steady_clock::time_point> yield_expiration_{}; @@ -254,8 +256,6 @@ class Processor : public Connectable, public ConfigurableComponent, public state // an outgoing connection allows us to reach these nodes std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_; - - std::shared_ptr<logging::Logger> logger_; }; } // namespace core
