This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 4780b541bb0ee5c6a4419840f02f6d64c10d72ad Author: Marton Szasz <sza...@apache.org> AuthorDate: Sat Sep 16 05:14:58 2023 +0200 MINIFICPP-2218 Refactor expected monadic functions Signed-off-by: Ferenc Gerlits <fgerl...@gmail.com> This closes #1659 --- CPPLINT.cfg | 2 +- extensions/aws/processors/FetchS3Object.cpp | 2 +- extensions/aws/s3/S3Wrapper.cpp | 10 +- extensions/expression-language/Expression.cpp | 4 +- extensions/http-curl/processors/InvokeHTTP.cpp | 10 +- .../mqtt/processors/AbstractMQTTProcessor.cpp | 6 +- extensions/mqtt/processors/ConsumeMQTT.cpp | 12 +- extensions/mqtt/processors/PublishMQTT.cpp | 4 +- .../rocksdb-repos/DatabaseContentRepository.cpp | 8 +- extensions/rocksdb-repos/RocksDbRepository.cpp | 8 +- extensions/sftp/processors/ListSFTP.cpp | 2 +- .../processors/DefragmentText.cpp | 2 +- .../standard-processors/processors/ListFile.cpp | 2 +- .../standard-processors/processors/PutUDP.cpp | 4 +- .../standard-processors/processors/RouteText.cpp | 2 +- extensions/systemd/ConsumeJournald.cpp | 14 +- .../CollectorInitiatedSubscription.cpp | 2 +- libminifi/include/DiskSpaceWatchdog.h | 16 +-- libminifi/include/EventDrivenSchedulingAgent.h | 2 +- libminifi/include/SchedulingAgent.h | 2 +- .../include/core/logging/LoggerConfiguration.h | 2 +- libminifi/include/properties/Decryptor.h | 2 +- libminifi/include/utils/OptionalUtils.h | 10 +- .../utils/detail/MonadicOperationWrappers.h | 16 ++- libminifi/include/utils/expected.h | 124 ++++++++++------- libminifi/src/DiskSpaceWatchdog.cpp | 17 +-- libminifi/src/RootProcessGroupWrapper.cpp | 4 +- libminifi/src/c2/C2Agent.cpp | 6 +- libminifi/src/c2/C2MetricsPublisher.cpp | 2 +- libminifi/src/core/FlowConfiguration.cpp | 2 +- libminifi/src/utils/crypto/EncryptionManager.cpp | 4 +- libminifi/src/utils/crypto/EncryptionProvider.cpp | 2 +- libminifi/test/integration/IntegrationBase.h | 2 +- libminifi/test/unit/ExpectedTest.cpp | 155 ++++++++++++++------- libminifi/test/unit/OptionalTest.cpp | 14 +- minifi_main/MiNiFiMain.cpp | 6 +- 36 files changed, 277 insertions(+), 205 deletions(-) diff --git a/CPPLINT.cfg b/CPPLINT.cfg index 30b47e390..77f9a737d 100644 --- a/CPPLINT.cfg +++ b/CPPLINT.cfg @@ -1,2 +1,2 @@ set noparent -filter=-runtime/reference,-runtime/string,-build/c++11,-build/include_subdir,-whitespace/forcolon,-build/namespaces_literals,-readability/check +filter=-runtime/reference,-runtime/string,-build/c++11,-build/include_subdir,-whitespace/forcolon,-build/namespaces_literals,-readability/check,-build/include_what_you_use diff --git a/extensions/aws/processors/FetchS3Object.cpp b/extensions/aws/processors/FetchS3Object.cpp index 311af7cf8..a2bc62687 100644 --- a/extensions/aws/processors/FetchS3Object.cpp +++ b/extensions/aws/processors/FetchS3Object.cpp @@ -86,7 +86,7 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte std::optional<minifi::aws::s3::GetObjectResult> result; session->write(flow_file, [&get_object_params, &result, this](const std::shared_ptr<io::OutputStream>& stream) -> int64_t { result = s3_wrapper_.getObject(*get_object_params, *stream); - return (result | minifi::utils::map(&s3::GetObjectResult::write_size)).value_or(0); + return (result | minifi::utils::transform(&s3::GetObjectResult::write_size)).value_or(0); }); if (result) { diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp index 29c180f0d..059332233 100644 --- a/extensions/aws/s3/S3Wrapper.cpp +++ b/extensions/aws/s3/S3Wrapper.cpp @@ -204,16 +204,16 @@ std::optional<PutObjectResult> S3Wrapper::putObjectMultipart(const PutObjectRequ if (auto upload_state = getMultipartUploadState(put_object_params)) { logger_->log_info("Found previous multipart upload state for %s in bucket %s, continuing upload", put_object_params.object_key, put_object_params.bucket); return uploadParts(put_object_params, stream, std::move(*upload_state)) - | minifi::utils::flatMap([&, this](const auto& upload_parts_result) { return completeMultipartUpload(put_object_params, upload_parts_result); }) - | minifi::utils::map([this](const auto& complete_multipart_upload_result) { return createPutObjectResult(complete_multipart_upload_result); }); + | minifi::utils::andThen([&, this](const auto& upload_parts_result) { return completeMultipartUpload(put_object_params, upload_parts_result); }) + | minifi::utils::transform([this](const auto& complete_multipart_upload_result) { return createPutObjectResult(complete_multipart_upload_result); }); } else { logger_->log_debug("No previous multipart upload state was found for %s in bucket %s", put_object_params.object_key, put_object_params.bucket); auto request = createPutObjectRequest<Aws::S3::Model::CreateMultipartUploadRequest>(put_object_params); return request_sender_->sendCreateMultipartUploadRequest(request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing) - | minifi::utils::flatMap([&, this](const auto& create_multipart_result) { return uploadParts(put_object_params, stream, + | minifi::utils::andThen([&, this](const auto& create_multipart_result) { return uploadParts(put_object_params, stream, MultipartUploadState{create_multipart_result.GetUploadId(), multipart_size, flow_size, Aws::Utils::DateTime::Now()}); }) - | minifi::utils::flatMap([&, this](const auto& upload_parts_result) { return completeMultipartUpload(put_object_params, upload_parts_result); }) - | minifi::utils::map([this](const auto& complete_multipart_upload_result) { return createPutObjectResult(complete_multipart_upload_result); }); + | minifi::utils::andThen([&, this](const auto& upload_parts_result) { return completeMultipartUpload(put_object_params, upload_parts_result); }) + | minifi::utils::transform([this](const auto& complete_multipart_upload_result) { return createPutObjectResult(complete_multipart_upload_result); }); } } diff --git a/extensions/expression-language/Expression.cpp b/extensions/expression-language/Expression.cpp index bde9039fb..dffab1134 100644 --- a/extensions/expression-language/Expression.cpp +++ b/extensions/expression-language/Expression.cpp @@ -198,8 +198,8 @@ Value expr_reverseDnsLookup(const std::vector<Value>& args) { } return utils::net::addressFromString(ip_address_str) - | utils::flatMap([timeout_duration](const auto& ip_address) { return utils::net::reverseDnsLookup(ip_address, timeout_duration);}) - | utils::map([](const auto& hostname)-> Value { return Value(hostname); }) + | utils::andThen([timeout_duration](const auto& ip_address) { return utils::net::reverseDnsLookup(ip_address, timeout_duration);}) + | utils::transform([](const auto& hostname)-> Value { return Value(hostname); }) | utils::valueOrElse([&](std::error_code error_code) { if (error_code.value() == asio::error::timed_out) { core::logging::LoggerFactory<Expression>::getLogger()->log_warn("reverseDnsLookup timed out"); diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp index f9696e425..549b03add 100644 --- a/extensions/http-curl/processors/InvokeHTTP.cpp +++ b/extensions/http-curl/processors/InvokeHTTP.cpp @@ -67,7 +67,7 @@ void setupClientProxy(extensions::curl::HTTPClient& client, const core::ProcessC } void setupClientPeerVerification(extensions::curl::HTTPClient& client, const core::ProcessContext& context) { - if (auto disable_peer_verification = context.getProperty(InvokeHTTP::DisablePeerVerification) | utils::flatMap(&utils::StringUtils::toBool)) { + if (auto disable_peer_verification = context.getProperty(InvokeHTTP::DisablePeerVerification) | utils::andThen(&utils::StringUtils::toBool)) { client.setPeerVerification(!*disable_peer_verification); } } @@ -99,11 +99,11 @@ void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context) attributes_to_send_ = context.getProperty(AttributesToSend) | utils::filter([](const std::string& s) { return !s.empty(); }) // avoid compiling an empty string to regex - | utils::map([](const std::string& regex_str) { return utils::Regex{regex_str}; }) + | utils::transform([](const std::string& regex_str) { return utils::Regex{regex_str}; }) | utils::orElse([this] { logger_->log_debug("%s is missing, so the default value will be used", std::string{AttributesToSend.name}); }); - always_output_response_ = (context.getProperty(AlwaysOutputResponse) | utils::flatMap(&utils::StringUtils::toBool)).value_or(false); - penalize_no_retry_ = (context.getProperty(PenalizeOnNoRetry) | utils::flatMap(&utils::StringUtils::toBool)).value_or(false); + always_output_response_ = (context.getProperty(AlwaysOutputResponse) | utils::andThen(&utils::StringUtils::toBool)).value_or(false); + penalize_no_retry_ = (context.getProperty(PenalizeOnNoRetry) | utils::andThen(&utils::StringUtils::toBool)).value_or(false); invalid_http_header_field_handling_strategy_ = utils::parseEnumProperty<invoke_http::InvalidHTTPHeaderFieldHandlingOption>(context, InvalidHTTPHeaderFieldHandlingStrategy); @@ -113,7 +113,7 @@ void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context) put_response_body_in_attribute_.reset(); } - use_chunked_encoding_ = (context.getProperty(UseChunkedEncoding) | utils::flatMap(&utils::StringUtils::toBool)).value_or(false); + use_chunked_encoding_ = (context.getProperty(UseChunkedEncoding) | utils::andThen(&utils::StringUtils::toBool)).value_or(false); send_date_header_ = context.getProperty<bool>(DateHeader).value_or(true); } diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp index 06ecf3d3e..4722ccd71 100644 --- a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp +++ b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp @@ -51,12 +51,12 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContex } logger_->log_debug("AbstractMQTTProcessor: Password [%s]", password_); - if (const auto keep_alive_interval = context->getProperty(KeepAliveInterval) | utils::flatMap(&core::TimePeriodValue::fromString)) { + if (const auto keep_alive_interval = context->getProperty(KeepAliveInterval) | utils::andThen(&core::TimePeriodValue::fromString)) { keep_alive_interval_ = std::chrono::duration_cast<std::chrono::seconds>(keep_alive_interval->getMilliseconds()); } logger_->log_debug("AbstractMQTTProcessor: KeepAliveInterval [%" PRId64 "] s", int64_t{keep_alive_interval_.count()}); - if (const auto connection_timeout = context->getProperty(ConnectionTimeout) | utils::flatMap(&core::TimePeriodValue::fromString)) { + if (const auto connection_timeout = context->getProperty(ConnectionTimeout) | utils::andThen(&core::TimePeriodValue::fromString)) { connection_timeout_ = std::chrono::duration_cast<std::chrono::seconds>(connection_timeout->getMilliseconds()); } logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] s", int64_t{connection_timeout_.count()}); @@ -107,7 +107,7 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContex logger_->log_debug("AbstractMQTTProcessor: Last Will QoS [%u]", static_cast<uint8_t>(last_will_qos_)); last_will_->qos = static_cast<int>(last_will_qos_); - if (const auto value = context->getProperty(LastWillRetain) | utils::flatMap(&utils::StringUtils::toBool)) { + if (const auto value = context->getProperty(LastWillRetain) | utils::andThen(&utils::StringUtils::toBool)) { logger_->log_debug("AbstractMQTTProcessor: Last Will Retain [%d]", *value); last_will_retain_ = {*value}; last_will_->retained = last_will_retain_; diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp b/extensions/mqtt/processors/ConsumeMQTT.cpp index 54d497063..686fccd55 100644 --- a/extensions/mqtt/processors/ConsumeMQTT.cpp +++ b/extensions/mqtt/processors/ConsumeMQTT.cpp @@ -51,22 +51,22 @@ void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& co } logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_); - if (const auto value = context->getProperty(CleanSession) | utils::flatMap(&utils::StringUtils::toBool)) { + if (const auto value = context->getProperty(CleanSession) | utils::andThen(&utils::StringUtils::toBool)) { clean_session_ = *value; } logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_); - if (const auto value = context->getProperty(CleanStart) | utils::flatMap(&utils::StringUtils::toBool)) { + if (const auto value = context->getProperty(CleanStart) | utils::andThen(&utils::StringUtils::toBool)) { clean_start_ = *value; } logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_); - if (const auto session_expiry_interval = context->getProperty(SessionExpiryInterval) | utils::flatMap(&core::TimePeriodValue::fromString)) { + if (const auto session_expiry_interval = context->getProperty(SessionExpiryInterval) | utils::andThen(&core::TimePeriodValue::fromString)) { session_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds()); } logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()}); - if (const auto value = context->getProperty(QueueBufferMaxMessage) | utils::flatMap(&utils::toNumber<uint64_t>)) { + if (const auto value = context->getProperty(QueueBufferMaxMessage) | utils::andThen(&utils::toNumber<uint64_t>)) { max_queue_size_ = *value; } logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_); @@ -76,12 +76,12 @@ void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& co } logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_); - if (const auto topic_alias_maximum = context->getProperty(TopicAliasMaximum) | utils::flatMap(&utils::toNumber<uint32_t>)) { + if (const auto topic_alias_maximum = context->getProperty(TopicAliasMaximum) | utils::andThen(&utils::toNumber<uint32_t>)) { topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum); } logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_); - if (const auto receive_maximum = context->getProperty(ReceiveMaximum) | utils::flatMap(&utils::toNumber<uint32_t>)) { + if (const auto receive_maximum = context->getProperty(ReceiveMaximum) | utils::andThen(&utils::toNumber<uint32_t>)) { receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum); } logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_); diff --git a/extensions/mqtt/processors/PublishMQTT.cpp b/extensions/mqtt/processors/PublishMQTT.cpp index 9b2591524..bb5a538d0 100644 --- a/extensions/mqtt/processors/PublishMQTT.cpp +++ b/extensions/mqtt/processors/PublishMQTT.cpp @@ -47,12 +47,12 @@ void PublishMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& co throw Exception(PROCESS_SCHEDULE_EXCEPTION, "PublishMQTT: Topic is required"); } - if (const auto retain_opt = context->getProperty(Retain) | utils::flatMap(&utils::StringUtils::toBool)) { + if (const auto retain_opt = context->getProperty(Retain) | utils::andThen(&utils::StringUtils::toBool)) { retain_ = *retain_opt; } logger_->log_debug("PublishMQTT: Retain [%d]", retain_); - if (const auto message_expiry_interval = context->getProperty(MessageExpiryInterval) | utils::flatMap(&core::TimePeriodValue::fromString)) { + if (const auto message_expiry_interval = context->getProperty(MessageExpiryInterval) | utils::andThen(&core::TimePeriodValue::fromString)) { message_expiry_interval_ = std::chrono::duration_cast<std::chrono::seconds>(message_expiry_interval->getMilliseconds()); logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", int64_t{message_expiry_interval_->count()}); } diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp index e89243a90..e013ae09d 100644 --- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -328,14 +328,14 @@ void DatabaseContentRepository::clearOrphans() { uint64_t DatabaseContentRepository::getRepositorySize() const { return (utils::optional_from_ptr(db_.get()) | - utils::flatMap([](const auto& db) { return db->open(); }) | - utils::flatMap([](const auto& opendb) { return opendb.getApproximateSizes(); })).value_or(0); + utils::andThen([](const auto& db) { return db->open(); }) | + utils::andThen([](const auto& opendb) { return opendb.getApproximateSizes(); })).value_or(0); } uint64_t DatabaseContentRepository::getRepositoryEntryCount() const { return (utils::optional_from_ptr(db_.get()) | - utils::flatMap([](const auto& db) { return db->open(); }) | - utils::flatMap([](auto&& opendb) -> std::optional<uint64_t> { + utils::andThen([](const auto& db) { return db->open(); }) | + utils::andThen([](auto&& opendb) -> std::optional<uint64_t> { std::string key_count; opendb.GetProperty("rocksdb.estimate-num-keys", &key_count); if (!key_count.empty()) { diff --git a/extensions/rocksdb-repos/RocksDbRepository.cpp b/extensions/rocksdb-repos/RocksDbRepository.cpp index 6de9fe1a7..4dc420c62 100644 --- a/extensions/rocksdb-repos/RocksDbRepository.cpp +++ b/extensions/rocksdb-repos/RocksDbRepository.cpp @@ -84,14 +84,14 @@ bool RocksDbRepository::Get(const std::string &key, std::string &value) { uint64_t RocksDbRepository::getRepositorySize() const { return (utils::optional_from_ptr(db_.get()) | - utils::flatMap([](const auto& db) { return db->open(); }) | - utils::flatMap([](const auto& opendb) { return opendb.getApproximateSizes(); })).value_or(0); + utils::andThen([](const auto& db) { return db->open(); }) | + utils::andThen([](const auto& opendb) { return opendb.getApproximateSizes(); })).value_or(0); } uint64_t RocksDbRepository::getRepositoryEntryCount() const { return (utils::optional_from_ptr(db_.get()) | - utils::flatMap([](const auto& db) { return db->open(); }) | - utils::flatMap([](auto&& opendb) -> std::optional<uint64_t> { + utils::andThen([](const auto& db) { return db->open(); }) | + utils::andThen([](auto&& opendb) -> std::optional<uint64_t> { std::string key_count; opendb.GetProperty("rocksdb.estimate-num-keys", &key_count); if (!key_count.empty()) { diff --git a/extensions/sftp/processors/ListSFTP.cpp b/extensions/sftp/processors/ListSFTP.cpp index 9bddb34a3..606f7711e 100644 --- a/extensions/sftp/processors/ListSFTP.cpp +++ b/extensions/sftp/processors/ListSFTP.cpp @@ -125,7 +125,7 @@ void ListSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, logger_->log_error("Minimum File Age attribute is missing or invalid"); } - if (auto maximum_file_age = context->getProperty(MaximumFileAge) | utils::flatMap(&core::TimePeriodValue::fromString)) { + if (auto maximum_file_age = context->getProperty(MaximumFileAge) | utils::andThen(&core::TimePeriodValue::fromString)) { maximum_file_age_ = maximum_file_age->getMilliseconds(); } else { logger_->log_error("Maximum File Age attribute is missing or invalid"); diff --git a/extensions/standard-processors/processors/DefragmentText.cpp b/extensions/standard-processors/processors/DefragmentText.cpp index 7fbbe76d3..0a04770fd 100644 --- a/extensions/standard-processors/processors/DefragmentText.cpp +++ b/extensions/standard-processors/processors/DefragmentText.cpp @@ -41,7 +41,7 @@ void DefragmentText::initialize() { void DefragmentText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory*) { gsl_Expects(context); - if (auto max_buffer_age = context->getProperty(MaxBufferAge) | utils::flatMap(&core::TimePeriodValue::fromString)) { + if (auto max_buffer_age = context->getProperty(MaxBufferAge) | utils::andThen(&core::TimePeriodValue::fromString)) { max_age_ = max_buffer_age->getMilliseconds(); setTriggerWhenEmpty(true); logger_->log_trace("The Buffer maximum age is configured to be %" PRId64 " ms", int64_t{max_buffer_age->getMilliseconds().count()}); diff --git a/extensions/standard-processors/processors/ListFile.cpp b/extensions/standard-processors/processors/ListFile.cpp index ee22a0781..724eade4e 100644 --- a/extensions/standard-processors/processors/ListFile.cpp +++ b/extensions/standard-processors/processors/ListFile.cpp @@ -59,7 +59,7 @@ void ListFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, minimum_file_age_ = minimum_file_age->getMilliseconds(); } - if (auto maximum_file_age = context->getProperty(MaximumFileAge) | utils::flatMap(&core::TimePeriodValue::fromString)) { + if (auto maximum_file_age = context->getProperty(MaximumFileAge) | utils::andThen(&core::TimePeriodValue::fromString)) { maximum_file_age_ = maximum_file_age->getMilliseconds(); } diff --git a/extensions/standard-processors/processors/PutUDP.cpp b/extensions/standard-processors/processors/PutUDP.cpp index 19b6899e4..d3702abb1 100644 --- a/extensions/standard-processors/processors/PutUDP.cpp +++ b/extensions/standard-processors/processors/PutUDP.cpp @@ -124,8 +124,8 @@ void PutUDP::onTrigger(core::ProcessContext* context, core::ProcessSession* cons }; resolve_hostname() - | utils::flatMap(send_data_to_endpoint) - | utils::map(transfer_to_success) + | utils::andThen(send_data_to_endpoint) + | utils::transform(transfer_to_success) | utils::orElse(transfer_to_failure); } diff --git a/extensions/standard-processors/processors/RouteText.cpp b/extensions/standard-processors/processors/RouteText.cpp index 6092e0530..8d7007302 100644 --- a/extensions/standard-processors/processors/RouteText.cpp +++ b/extensions/standard-processors/processors/RouteText.cpp @@ -50,7 +50,7 @@ void RouteText::onSchedule(core::ProcessContext* context, core::ProcessSessionFa matching_ = utils::parseEnumProperty<route_text::Matching>(*context, MatchingStrategy); context->getProperty(TrimWhitespace, trim_); case_policy_ = context->getProperty<bool>(IgnoreCase).value_or(false) ? route_text::CasePolicy::IGNORE_CASE : route_text::CasePolicy::CASE_SENSITIVE; - group_regex_ = context->getProperty(GroupingRegex) | utils::map([] (const auto& str) {return utils::Regex(str);}); + group_regex_ = context->getProperty(GroupingRegex) | utils::transform([] (const auto& str) {return utils::Regex(str);}); segmentation_ = utils::parseEnumProperty<route_text::Segmentation>(*context, SegmentationStrategy); context->getProperty(GroupingFallbackValue, group_fallback_); } diff --git a/extensions/systemd/ConsumeJournald.cpp b/extensions/systemd/ConsumeJournald.cpp index ae23b90f0..7655a91e2 100644 --- a/extensions/systemd/ConsumeJournald.cpp +++ b/extensions/systemd/ConsumeJournald.cpp @@ -68,11 +68,11 @@ void ConsumeJournald::onSchedule(core::ProcessContext* const context, core::Proc return std::nullopt; }; batch_size_ = context->getProperty<size_t>(BatchSize).value(); - payload_format_ = (context->getProperty(PayloadFormat) | utils::flatMap(parse_payload_format) + payload_format_ = (context->getProperty(PayloadFormat) | utils::andThen(parse_payload_format) | utils::orElse([]{ throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "invalid payload format"}; })) .value(); include_timestamp_ = context->getProperty<bool>(IncludeTimestamp).value(); - const auto journal_type = (context->getProperty(JournalType) | utils::flatMap(parse_journal_type) + const auto journal_type = (context->getProperty(JournalType) | utils::andThen(parse_journal_type) | utils::orElse([]{ throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "invalid journal type"}; })) .value(); const auto process_old_messages = context->getProperty<bool>(ProcessOldMessages).value_or(false); @@ -93,7 +93,7 @@ void ConsumeJournald::onSchedule(core::ProcessContext* const context, core::Proc return process_old_messages ? journal.seekHead() : journal.seekTail(); }; worker_->enqueue([this, &seek_default] { - const auto cursor = state_manager_->get() | utils::map([](std::unordered_map<std::string, std::string>&& m) { return m.at(CURSOR_KEY); }); + const auto cursor = state_manager_->get() | utils::transform([](std::unordered_map<std::string, std::string>&& m) { return m.at(CURSOR_KEY); }); if (!cursor) { seek_default(*journal_); } else { @@ -147,7 +147,7 @@ std::optional<std::span<const char>> ConsumeJournald::enumerateJournalEntry(libw } std::optional<ConsumeJournald::journal_field> ConsumeJournald::getNextField(libwrapper::Journal& journal) { - return enumerateJournalEntry(journal) | utils::map([](std::span<const char> field) { + return enumerateJournalEntry(journal) | utils::transform([](std::span<const char> field) { const auto eq_pos = std::find(std::begin(field), std::end(field), '='); gsl_Ensures(eq_pos != std::end(field) && "field string must contain an equals sign"); const auto eq_idx = gsl::narrow<size_t>(eq_pos - std::begin(field)); @@ -205,12 +205,12 @@ std::string ConsumeJournald::formatSyslogMessage(const journal_message& msg) con const auto pid_string = utils::optional_from_ptr(syslog_pid) | utils::orElse([&] { return utils::optional_from_ptr(systemd_pid); }) - | utils::map([](const std::string* const pid) { return fmt::format("[{}]", *pid); }); + | utils::transform([](const std::string* const pid) { return fmt::format("[{}]", *pid); }); return fmt::format("{} {} {}{}: {}", date::format(timestamp_format_, chr::floor<chr::microseconds>(msg.timestamp)), - (utils::optional_from_ptr(systemd_hostname) | utils::map(utils::dereference)).value_or("unknown_host"), - (utils::optional_from_ptr(syslog_identifier) | utils::map(utils::dereference)).value_or("unknown_process"), + (utils::optional_from_ptr(systemd_hostname) | utils::transform(utils::dereference)).value_or("unknown_host"), + (utils::optional_from_ptr(syslog_identifier) | utils::transform(utils::dereference)).value_or("unknown_process"), pid_string.value_or(std::string{}), *message); } diff --git a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp index afe7b85d6..036fbcb8b 100644 --- a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp +++ b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp @@ -110,7 +110,7 @@ void CollectorInitiatedSubscription::onTrigger(const std::shared_ptr<core::Proce if (flowFileCount > 0) { lastActivityTimestamp_ = now; } else if (auto inactive_duration_to_reconnect_ms = context->getProperty<core::TimePeriodValue>(InactiveDurationToReconnect) - | utils::map([](const auto& time_period_value) { return time_period_value.getMilliseconds().count(); }); + | utils::transform([](const auto& time_period_value) { return time_period_value.getMilliseconds().count(); }); inactive_duration_to_reconnect_ms && *inactive_duration_to_reconnect_ms > 0) { if ((now - lastActivityTimestamp_) > *inactive_duration_to_reconnect_ms) { logger_->log_info("Exceeds configured 'inactive duration to reconnect' %lld ms. Unsubscribe to reconnect..", *inactive_duration_to_reconnect_ms); diff --git a/libminifi/include/DiskSpaceWatchdog.h b/libminifi/include/DiskSpaceWatchdog.h index 384a9fe2e..93895b15c 100644 --- a/libminifi/include/DiskSpaceWatchdog.h +++ b/libminifi/include/DiskSpaceWatchdog.h @@ -25,17 +25,12 @@ #include "utils/IntervalSwitch.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { +namespace org::apache::nifi::minifi { class Configure; -namespace core { -namespace logging { +namespace core::logging { class Logger; -} // namespace logging -} // namespace core +} // namespace core::logging namespace disk_space_watchdog { struct Config { @@ -54,7 +49,4 @@ inline utils::IntervalSwitch<std::uintmax_t> disk_space_interval_switch(Config c std::vector<std::uintmax_t> check_available_space(const std::vector<std::string>& paths, core::logging::Logger* logger = nullptr); } // namespace disk_space_watchdog -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h index f8664cc25..352212179 100644 --- a/libminifi/include/EventDrivenSchedulingAgent.h +++ b/libminifi/include/EventDrivenSchedulingAgent.h @@ -42,7 +42,7 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { using namespace std::literals::chrono_literals; time_slice_ = configuration->get(Configure::nifi_flow_engine_event_driven_time_slice) - | utils::flatMap(utils::timeutils::StringToDuration<std::chrono::milliseconds>) + | utils::andThen(utils::timeutils::StringToDuration<std::chrono::milliseconds>) | utils::valueOrElse([] { return DEFAULT_TIME_SLICE; }); if (time_slice_ < 10ms || 1000ms < time_slice_) { diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 4911f5e90..b24b74d97 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -66,7 +66,7 @@ class SchedulingAgent { flow_repo_ = flow_repo; alert_time_ = configuration->get(Configure::nifi_flow_engine_alert_period) - | utils::flatMap(utils::timeutils::StringToDuration<std::chrono::milliseconds>) + | utils::andThen(utils::timeutils::StringToDuration<std::chrono::milliseconds>) | utils::valueOrElse([] { return SCHEDULING_WATCHDOG_DEFAULT_ALERT_PERIOD; }); if (alert_time_ > std::chrono::milliseconds(0)) { diff --git a/libminifi/include/core/logging/LoggerConfiguration.h b/libminifi/include/core/logging/LoggerConfiguration.h index c62ee74b4..a34d68bd0 100644 --- a/libminifi/include/core/logging/LoggerConfiguration.h +++ b/libminifi/include/core/logging/LoggerConfiguration.h @@ -62,7 +62,7 @@ struct LoggerNamespace { }; inline std::optional<std::string> formatId(std::optional<utils::Identifier> opt_id) { - return opt_id | utils::map([](auto id) { return " (" + std::string(id.to_string()) + ")"; }); + return opt_id | utils::transform([](auto id) { return " (" + std::string(id.to_string()) + ")"; }); } inline constexpr std::string_view UNLIMITED_LOG_ENTRY_LENGTH = "unlimited"; diff --git a/libminifi/include/properties/Decryptor.h b/libminifi/include/properties/Decryptor.h index 359e20738..822d338e0 100644 --- a/libminifi/include/properties/Decryptor.h +++ b/libminifi/include/properties/Decryptor.h @@ -43,7 +43,7 @@ class Decryptor { static std::optional<Decryptor> create(const std::filesystem::path& minifi_home) { return utils::crypto::EncryptionProvider::create(minifi_home) - | utils::map([](const utils::crypto::EncryptionProvider& provider) {return Decryptor{provider};}); + | utils::transform([](const utils::crypto::EncryptionProvider& provider) {return Decryptor{provider};}); } private: diff --git a/libminifi/include/utils/OptionalUtils.h b/libminifi/include/utils/OptionalUtils.h index 9f0b92acd..b11d42b65 100644 --- a/libminifi/include/utils/OptionalUtils.h +++ b/libminifi/include/utils/OptionalUtils.h @@ -41,9 +41,9 @@ template<typename T> struct is_optional<std::optional<T>, void> : std::true_type {}; namespace detail { -// map implementation +// transform implementation template<typename SourceType, typename F> -auto operator|(std::optional<SourceType> o, map_wrapper<F> f) noexcept(noexcept(std::invoke(std::forward<F>(f.function), *std::move(o)))) { +auto operator|(std::optional<SourceType> o, transform_wrapper<F> f) noexcept(noexcept(std::invoke(std::forward<F>(f.function), *std::move(o)))) { using cb_result = std::decay_t<std::invoke_result_t<F, SourceType>>; if constexpr(std::is_same_v<cb_result, void>) { if (o.has_value()) { @@ -59,9 +59,9 @@ auto operator|(std::optional<SourceType> o, map_wrapper<F> f) noexcept(noexcept( } } -// flatMap implementation +// andThen implementation template<typename SourceType, typename F> -auto operator|(const std::optional<SourceType>& o, flat_map_wrapper<F> f) noexcept(noexcept(std::invoke(std::forward<F>(f.function), *o))) +auto operator|(const std::optional<SourceType>& o, and_then_wrapper<F> f) noexcept(noexcept(std::invoke(std::forward<F>(f.function), *o))) -> std::optional<typename std::decay<decltype(*std::invoke(std::forward<F>(f.function), *o))>::type> { static_assert(is_optional<decltype(std::invoke(std::forward<F>(f.function), *o))>::value, "flatMap expects a function returning optional"); if (o.has_value()) { @@ -71,7 +71,7 @@ auto operator|(const std::optional<SourceType>& o, flat_map_wrapper<F> f) noexce } } template<typename SourceType, typename F> -auto operator|(std::optional<SourceType>&& o, flat_map_wrapper<F> f) noexcept(noexcept(std::invoke(std::forward<F>(f.function), std::move(*o)))) +auto operator|(std::optional<SourceType>&& o, and_then_wrapper<F> f) noexcept(noexcept(std::invoke(std::forward<F>(f.function), std::move(*o)))) -> std::optional<typename std::decay<decltype(*std::invoke(std::forward<F>(f.function), std::move(*o)))>::type> { static_assert(is_optional<decltype(std::invoke(std::forward<F>(f.function), std::move(*o)))>::value, "flatMap expects a function returning optional"); if (o.has_value()) { diff --git a/libminifi/include/utils/detail/MonadicOperationWrappers.h b/libminifi/include/utils/detail/MonadicOperationWrappers.h index f8e088621..e973f90fe 100644 --- a/libminifi/include/utils/detail/MonadicOperationWrappers.h +++ b/libminifi/include/utils/detail/MonadicOperationWrappers.h @@ -21,12 +21,12 @@ namespace org::apache::nifi::minifi::utils { namespace detail { template<typename T> -struct map_wrapper { +struct transform_wrapper { T function; }; template<typename T> -struct flat_map_wrapper { +struct and_then_wrapper { T function; }; @@ -44,6 +44,11 @@ template<typename T> struct filter_wrapper { T function; }; + +template<typename T> +struct transform_error_wrapper { + T function; +}; } // namespace detail /** @@ -53,7 +58,7 @@ struct filter_wrapper { * @return Wrapped result of func */ template<typename T> -detail::map_wrapper<T&&> map(T&& func) noexcept { return {std::forward<T>(func)}; } +detail::transform_wrapper<T&&> transform(T&& func) noexcept { return {std::forward<T>(func)}; } /** * Transforms a wrapped value of T by calling the provided function T -> wrapped U, returning wrapped U. @@ -61,7 +66,7 @@ detail::map_wrapper<T&&> map(T&& func) noexcept { return {std::forward<T>(func)} * @return Transformed value */ template<typename T> -detail::flat_map_wrapper<T&&> flatMap(T&& func) noexcept { return {std::forward<T>(func)}; } +detail::and_then_wrapper<T&&> andThen(T&& func) noexcept { return {std::forward<T>(func)}; } /** * For optional-like types, possibly provides a value for an empty object to be replaced with. @@ -89,4 +94,7 @@ detail::value_or_else_wrapper<T&&> valueOrElse(T&& func) noexcept { return {std: */ template<typename T> detail::filter_wrapper<T&&> filter(T&& func) noexcept { return {std::forward<T>(func)}; } + +template<typename T> +detail::transform_error_wrapper<T&&> transformError(T&& func) noexcept { return {std::forward<T>(func)}; } } // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/include/utils/expected.h b/libminifi/include/utils/expected.h index c3bdce50f..f4aa742d6 100644 --- a/libminifi/include/utils/expected.h +++ b/libminifi/include/utils/expected.h @@ -19,8 +19,6 @@ #include <utility> #include "nonstd/expected.hpp" #include "utils/detail/MonadicOperationWrappers.h" -#include "utils/GeneralUtils.h" -#include "utils/meta/detected.h" namespace org::apache::nifi::minifi::utils { namespace detail { @@ -31,28 +29,45 @@ inline constexpr bool is_expected_v = false; template<typename V, typename E> inline constexpr bool is_expected_v<nonstd::expected<V, E>> = true; -// map implementation -template<typename Expected, typename F, typename = std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>> -auto operator|(Expected&& object, map_wrapper<F> f) { - using value_type = typename utils::remove_cvref_t<Expected>::value_type; - using error_type = typename utils::remove_cvref_t<Expected>::error_type; - if constexpr (std::is_same_v<value_type, void>) { - using function_return_type = std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>; +template<typename T> +concept expected = is_expected_v<std::remove_cvref_t<T>>; + +// from cppreference.com: The type must not be an array type, a non-object type, a specialization of std::unexpected, or a cv-qualified type. +// base template: not cv-qualified, must be an object type, not an array type +template<typename T> +inline constexpr bool is_valid_unexpected_type_v = std::is_same_v<T, std::remove_cvref_t<T>> && std::is_object_v<T> && !std::is_array_v<T>; + +// specialization: the type must not be a specialization of std::unexpected +template<typename E> +inline constexpr bool is_valid_unexpected_type_v<nonstd::unexpected_type<E>> = false; + +template<typename T> +concept valid_unexpected_type = is_valid_unexpected_type_v<T>; + + +// transform implementation +template<expected Expected, typename F> +auto operator|(Expected&& object, transform_wrapper<F> f) { + using value_type = typename std::remove_cvref_t<Expected>::value_type; + using error_type = typename std::remove_cvref_t<Expected>::error_type; + static_assert(valid_unexpected_type<error_type>, "transform expects a valid unexpected type"); + if constexpr (std::is_void_v<value_type>) { + using function_return_type = std::remove_cvref_t<std::invoke_result_t<F>>; if (!object.has_value()) { return nonstd::expected<function_return_type, error_type>{nonstd::unexpect, std::forward<Expected>(object).error()}; } - if constexpr (std::is_same_v<function_return_type, void>) { + if constexpr (std::is_void_v<function_return_type>) { std::invoke(std::forward<F>(f.function)); return nonstd::expected<void, error_type>{}; } else { return nonstd::expected<function_return_type, error_type>{std::invoke(std::forward<F>(f.function))}; } } else { - using function_return_type = std::decay_t<decltype(std::invoke(std::forward<F>(f.function), *std::forward<Expected>(object)))>; + using function_return_type = std::remove_cvref_t<std::invoke_result_t<F, decltype(*std::forward<Expected>(object))>>; if (!object.has_value()) { return nonstd::expected<function_return_type, error_type>{nonstd::unexpect, std::forward<Expected>(object).error()}; } - if constexpr (std::is_same_v<function_return_type, void>) { + if constexpr (std::is_void_v<function_return_type>) { std::invoke(std::forward<F>(f.function), *std::forward<Expected>(object)); return nonstd::expected<void, error_type>{}; } else { @@ -61,12 +76,12 @@ auto operator|(Expected&& object, map_wrapper<F> f) { } } -// flatMap -template<typename Expected, typename F, typename = std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>> -auto operator|(Expected&& object, flat_map_wrapper<F> f) { - using value_type = typename utils::remove_cvref_t<Expected>::value_type; - if constexpr (std::is_same_v<value_type, void>) { - using function_return_type = std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>; +// andThen +template<expected Expected, typename F> +auto operator|(Expected&& object, and_then_wrapper<F> f) { + using value_type = typename std::remove_cvref_t<Expected>::value_type; + if constexpr (std::is_void_v<value_type>) { + using function_return_type = std::remove_cvref_t<std::invoke_result_t<F>>; static_assert(is_expected_v<function_return_type>, "flatMap expects a function returning expected"); if (object.has_value()) { return std::invoke(std::forward<F>(f.function)); @@ -74,7 +89,7 @@ auto operator|(Expected&& object, flat_map_wrapper<F> f) { return function_return_type{nonstd::unexpect, std::forward<Expected>(object).error()}; } } else { - using function_return_type = std::decay_t<decltype(std::invoke(std::forward<F>(f.function), *std::forward<Expected>(object)))>; + using function_return_type = std::remove_cvref_t<std::invoke_result_t<F, decltype(*std::forward<Expected>(object))>>; static_assert(is_expected_v<function_return_type>, "flatMap expects a function returning expected"); if (object.has_value()) { return std::invoke(std::forward<F>(f.function), *std::forward<Expected>(object)); @@ -84,23 +99,21 @@ auto operator|(Expected&& object, flat_map_wrapper<F> f) { } } -template<typename Func, typename... Args> -using invocable_detector = decltype(std::invoke(std::declval<Func>(), std::declval<Args>()...)); - // orElse -template<typename Expected, typename F, typename = std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>> +template<expected Expected, typename F> auto operator|(Expected&& object, or_else_wrapper<F> f) { - using error_type = typename utils::remove_cvref_t<Expected>::error_type; + using error_type = typename std::remove_cvref_t<Expected>::error_type; + static_assert(valid_unexpected_type<error_type>, "orElse expects a valid unexpected type"); if (object.has_value()) { return std::forward<Expected>(object); } - constexpr bool invocable_with_argument = meta::is_detected_v<invocable_detector, F, error_type>; - if constexpr (std::is_same_v<error_type, void> || !invocable_with_argument) { - constexpr bool invocable_with_no_argument = meta::is_detected_v<invocable_detector, F>; + constexpr bool invocable_with_argument = std::invocable<F, error_type>; + if constexpr (std::is_void_v<error_type> || !invocable_with_argument) { + constexpr bool invocable_with_no_argument = std::invocable<F>; static_assert(invocable_with_no_argument); - using function_return_type = std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>; - static_assert(is_expected_v<function_return_type> || std::is_same_v<function_return_type, void>, "orElse expects a function returning expected or void"); - if constexpr (std::is_same_v<function_return_type, void>) { + using function_return_type = std::remove_cvref_t<std::invoke_result_t<F>>; + static_assert(is_expected_v<function_return_type> || std::is_void_v<function_return_type>, "orElse expects a function returning expected or void"); + if constexpr (std::is_void_v<function_return_type>) { std::invoke(std::forward<F>(f.function)); return std::forward<Expected>(object); } else { @@ -108,9 +121,9 @@ auto operator|(Expected&& object, or_else_wrapper<F> f) { } } else { static_assert(invocable_with_argument); - using function_return_type = std::decay_t<decltype(std::invoke(std::forward<F>(f.function), std::forward<Expected>(object).error()))>; - static_assert(is_expected_v<function_return_type> || std::is_same_v<function_return_type, void>, "orElse expects a function returning expected or void"); - if constexpr (std::is_same_v<function_return_type, void>) { + using function_return_type = std::remove_cvref_t<std::invoke_result_t<F, decltype(std::forward<Expected>(object).error())>>; + static_assert(is_expected_v<function_return_type> || std::is_void_v<function_return_type>, "orElse expects a function returning expected or void"); + if constexpr (std::is_void_v<function_return_type>) { std::invoke(std::forward<F>(f.function), std::forward<Expected>(object).error()); return std::forward<Expected>(object); } else { @@ -120,21 +133,22 @@ auto operator|(Expected&& object, or_else_wrapper<F> f) { } // valueOrElse -template<typename Expected, typename F, typename = std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>> -typename utils::remove_cvref_t<Expected>::value_type operator|(Expected&& object, value_or_else_wrapper<F> f) { - using value_type = typename utils::remove_cvref_t<Expected>::value_type; - using error_type = typename utils::remove_cvref_t<Expected>::error_type; +template<expected Expected, typename F> +typename std::remove_cvref_t<Expected>::value_type operator|(Expected&& object, value_or_else_wrapper<F> f) { + using value_type = typename std::remove_cvref_t<Expected>::value_type; + using error_type = typename std::remove_cvref_t<Expected>::error_type; + static_assert(valid_unexpected_type<error_type>, "valueOrElse expects a valid unexpected type"); if (object.has_value()) { return *std::forward<Expected>(object); } - constexpr bool invocable_with_argument = meta::is_detected_v<invocable_detector, F, error_type>; - if constexpr (std::is_same_v<error_type, void> || !invocable_with_argument) { - constexpr bool invocable_with_no_argument = meta::is_detected_v<invocable_detector, F>; + constexpr bool invocable_with_argument = std::invocable<F, error_type>; + if constexpr (std::is_void_v<error_type> || !invocable_with_argument) { + constexpr bool invocable_with_no_argument = std::invocable<F>; static_assert(invocable_with_no_argument); - using function_return_type = std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>; - static_assert((std::is_same_v<function_return_type, void> && std::is_default_constructible_v<value_type>) || std::is_constructible_v<value_type, function_return_type>, + using function_return_type = std::remove_cvref_t<std::invoke_result_t<F>>; + static_assert((std::is_void_v<function_return_type> && std::is_default_constructible_v<value_type>) || std::is_constructible_v<value_type, function_return_type>, "valueOrElse expects a function returning value_type or void"); - if constexpr (std::is_same_v<function_return_type, void>) { + if constexpr (std::is_void_v<function_return_type>) { std::invoke(std::forward<F>(f.function)); return value_type{}; } else { @@ -142,10 +156,10 @@ typename utils::remove_cvref_t<Expected>::value_type operator|(Expected&& object } } else { static_assert(invocable_with_argument); - using function_return_type = std::decay_t<decltype(std::invoke(std::forward<F>(f.function), std::forward<Expected>(object).error()))>; - static_assert((std::is_same_v<function_return_type, void> && std::is_default_constructible_v<value_type>) || std::is_constructible_v<value_type, function_return_type>, + using function_return_type = std::remove_cvref_t<std::invoke_result_t<F, error_type>>; + static_assert((std::is_void_v<function_return_type> && std::is_default_constructible_v<value_type>) || std::is_constructible_v<value_type, function_return_type>, "valueOrElse expects a function returning value_type or void"); - if constexpr (std::is_same_v<function_return_type, void>) { + if constexpr (std::is_void_v<function_return_type>) { std::invoke(std::forward<F>(f.function), std::forward<Expected>(object).error()); return value_type{}; } else { @@ -153,14 +167,28 @@ typename utils::remove_cvref_t<Expected>::value_type operator|(Expected&& object } } } + +// transformError +template<expected Expected, std::invocable<decltype(std::declval<Expected&&>().error())> F> +auto operator|(Expected&& object, transform_error_wrapper<F> f) { + using value_type = typename std::remove_cvref_t<Expected>::value_type; + using transformed_error_type = std::remove_cv_t<std::invoke_result_t<F, decltype(std::forward<Expected>(object).error())>>; + static_assert(valid_unexpected_type<transformed_error_type>, "transformError expects a function returning a valid unexpected type"); + using transformed_expected_type = nonstd::expected<value_type, transformed_error_type>; + if (object.has_value()) { + return transformed_expected_type{std::forward<Expected>(object)}; + } + return transformed_expected_type{nonstd::unexpect, std::invoke(std::forward<F>(f.function), std::forward<Expected>(object).error())}; +} + } // namespace detail template<typename F, typename... Args> auto try_expression(F&& action, Args&&... args) noexcept { - using action_return_type = std::decay_t<decltype(std::invoke(std::forward<F>(action), std::forward<Args>(args)...))>; + using action_return_type = std::remove_cvref_t<std::invoke_result_t<F, Args&&...>>; using return_type = nonstd::expected<action_return_type, std::exception_ptr>; try { - if constexpr (std::is_same_v<action_return_type, void>) { + if constexpr (std::is_void_v<action_return_type>) { std::invoke(std::forward<F>(action), std::forward<Args>(args)...); return return_type{}; } else { diff --git a/libminifi/src/DiskSpaceWatchdog.cpp b/libminifi/src/DiskSpaceWatchdog.cpp index fc930b588..a8770584e 100644 --- a/libminifi/src/DiskSpaceWatchdog.cpp +++ b/libminifi/src/DiskSpaceWatchdog.cpp @@ -23,10 +23,7 @@ #include "utils/file/PathUtils.h" #include "utils/TimeUtil.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { +namespace org::apache::nifi::minifi { namespace { namespace chr = std::chrono; @@ -40,14 +37,13 @@ std::optional<T> data_size_string_to_int(const std::string& str) { return std::make_optional(result); } - } // namespace namespace disk_space_watchdog { Config read_config(const Configure& conf) { - const auto interval_ms = conf.get(Configure::minifi_disk_space_watchdog_interval) | utils::flatMap(utils::timeutils::StringToDuration<chr::milliseconds>); - const auto stop_bytes = conf.get(Configure::minifi_disk_space_watchdog_stop_threshold) | utils::flatMap(data_size_string_to_int<std::uintmax_t>); - const auto restart_bytes = conf.get(Configure::minifi_disk_space_watchdog_restart_threshold) | utils::flatMap(data_size_string_to_int<std::uintmax_t>); + const auto interval_ms = conf.get(Configure::minifi_disk_space_watchdog_interval) | utils::andThen(utils::timeutils::StringToDuration<chr::milliseconds>); + const auto stop_bytes = conf.get(Configure::minifi_disk_space_watchdog_stop_threshold) | utils::andThen(data_size_string_to_int<std::uintmax_t>); + const auto restart_bytes = conf.get(Configure::minifi_disk_space_watchdog_restart_threshold) | utils::andThen(data_size_string_to_int<std::uintmax_t>); if (restart_bytes < stop_bytes) { throw std::runtime_error{"disk space watchdog stop threshold must be <= restart threshold"}; } constexpr auto mebibytes = 1024 * 1024; return { @@ -75,7 +71,4 @@ std::vector<std::uintmax_t> check_available_space(const std::vector<std::string> } } // namespace disk_space_watchdog -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org +} // namespace org::apache::nifi::minifi diff --git a/libminifi/src/RootProcessGroupWrapper.cpp b/libminifi/src/RootProcessGroupWrapper.cpp index f36667d6c..b1108487a 100644 --- a/libminifi/src/RootProcessGroupWrapper.cpp +++ b/libminifi/src/RootProcessGroupWrapper.cpp @@ -162,9 +162,9 @@ state::StateController* RootProcessGroupWrapper::getProcessorController(const st } return utils::Identifier::parse(id_or_name) - | utils::flatMap([this](utils::Identifier id) { return utils::optional_from_ptr(root_->findProcessorById(id)); }) + | utils::andThen([this](utils::Identifier id) { return utils::optional_from_ptr(root_->findProcessorById(id)); }) | utils::orElse([this, &id_or_name] { return utils::optional_from_ptr(root_->findProcessorByName(id_or_name)); }) - | utils::map([this, &controllerFactory](gsl::not_null<core::Processor*> proc) -> gsl::not_null<state::ProcessorController*> { + | utils::transform([this, &controllerFactory](gsl::not_null<core::Processor*> proc) -> gsl::not_null<state::ProcessorController*> { return utils::optional_from_ptr(processor_to_controller_[proc->getUUID()].get()) | utils::valueOrElse([this, proc, &controllerFactory] { return gsl::make_not_null((processor_to_controller_[proc->getUUID()] = controllerFactory(*proc)).get()); diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 64464a602..c32273f44 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -631,8 +631,8 @@ void C2Agent::handlePropertyUpdate(const C2ContentResponse &resp) { for (const auto& [name, value] : resp.operation_arguments) { bool persist = ( value.getAnnotation("persist") - | utils::map(&AnnotatedValue::to_string) - | utils::flatMap(utils::StringUtils::toBool)).value_or(true); + | utils::transform(&AnnotatedValue::to_string) + | utils::andThen(utils::StringUtils::toBool)).value_or(true); PropertyChangeLifetime lifetime = persist ? PropertyChangeLifetime::PERSISTENT : PropertyChangeLifetime::TRANSIENT; changeUpdateState(update_property(name, value.to_string(), lifetime)); } @@ -969,7 +969,7 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) { // output file std::filesystem::path file_path; - if (auto file_rel = resp.getArgument("file") | utils::map(make_path)) { + if (auto file_rel = resp.getArgument("file") | utils::transform(make_path)) { if (auto error = validateFilePath(file_rel.value())) { send_error(error.value()); return; diff --git a/libminifi/src/c2/C2MetricsPublisher.cpp b/libminifi/src/c2/C2MetricsPublisher.cpp index bc9cd8ad1..1d62d0171 100644 --- a/libminifi/src/c2/C2MetricsPublisher.cpp +++ b/libminifi/src/c2/C2MetricsPublisher.cpp @@ -166,7 +166,7 @@ std::optional<state::response::NodeReporter::ReportedNode> C2MetricsPublisher::g std::vector<state::response::NodeReporter::ReportedNode> C2MetricsPublisher::getHeartbeatNodes(bool include_manifest) const { gsl_Expects(configuration_); bool full_heartbeat = configuration_->get(minifi::Configuration::nifi_c2_full_heartbeat) - | utils::flatMap(utils::StringUtils::toBool) + | utils::andThen(utils::StringUtils::toBool) | utils::valueOrElse([] {return true;}); bool include = include_manifest || full_heartbeat; diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 999e43f58..0ed058f31 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -126,7 +126,7 @@ bool FlowConfiguration::persist(const std::string &configuration) { auto config_file_backup = *config_path_; config_file_backup += ".bak"; bool backup_file = (configuration_->get(minifi::Configure::nifi_flow_configuration_file_backup_update) - | utils::flatMap(utils::StringUtils::toBool)).value_or(false); + | utils::andThen(utils::StringUtils::toBool)).value_or(false); if (backup_file) { if (utils::file::FileUtils::copy_file(*config_path_, config_file_backup) != 0) { diff --git a/libminifi/src/utils/crypto/EncryptionManager.cpp b/libminifi/src/utils/crypto/EncryptionManager.cpp index fb31334bf..351c53565 100644 --- a/libminifi/src/utils/crypto/EncryptionManager.cpp +++ b/libminifi/src/utils/crypto/EncryptionManager.cpp @@ -33,7 +33,7 @@ std::shared_ptr<core::logging::Logger> EncryptionManager::logger_{core::logging: std::optional<XSalsa20Cipher> EncryptionManager::createXSalsa20Cipher(const std::string &key_name) const { return readKey(key_name) - | utils::map([] (const Bytes& key) {return XSalsa20Cipher{key};}); + | utils::transform([] (const Bytes& key) {return XSalsa20Cipher{key};}); } std::optional<Aes256EcbCipher> EncryptionManager::createAes256EcbCipher(const std::string &key_name) const { @@ -61,7 +61,7 @@ std::optional<Bytes> EncryptionManager::readKey(const std::string& key_name) con bootstrap_conf.setHome(key_dir_); bootstrap_conf.loadConfigureFile(DEFAULT_NIFI_BOOTSTRAP_FILE); return bootstrap_conf.getString(key_name) - | utils::map([](const std::string &encryption_key_hex) { return utils::StringUtils::from_hex(encryption_key_hex); }); + | utils::transform([](const std::string &encryption_key_hex) { return utils::StringUtils::from_hex(encryption_key_hex); }); } bool EncryptionManager::writeKey(const std::string &key_name, const Bytes& key) const { diff --git a/libminifi/src/utils/crypto/EncryptionProvider.cpp b/libminifi/src/utils/crypto/EncryptionProvider.cpp index 01493bd00..1c3048645 100644 --- a/libminifi/src/utils/crypto/EncryptionProvider.cpp +++ b/libminifi/src/utils/crypto/EncryptionProvider.cpp @@ -26,7 +26,7 @@ constexpr const char* CONFIG_ENCRYPTION_KEY_PROPERTY_NAME = "nifi.bootstrap.sens std::optional<EncryptionProvider> EncryptionProvider::create(const std::filesystem::path& home_path) { return EncryptionManager{home_path}.createXSalsa20Cipher(CONFIG_ENCRYPTION_KEY_PROPERTY_NAME) - | utils::map([] (const XSalsa20Cipher& cipher) {return EncryptionProvider{cipher};}); + | utils::transform([] (const XSalsa20Cipher& cipher) {return EncryptionProvider{cipher};}); } } // namespace org::apache::nifi::minifi::utils::crypto diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h index d48670a77..aca63326b 100644 --- a/libminifi/test/integration/IntegrationBase.h +++ b/libminifi/test/integration/IntegrationBase.h @@ -166,7 +166,7 @@ void IntegrationBase::run(const std::optional<std::filesystem::path>& test_file_ running = false; // Stop running after this iteration, unless restart is explicitly requested bool should_encrypt_flow_config = (configuration->get(minifi::Configure::nifi_flow_configuration_encrypt) - | utils::flatMap(utils::StringUtils::toBool)).value_or(false); + | utils::andThen(utils::StringUtils::toBool)).value_or(false); std::shared_ptr<utils::file::FileSystem> filesystem; if (home_path) { diff --git a/libminifi/test/unit/ExpectedTest.cpp b/libminifi/test/unit/ExpectedTest.cpp index 26ac4dbe7..281c09469 100644 --- a/libminifi/test/unit/ExpectedTest.cpp +++ b/libminifi/test/unit/ExpectedTest.cpp @@ -16,6 +16,7 @@ * limitations under the License. */ #define EXTENSION_LIST "" +#include <memory> #include <string_view> #include "../TestBase.h" #include "../Catch.h" @@ -25,69 +26,69 @@ namespace utils = org::apache::nifi::minifi::utils; // shamelessly copied from https://github.com/TartanLlama/expected/blob/master/tests/extensions.cpp (License: CC0) -TEST_CASE("expected map", "[expected][map]") { +TEST_CASE("expected transform", "[expected][transform]") { auto mul2 = [](int a) { return a * 2; }; auto ret_void = [](int) {}; { nonstd::expected<int, int> e = 21; - auto ret = e | utils::map(mul2); + auto ret = e | utils::transform(mul2); REQUIRE(ret); REQUIRE(*ret == 42); } { const nonstd::expected<int, int> e = 21; - auto ret = e | utils::map(mul2); + auto ret = e | utils::transform(mul2); REQUIRE(ret); REQUIRE(*ret == 42); } { nonstd::expected<int, int> e = 21; - auto ret = std::move(e) | utils::map(mul2); + auto ret = std::move(e) | utils::transform(mul2); REQUIRE(ret); REQUIRE(*ret == 42); } { const nonstd::expected<int, int> e = 21; - auto ret = std::move(e) | utils::map(mul2); // NOLINT(performance-move-const-arg) + auto ret = std::move(e) | utils::transform(mul2); // NOLINT(performance-move-const-arg) REQUIRE(ret); REQUIRE(*ret == 42); } { nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = e | utils::map(mul2); + auto ret = e | utils::transform(mul2); REQUIRE(!ret); REQUIRE(ret.error() == 21); } { const nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = e | utils::map(mul2); + auto ret = e | utils::transform(mul2); REQUIRE(!ret); REQUIRE(ret.error() == 21); } { nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = std::move(e) | utils::map(mul2); + auto ret = std::move(e) | utils::transform(mul2); REQUIRE(!ret); REQUIRE(ret.error() == 21); } { const nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = std::move(e) | utils::map(mul2); // NOLINT(performance-move-const-arg) + auto ret = std::move(e) | utils::transform(mul2); // NOLINT(performance-move-const-arg) REQUIRE(!ret); REQUIRE(ret.error() == 21); } { nonstd::expected<int, int> e = 21; - auto ret = e | utils::map(ret_void); + auto ret = e | utils::transform(ret_void); REQUIRE(ret); STATIC_REQUIRE( (std::is_same<decltype(ret), nonstd::expected<void, int>>::value)); @@ -95,7 +96,7 @@ TEST_CASE("expected map", "[expected][map]") { { const nonstd::expected<int, int> e = 21; - auto ret = e | utils::map(ret_void); + auto ret = e | utils::transform(ret_void); REQUIRE(ret); STATIC_REQUIRE( (std::is_same<decltype(ret), nonstd::expected<void, int>>::value)); @@ -103,7 +104,7 @@ TEST_CASE("expected map", "[expected][map]") { { nonstd::expected<int, int> e = 21; - auto ret = std::move(e) | utils::map(ret_void); + auto ret = std::move(e) | utils::transform(ret_void); REQUIRE(ret); STATIC_REQUIRE( (std::is_same<decltype(ret), nonstd::expected<void, int>>::value)); @@ -111,7 +112,7 @@ TEST_CASE("expected map", "[expected][map]") { { const nonstd::expected<int, int> e = 21; - auto ret = std::move(e) | utils::map(ret_void); // NOLINT(performance-move-const-arg) + auto ret = std::move(e) | utils::transform(ret_void); // NOLINT(performance-move-const-arg) REQUIRE(ret); STATIC_REQUIRE( (std::is_same<decltype(ret), nonstd::expected<void, int>>::value)); @@ -119,7 +120,7 @@ TEST_CASE("expected map", "[expected][map]") { { nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = e | utils::map(ret_void); + auto ret = e | utils::transform(ret_void); REQUIRE(!ret); STATIC_REQUIRE( (std::is_same<decltype(ret), nonstd::expected<void, int>>::value)); @@ -127,7 +128,7 @@ TEST_CASE("expected map", "[expected][map]") { { const nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = e | utils::map(ret_void); + auto ret = e | utils::transform(ret_void); REQUIRE(!ret); STATIC_REQUIRE( (std::is_same<decltype(ret), nonstd::expected<void, int>>::value)); @@ -135,7 +136,7 @@ TEST_CASE("expected map", "[expected][map]") { { nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = std::move(e) | utils::map(ret_void); + auto ret = std::move(e) | utils::transform(ret_void); REQUIRE(!ret); STATIC_REQUIRE( (std::is_same<decltype(ret), nonstd::expected<void, int>>::value)); @@ -143,7 +144,7 @@ TEST_CASE("expected map", "[expected][map]") { { const nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = std::move(e) | utils::map(ret_void); // NOLINT(performance-move-const-arg) + auto ret = std::move(e) | utils::transform(ret_void); // NOLINT(performance-move-const-arg) REQUIRE(!ret); STATIC_REQUIRE( (std::is_same<decltype(ret), nonstd::expected<void, int>>::value)); @@ -153,141 +154,139 @@ TEST_CASE("expected map", "[expected][map]") { // mapping functions which return references { nonstd::expected<int, int> e(42); - auto ret = e | utils::map([](int& i) -> int& { return i; }); + auto ret = e | utils::transform([](int& i) -> int& { return i; }); REQUIRE(ret); REQUIRE(ret == 42); } } -TEST_CASE("expected flatMap", "[expected][flatMap]") { +TEST_CASE("expected andThen", "[expected][andThen]") { auto succeed = [](int) { return nonstd::expected<int, int>(21 * 2); }; auto fail = [](int) { return nonstd::expected<int, int>(nonstd::unexpect, 17); }; { nonstd::expected<int, int> e = 21; - auto ret = e | utils::flatMap(succeed); + auto ret = e | utils::andThen(succeed); REQUIRE(ret); REQUIRE(*ret == 42); } { const nonstd::expected<int, int> e = 21; - auto ret = e | utils::flatMap(succeed); + auto ret = e | utils::andThen(succeed); REQUIRE(ret); REQUIRE(*ret == 42); } { nonstd::expected<int, int> e = 21; - auto ret = std::move(e) | utils::flatMap(succeed); + auto ret = std::move(e) | utils::andThen(succeed); REQUIRE(ret); REQUIRE(*ret == 42); } { const nonstd::expected<int, int> e = 21; - auto ret = std::move(e) | utils::flatMap(succeed); // NOLINT(performance-move-const-arg) + auto ret = std::move(e) | utils::andThen(succeed); // NOLINT(performance-move-const-arg) REQUIRE(ret); REQUIRE(*ret == 42); } { nonstd::expected<int, int> e = 21; - auto ret = e | utils::flatMap(fail); + auto ret = e | utils::andThen(fail); REQUIRE(!ret); REQUIRE(ret.error() == 17); } { const nonstd::expected<int, int> e = 21; - auto ret = e | utils::flatMap(fail); + auto ret = e | utils::andThen(fail); REQUIRE(!ret); REQUIRE(ret.error() == 17); } { nonstd::expected<int, int> e = 21; - auto ret = std::move(e) | utils::flatMap(fail); + auto ret = std::move(e) | utils::andThen(fail); REQUIRE(!ret); REQUIRE(ret.error() == 17); } { const nonstd::expected<int, int> e = 21; - auto ret = std::move(e) | utils::flatMap(fail); // NOLINT(performance-move-const-arg) + auto ret = std::move(e) | utils::andThen(fail); // NOLINT(performance-move-const-arg) REQUIRE(!ret); REQUIRE(ret.error() == 17); } { nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = e | utils::flatMap(succeed); + auto ret = e | utils::andThen(succeed); REQUIRE(!ret); REQUIRE(ret.error() == 21); } { const nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = e | utils::flatMap(succeed); + auto ret = e | utils::andThen(succeed); REQUIRE(!ret); REQUIRE(ret.error() == 21); } { nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = std::move(e) | utils::flatMap(succeed); + auto ret = std::move(e) | utils::andThen(succeed); REQUIRE(!ret); REQUIRE(ret.error() == 21); } { const nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = std::move(e) | utils::flatMap(succeed); // NOLINT(performance-move-const-arg) + auto ret = std::move(e) | utils::andThen(succeed); // NOLINT(performance-move-const-arg) REQUIRE(!ret); REQUIRE(ret.error() == 21); } { nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = e | utils::flatMap(fail); + auto ret = e | utils::andThen(fail); REQUIRE(!ret); REQUIRE(ret.error() == 21); } { const nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = e | utils::flatMap(fail); + auto ret = e | utils::andThen(fail); REQUIRE(!ret); REQUIRE(ret.error() == 21); } { nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = std::move(e) | utils::flatMap(fail); + auto ret = std::move(e) | utils::andThen(fail); REQUIRE(!ret); REQUIRE(ret.error() == 21); } { const nonstd::expected<int, int> e(nonstd::unexpect, 21); - auto ret = std::move(e) | utils::flatMap(fail); // NOLINT(performance-move-const-arg) + auto ret = std::move(e) | utils::andThen(fail); // NOLINT(performance-move-const-arg) REQUIRE(!ret); REQUIRE(ret.error() == 21); } } TEST_CASE("expected orElse", "[expected][orElse]") { - // commented out parts depend on https://github.com/martinmoene/expected-lite/pull/40 (waiting for release) - // using eptr = std::unique_ptr<int>; + using eptr = std::unique_ptr<int>; auto succeed = [](int) { return nonstd::expected<int, int>(21 * 2); }; - // auto succeedptr = [](eptr e) { return nonstd::expected<int, eptr>(21*2);}; - auto fail = [](int) { return nonstd::expected<int, int>(nonstd::unexpect, 17);}; - // auto efail = [](eptr e) { *e = 17;return nonstd::expected<int, eptr>(nonstd::unexpect, std::move(e));}; - // auto failptr = [](eptr e) { return nonstd::expected<int, eptr>(nonstd::unexpect, std::move(e));}; + auto succeedptr = [](eptr) { return nonstd::expected<int, eptr>(21*2); }; + auto fail = [](int) { return nonstd::expected<int, int>(nonstd::unexpect, 17); }; + auto efail = [](eptr e) { *e = 17; return nonstd::expected<int, eptr>(nonstd::unexpect, std::move(e)); }; auto failvoid = [](int) {}; - // auto failvoidptr = [](const eptr&) { /* don't consume */}; - // auto consumeptr = [](eptr) {}; - // auto make_u_int = [](int n) { return std::unique_ptr<int>(new int(n));}; + auto failvoidptr = [](const eptr&) { /* don't consume */}; + auto consumeptr = [](eptr) {}; + auto make_u_int = [](int n) { return std::make_unique<int>(n); }; { nonstd::expected<int, int> e = 21; @@ -310,14 +309,12 @@ TEST_CASE("expected orElse", "[expected][orElse]") { REQUIRE(*ret == 21); } - /* { nonstd::expected<int, eptr> e = 21; auto ret = std::move(e) | utils::orElse(succeedptr); REQUIRE(ret); REQUIRE(*ret == 21); } - */ { const nonstd::expected<int, int> e = 21; @@ -348,14 +345,12 @@ TEST_CASE("expected orElse", "[expected][orElse]") { } - /* { nonstd::expected<int, eptr> e = 21; auto ret = std::move(e) | utils::orElse(efail); REQUIRE(ret); REQUIRE(ret == 21); } - */ { const nonstd::expected<int, int> e = 21; @@ -385,14 +380,12 @@ TEST_CASE("expected orElse", "[expected][orElse]") { REQUIRE(*ret == 42); } - /* { nonstd::expected<int, eptr> e(nonstd::unexpect, make_u_int(21)); auto ret = std::move(e) | utils::orElse(succeedptr); REQUIRE(ret); REQUIRE(*ret == 42); } - */ { const nonstd::expected<int, int> e(nonstd::unexpect, 21); @@ -443,7 +436,6 @@ TEST_CASE("expected orElse", "[expected][orElse]") { REQUIRE(ret.error() == 21); } - /* { nonstd::expected<int, eptr> e(nonstd::unexpect, make_u_int(21)); auto ret = std::move(e) | utils::orElse(failvoidptr); @@ -457,7 +449,6 @@ TEST_CASE("expected orElse", "[expected][orElse]") { REQUIRE(!ret); REQUIRE(ret.error() == nullptr); } - */ { const nonstd::expected<int, int> e(nonstd::unexpect, 21); @@ -484,3 +475,63 @@ TEST_CASE("expected valueOrElse", "[expected][valueOrElse]") { REQUIRE_THROWS_AS(ex | utils::valueOrElse([](const std::string&) -> int { throw std::exception(); }), std::exception); REQUIRE_THROWS_AS(std::move(ex) | utils::valueOrElse([](std::string&&) -> int { throw std::exception(); }), std::exception); } + +TEST_CASE("expected transformError", "[expected][transformError]") { + auto mul2 = [](int a) { return a * 2; }; + + { + nonstd::expected<int, int> e = nonstd::make_unexpected(21); + auto ret = e | utils::transformError(mul2); + REQUIRE(!ret); + REQUIRE(ret.error() == 42); + } + + { + const nonstd::expected<int, int> e = nonstd::make_unexpected(21); + auto ret = e | utils::transformError(mul2); + REQUIRE(!ret); + REQUIRE(ret.error() == 42); + } + + { + nonstd::expected<int, int> e = nonstd::make_unexpected(21); + auto ret = std::move(e) | utils::transformError(mul2); + REQUIRE(!ret); + REQUIRE(ret.error() == 42); + } + + { + const nonstd::expected<int, int> e = nonstd::make_unexpected(21); + auto ret = std::move(e) | utils::transformError(mul2); // NOLINT(performance-move-const-arg) + REQUIRE(!ret); + REQUIRE(ret.error() == 42); + } + + { + nonstd::expected<int, int> e = 21; + auto ret = e | utils::transformError(mul2); + REQUIRE(ret); + REQUIRE(*ret == 21); + } + + { + const nonstd::expected<int, int> e = 21; + auto ret = e | utils::transformError(mul2); + REQUIRE(ret); + REQUIRE(*ret == 21); + } + + { + const auto mutable_rvalue_fn = []{ return nonstd::expected<int, int>{21}; }; + auto ret = mutable_rvalue_fn() | utils::transformError(mul2); + REQUIRE(ret); + REQUIRE(*ret == 21); + } + + { + const nonstd::expected<int, int> e = 21; + auto ret = std::move(e) | utils::transformError(mul2); // NOLINT(performance-move-const-arg) + REQUIRE(ret); + REQUIRE(*ret == 21); + } +} diff --git a/libminifi/test/unit/OptionalTest.cpp b/libminifi/test/unit/OptionalTest.cpp index 93dd31879..044694d11 100644 --- a/libminifi/test/unit/OptionalTest.cpp +++ b/libminifi/test/unit/OptionalTest.cpp @@ -22,28 +22,28 @@ namespace utils = org::apache::nifi::minifi::utils; -TEST_CASE("optional map", "[optional map]") { - const auto test1 = std::make_optional(6) | utils::map([](const int i) { return i * 2; }); +TEST_CASE("optional transform", "[optional transform]") { + const auto test1 = std::make_optional(6) | utils::transform([](const int i) { return i * 2; }); REQUIRE(12 == test1.value()); - const auto test2 = std::optional<int>{} | utils::map([](const int i) { return i * 2; }); + const auto test2 = std::optional<int>{} | utils::transform([](const int i) { return i * 2; }); REQUIRE(!test2.has_value()); } -TEST_CASE("optional flatMap", "[optional flat map]") { +TEST_CASE("optional andThen", "[optional and then]") { const auto make_intdiv_noremainder = [](const int denom) { return [denom](const int num) { return num % denom == 0 ? std::make_optional(num / denom) : std::optional<int>{}; }; }; - const auto test1 = std::make_optional(6) | utils::flatMap(make_intdiv_noremainder(3)); + const auto test1 = std::make_optional(6) | utils::andThen(make_intdiv_noremainder(3)); REQUIRE(2 == test1.value()); const auto const_lval_func = make_intdiv_noremainder(4); - const auto test2 = std::optional<int>{} | utils::flatMap(const_lval_func); + const auto test2 = std::optional<int>{} | utils::andThen(const_lval_func); REQUIRE(!test2.has_value()); auto mutable_lval_func = make_intdiv_noremainder(3); - const auto test3 = std::make_optional(7) | utils::flatMap(mutable_lval_func); + const auto test3 = std::make_optional(7) | utils::andThen(mutable_lval_func); REQUIRE(!test3.has_value()); } diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp index 56fa28bc4..6e34b9360 100644 --- a/minifi_main/MiNiFiMain.cpp +++ b/minifi_main/MiNiFiMain.cpp @@ -355,7 +355,7 @@ int main(int argc, char **argv) { writeSchemaIfRequested(argument_parser, configure); std::chrono::milliseconds stop_wait_time = configure->get(minifi::Configure::nifi_graceful_shutdown_seconds) - | utils::flatMap(utils::timeutils::StringToDuration<std::chrono::milliseconds>) + | utils::andThen(utils::timeutils::StringToDuration<std::chrono::milliseconds>) | utils::valueOrElse([] { return std::chrono::milliseconds(STOP_WAIT_TIME_MS);}); configure->get(minifi::Configure::nifi_provenance_repository_class_name, prov_repo_class); @@ -400,7 +400,7 @@ int main(int argc, char **argv) { configure->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name); bool should_encrypt_flow_config = (configure->get(minifi::Configure::nifi_flow_configuration_encrypt) - | utils::flatMap(utils::StringUtils::toBool)).value_or(false); + | utils::andThen(utils::StringUtils::toBool)).value_or(false); auto filesystem = std::make_shared<utils::file::FileSystem>( should_encrypt_flow_config, @@ -421,7 +421,7 @@ int main(int argc, char **argv) { prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, std::move(metrics_publisher_store), filesystem, request_restart); const bool disk_space_watchdog_enable = configure->get(minifi::Configure::minifi_disk_space_watchdog_enable) - | utils::flatMap(utils::StringUtils::toBool) + | utils::andThen(utils::StringUtils::toBool) | utils::valueOrElse([] { return true; }); std::unique_ptr<utils::CallBackTimer> disk_space_watchdog;