This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 4780b541bb0ee5c6a4419840f02f6d64c10d72ad
Author: Marton Szasz <sza...@apache.org>
AuthorDate: Sat Sep 16 05:14:58 2023 +0200

    MINIFICPP-2218 Refactor expected monadic functions
    
    Signed-off-by: Ferenc Gerlits <fgerl...@gmail.com>
    This closes #1659
---
 CPPLINT.cfg                                        |   2 +-
 extensions/aws/processors/FetchS3Object.cpp        |   2 +-
 extensions/aws/s3/S3Wrapper.cpp                    |  10 +-
 extensions/expression-language/Expression.cpp      |   4 +-
 extensions/http-curl/processors/InvokeHTTP.cpp     |  10 +-
 .../mqtt/processors/AbstractMQTTProcessor.cpp      |   6 +-
 extensions/mqtt/processors/ConsumeMQTT.cpp         |  12 +-
 extensions/mqtt/processors/PublishMQTT.cpp         |   4 +-
 .../rocksdb-repos/DatabaseContentRepository.cpp    |   8 +-
 extensions/rocksdb-repos/RocksDbRepository.cpp     |   8 +-
 extensions/sftp/processors/ListSFTP.cpp            |   2 +-
 .../processors/DefragmentText.cpp                  |   2 +-
 .../standard-processors/processors/ListFile.cpp    |   2 +-
 .../standard-processors/processors/PutUDP.cpp      |   4 +-
 .../standard-processors/processors/RouteText.cpp   |   2 +-
 extensions/systemd/ConsumeJournald.cpp             |  14 +-
 .../CollectorInitiatedSubscription.cpp             |   2 +-
 libminifi/include/DiskSpaceWatchdog.h              |  16 +--
 libminifi/include/EventDrivenSchedulingAgent.h     |   2 +-
 libminifi/include/SchedulingAgent.h                |   2 +-
 .../include/core/logging/LoggerConfiguration.h     |   2 +-
 libminifi/include/properties/Decryptor.h           |   2 +-
 libminifi/include/utils/OptionalUtils.h            |  10 +-
 .../utils/detail/MonadicOperationWrappers.h        |  16 ++-
 libminifi/include/utils/expected.h                 | 124 ++++++++++-------
 libminifi/src/DiskSpaceWatchdog.cpp                |  17 +--
 libminifi/src/RootProcessGroupWrapper.cpp          |   4 +-
 libminifi/src/c2/C2Agent.cpp                       |   6 +-
 libminifi/src/c2/C2MetricsPublisher.cpp            |   2 +-
 libminifi/src/core/FlowConfiguration.cpp           |   2 +-
 libminifi/src/utils/crypto/EncryptionManager.cpp   |   4 +-
 libminifi/src/utils/crypto/EncryptionProvider.cpp  |   2 +-
 libminifi/test/integration/IntegrationBase.h       |   2 +-
 libminifi/test/unit/ExpectedTest.cpp               | 155 ++++++++++++++-------
 libminifi/test/unit/OptionalTest.cpp               |  14 +-
 minifi_main/MiNiFiMain.cpp                         |   6 +-
 36 files changed, 277 insertions(+), 205 deletions(-)

diff --git a/CPPLINT.cfg b/CPPLINT.cfg
index 30b47e390..77f9a737d 100644
--- a/CPPLINT.cfg
+++ b/CPPLINT.cfg
@@ -1,2 +1,2 @@
 set noparent
-filter=-runtime/reference,-runtime/string,-build/c++11,-build/include_subdir,-whitespace/forcolon,-build/namespaces_literals,-readability/check
+filter=-runtime/reference,-runtime/string,-build/c++11,-build/include_subdir,-whitespace/forcolon,-build/namespaces_literals,-readability/check,-build/include_what_you_use
diff --git a/extensions/aws/processors/FetchS3Object.cpp 
b/extensions/aws/processors/FetchS3Object.cpp
index 311af7cf8..a2bc62687 100644
--- a/extensions/aws/processors/FetchS3Object.cpp
+++ b/extensions/aws/processors/FetchS3Object.cpp
@@ -86,7 +86,7 @@ void FetchS3Object::onTrigger(const 
std::shared_ptr<core::ProcessContext> &conte
   std::optional<minifi::aws::s3::GetObjectResult> result;
   session->write(flow_file, [&get_object_params, &result, this](const 
std::shared_ptr<io::OutputStream>& stream) -> int64_t {
     result = s3_wrapper_.getObject(*get_object_params, *stream);
-    return (result | 
minifi::utils::map(&s3::GetObjectResult::write_size)).value_or(0);
+    return (result | 
minifi::utils::transform(&s3::GetObjectResult::write_size)).value_or(0);
   });
 
   if (result) {
diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp
index 29c180f0d..059332233 100644
--- a/extensions/aws/s3/S3Wrapper.cpp
+++ b/extensions/aws/s3/S3Wrapper.cpp
@@ -204,16 +204,16 @@ std::optional<PutObjectResult> 
S3Wrapper::putObjectMultipart(const PutObjectRequ
   if (auto upload_state = getMultipartUploadState(put_object_params)) {
     logger_->log_info("Found previous multipart upload state for %s in bucket 
%s, continuing upload", put_object_params.object_key, put_object_params.bucket);
     return uploadParts(put_object_params, stream, std::move(*upload_state))
-      | minifi::utils::flatMap([&, this](const auto& upload_parts_result) { 
return completeMultipartUpload(put_object_params, upload_parts_result); })
-      | minifi::utils::map([this](const auto& 
complete_multipart_upload_result) { return 
createPutObjectResult(complete_multipart_upload_result); });
+      | minifi::utils::andThen([&, this](const auto& upload_parts_result) { 
return completeMultipartUpload(put_object_params, upload_parts_result); })
+      | minifi::utils::transform([this](const auto& 
complete_multipart_upload_result) { return 
createPutObjectResult(complete_multipart_upload_result); });
   } else {
     logger_->log_debug("No previous multipart upload state was found for %s in 
bucket %s", put_object_params.object_key, put_object_params.bucket);
     auto request = 
createPutObjectRequest<Aws::S3::Model::CreateMultipartUploadRequest>(put_object_params);
     return request_sender_->sendCreateMultipartUploadRequest(request, 
put_object_params.credentials, put_object_params.client_config, 
put_object_params.use_virtual_addressing)
-      | minifi::utils::flatMap([&, this](const auto& create_multipart_result) 
{ return uploadParts(put_object_params, stream,
+      | minifi::utils::andThen([&, this](const auto& create_multipart_result) 
{ return uploadParts(put_object_params, stream,
           MultipartUploadState{create_multipart_result.GetUploadId(), 
multipart_size, flow_size, Aws::Utils::DateTime::Now()}); })
-      | minifi::utils::flatMap([&, this](const auto& upload_parts_result) { 
return completeMultipartUpload(put_object_params, upload_parts_result); })
-      | minifi::utils::map([this](const auto& 
complete_multipart_upload_result) { return 
createPutObjectResult(complete_multipart_upload_result); });
+      | minifi::utils::andThen([&, this](const auto& upload_parts_result) { 
return completeMultipartUpload(put_object_params, upload_parts_result); })
+      | minifi::utils::transform([this](const auto& 
complete_multipart_upload_result) { return 
createPutObjectResult(complete_multipart_upload_result); });
   }
 }
 
diff --git a/extensions/expression-language/Expression.cpp 
b/extensions/expression-language/Expression.cpp
index bde9039fb..dffab1134 100644
--- a/extensions/expression-language/Expression.cpp
+++ b/extensions/expression-language/Expression.cpp
@@ -198,8 +198,8 @@ Value expr_reverseDnsLookup(const std::vector<Value>& args) 
{
   }
 
   return utils::net::addressFromString(ip_address_str)
-      | utils::flatMap([timeout_duration](const auto& ip_address) { return 
utils::net::reverseDnsLookup(ip_address, timeout_duration);})
-      | utils::map([](const auto& hostname)-> Value { return Value(hostname); 
})
+      | utils::andThen([timeout_duration](const auto& ip_address) { return 
utils::net::reverseDnsLookup(ip_address, timeout_duration);})
+      | utils::transform([](const auto& hostname)-> Value { return 
Value(hostname); })
       | utils::valueOrElse([&](std::error_code error_code) {
         if (error_code.value() == asio::error::timed_out) {
           
core::logging::LoggerFactory<Expression>::getLogger()->log_warn("reverseDnsLookup
 timed out");
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp 
b/extensions/http-curl/processors/InvokeHTTP.cpp
index f9696e425..549b03add 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -67,7 +67,7 @@ void setupClientProxy(extensions::curl::HTTPClient& client, 
const core::ProcessC
 }
 
 void setupClientPeerVerification(extensions::curl::HTTPClient& client, const 
core::ProcessContext& context) {
-  if (auto disable_peer_verification = 
context.getProperty(InvokeHTTP::DisablePeerVerification) | 
utils::flatMap(&utils::StringUtils::toBool)) {
+  if (auto disable_peer_verification = 
context.getProperty(InvokeHTTP::DisablePeerVerification) | 
utils::andThen(&utils::StringUtils::toBool)) {
     client.setPeerVerification(!*disable_peer_verification);
   }
 }
@@ -99,11 +99,11 @@ void InvokeHTTP::setupMembersFromProperties(const 
core::ProcessContext& context)
 
   attributes_to_send_ = context.getProperty(AttributesToSend)
                         | utils::filter([](const std::string& s) { return 
!s.empty(); })  // avoid compiling an empty string to regex
-                        | utils::map([](const std::string& regex_str) { return 
utils::Regex{regex_str}; })
+                        | utils::transform([](const std::string& regex_str) { 
return utils::Regex{regex_str}; })
                         | utils::orElse([this] { logger_->log_debug("%s is 
missing, so the default value will be used", 
std::string{AttributesToSend.name}); });
 
-  always_output_response_ = (context.getProperty(AlwaysOutputResponse) | 
utils::flatMap(&utils::StringUtils::toBool)).value_or(false);
-  penalize_no_retry_ = (context.getProperty(PenalizeOnNoRetry) | 
utils::flatMap(&utils::StringUtils::toBool)).value_or(false);
+  always_output_response_ = (context.getProperty(AlwaysOutputResponse) | 
utils::andThen(&utils::StringUtils::toBool)).value_or(false);
+  penalize_no_retry_ = (context.getProperty(PenalizeOnNoRetry) | 
utils::andThen(&utils::StringUtils::toBool)).value_or(false);
 
   invalid_http_header_field_handling_strategy_ = 
utils::parseEnumProperty<invoke_http::InvalidHTTPHeaderFieldHandlingOption>(context,
 InvalidHTTPHeaderFieldHandlingStrategy);
 
@@ -113,7 +113,7 @@ void InvokeHTTP::setupMembersFromProperties(const 
core::ProcessContext& context)
     put_response_body_in_attribute_.reset();
   }
 
-  use_chunked_encoding_ = (context.getProperty(UseChunkedEncoding) | 
utils::flatMap(&utils::StringUtils::toBool)).value_or(false);
+  use_chunked_encoding_ = (context.getProperty(UseChunkedEncoding) | 
utils::andThen(&utils::StringUtils::toBool)).value_or(false);
   send_date_header_ = context.getProperty<bool>(DateHeader).value_or(true);
 }
 
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp 
b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
index 06ecf3d3e..4722ccd71 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
@@ -51,12 +51,12 @@ void AbstractMQTTProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContex
   }
   logger_->log_debug("AbstractMQTTProcessor: Password [%s]", password_);
 
-  if (const auto keep_alive_interval = context->getProperty(KeepAliveInterval) 
| utils::flatMap(&core::TimePeriodValue::fromString)) {
+  if (const auto keep_alive_interval = context->getProperty(KeepAliveInterval) 
| utils::andThen(&core::TimePeriodValue::fromString)) {
     keep_alive_interval_ = 
std::chrono::duration_cast<std::chrono::seconds>(keep_alive_interval->getMilliseconds());
   }
   logger_->log_debug("AbstractMQTTProcessor: KeepAliveInterval [%" PRId64 "] 
s", int64_t{keep_alive_interval_.count()});
 
-  if (const auto connection_timeout = context->getProperty(ConnectionTimeout) 
| utils::flatMap(&core::TimePeriodValue::fromString)) {
+  if (const auto connection_timeout = context->getProperty(ConnectionTimeout) 
| utils::andThen(&core::TimePeriodValue::fromString)) {
     connection_timeout_ = 
std::chrono::duration_cast<std::chrono::seconds>(connection_timeout->getMilliseconds());
   }
   logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] 
s", int64_t{connection_timeout_.count()});
@@ -107,7 +107,7 @@ void AbstractMQTTProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContex
     logger_->log_debug("AbstractMQTTProcessor: Last Will QoS [%u]", 
static_cast<uint8_t>(last_will_qos_));
     last_will_->qos = static_cast<int>(last_will_qos_);
 
-    if (const auto value = context->getProperty(LastWillRetain) | 
utils::flatMap(&utils::StringUtils::toBool)) {
+    if (const auto value = context->getProperty(LastWillRetain) | 
utils::andThen(&utils::StringUtils::toBool)) {
       logger_->log_debug("AbstractMQTTProcessor: Last Will Retain [%d]", 
*value);
       last_will_retain_ = {*value};
       last_will_->retained = last_will_retain_;
diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp 
b/extensions/mqtt/processors/ConsumeMQTT.cpp
index 54d497063..686fccd55 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.cpp
+++ b/extensions/mqtt/processors/ConsumeMQTT.cpp
@@ -51,22 +51,22 @@ void ConsumeMQTT::readProperties(const 
std::shared_ptr<core::ProcessContext>& co
   }
   logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
 
-  if (const auto value = context->getProperty(CleanSession) | 
utils::flatMap(&utils::StringUtils::toBool)) {
+  if (const auto value = context->getProperty(CleanSession) | 
utils::andThen(&utils::StringUtils::toBool)) {
     clean_session_ = *value;
   }
   logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
 
-  if (const auto value = context->getProperty(CleanStart) | 
utils::flatMap(&utils::StringUtils::toBool)) {
+  if (const auto value = context->getProperty(CleanStart) | 
utils::andThen(&utils::StringUtils::toBool)) {
     clean_start_ = *value;
   }
   logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
 
-  if (const auto session_expiry_interval = 
context->getProperty(SessionExpiryInterval) | 
utils::flatMap(&core::TimePeriodValue::fromString)) {
+  if (const auto session_expiry_interval = 
context->getProperty(SessionExpiryInterval) | 
utils::andThen(&core::TimePeriodValue::fromString)) {
     session_expiry_interval_ = 
std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
   }
   logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
-  if (const auto value = context->getProperty(QueueBufferMaxMessage) | 
utils::flatMap(&utils::toNumber<uint64_t>)) {
+  if (const auto value = context->getProperty(QueueBufferMaxMessage) | 
utils::andThen(&utils::toNumber<uint64_t>)) {
     max_queue_size_ = *value;
   }
   logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
@@ -76,12 +76,12 @@ void ConsumeMQTT::readProperties(const 
std::shared_ptr<core::ProcessContext>& co
   }
   logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-  if (const auto topic_alias_maximum = context->getProperty(TopicAliasMaximum) 
| utils::flatMap(&utils::toNumber<uint32_t>)) {
+  if (const auto topic_alias_maximum = context->getProperty(TopicAliasMaximum) 
| utils::andThen(&utils::toNumber<uint32_t>)) {
     topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
   }
   logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (const auto receive_maximum = context->getProperty(ReceiveMaximum) | 
utils::flatMap(&utils::toNumber<uint32_t>)) {
+  if (const auto receive_maximum = context->getProperty(ReceiveMaximum) | 
utils::andThen(&utils::toNumber<uint32_t>)) {
     receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
   }
   logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
diff --git a/extensions/mqtt/processors/PublishMQTT.cpp 
b/extensions/mqtt/processors/PublishMQTT.cpp
index 9b2591524..bb5a538d0 100644
--- a/extensions/mqtt/processors/PublishMQTT.cpp
+++ b/extensions/mqtt/processors/PublishMQTT.cpp
@@ -47,12 +47,12 @@ void PublishMQTT::readProperties(const 
std::shared_ptr<core::ProcessContext>& co
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "PublishMQTT: Topic is 
required");
   }
 
-  if (const auto retain_opt = context->getProperty(Retain) | 
utils::flatMap(&utils::StringUtils::toBool)) {
+  if (const auto retain_opt = context->getProperty(Retain) | 
utils::andThen(&utils::StringUtils::toBool)) {
     retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  if (const auto message_expiry_interval = 
context->getProperty(MessageExpiryInterval) | 
utils::flatMap(&core::TimePeriodValue::fromString)) {
+  if (const auto message_expiry_interval = 
context->getProperty(MessageExpiryInterval) | 
utils::andThen(&core::TimePeriodValue::fromString)) {
     message_expiry_interval_ = 
std::chrono::duration_cast<std::chrono::seconds>(message_expiry_interval->getMilliseconds());
     logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", 
int64_t{message_expiry_interval_->count()});
   }
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp 
b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index e89243a90..e013ae09d 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -328,14 +328,14 @@ void DatabaseContentRepository::clearOrphans() {
 
 uint64_t DatabaseContentRepository::getRepositorySize() const {
   return (utils::optional_from_ptr(db_.get()) |
-          utils::flatMap([](const auto& db) { return db->open(); }) |
-          utils::flatMap([](const auto& opendb) { return 
opendb.getApproximateSizes(); })).value_or(0);
+          utils::andThen([](const auto& db) { return db->open(); }) |
+          utils::andThen([](const auto& opendb) { return 
opendb.getApproximateSizes(); })).value_or(0);
 }
 
 uint64_t DatabaseContentRepository::getRepositoryEntryCount() const {
   return (utils::optional_from_ptr(db_.get()) |
-          utils::flatMap([](const auto& db) { return db->open(); }) |
-          utils::flatMap([](auto&& opendb) -> std::optional<uint64_t> {
+          utils::andThen([](const auto& db) { return db->open(); }) |
+          utils::andThen([](auto&& opendb) -> std::optional<uint64_t> {
               std::string key_count;
               opendb.GetProperty("rocksdb.estimate-num-keys", &key_count);
               if (!key_count.empty()) {
diff --git a/extensions/rocksdb-repos/RocksDbRepository.cpp 
b/extensions/rocksdb-repos/RocksDbRepository.cpp
index 6de9fe1a7..4dc420c62 100644
--- a/extensions/rocksdb-repos/RocksDbRepository.cpp
+++ b/extensions/rocksdb-repos/RocksDbRepository.cpp
@@ -84,14 +84,14 @@ bool RocksDbRepository::Get(const std::string &key, 
std::string &value) {
 
 uint64_t RocksDbRepository::getRepositorySize() const {
   return (utils::optional_from_ptr(db_.get()) |
-          utils::flatMap([](const auto& db) { return db->open(); }) |
-          utils::flatMap([](const auto& opendb) { return 
opendb.getApproximateSizes(); })).value_or(0);
+          utils::andThen([](const auto& db) { return db->open(); }) |
+          utils::andThen([](const auto& opendb) { return 
opendb.getApproximateSizes(); })).value_or(0);
 }
 
 uint64_t RocksDbRepository::getRepositoryEntryCount() const {
   return (utils::optional_from_ptr(db_.get()) |
-          utils::flatMap([](const auto& db) { return db->open(); }) |
-          utils::flatMap([](auto&& opendb) -> std::optional<uint64_t> {
+          utils::andThen([](const auto& db) { return db->open(); }) |
+          utils::andThen([](auto&& opendb) -> std::optional<uint64_t> {
               std::string key_count;
               opendb.GetProperty("rocksdb.estimate-num-keys", &key_count);
               if (!key_count.empty()) {
diff --git a/extensions/sftp/processors/ListSFTP.cpp 
b/extensions/sftp/processors/ListSFTP.cpp
index 9bddb34a3..606f7711e 100644
--- a/extensions/sftp/processors/ListSFTP.cpp
+++ b/extensions/sftp/processors/ListSFTP.cpp
@@ -125,7 +125,7 @@ void ListSFTP::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context,
     logger_->log_error("Minimum File Age attribute is missing or invalid");
   }
 
-  if (auto maximum_file_age = context->getProperty(MaximumFileAge) | 
utils::flatMap(&core::TimePeriodValue::fromString)) {
+  if (auto maximum_file_age = context->getProperty(MaximumFileAge) | 
utils::andThen(&core::TimePeriodValue::fromString)) {
     maximum_file_age_ = maximum_file_age->getMilliseconds();
   } else {
     logger_->log_error("Maximum File Age attribute is missing or invalid");
diff --git a/extensions/standard-processors/processors/DefragmentText.cpp 
b/extensions/standard-processors/processors/DefragmentText.cpp
index 7fbbe76d3..0a04770fd 100644
--- a/extensions/standard-processors/processors/DefragmentText.cpp
+++ b/extensions/standard-processors/processors/DefragmentText.cpp
@@ -41,7 +41,7 @@ void DefragmentText::initialize() {
 void DefragmentText::onSchedule(core::ProcessContext* context, 
core::ProcessSessionFactory*) {
   gsl_Expects(context);
 
-  if (auto max_buffer_age = context->getProperty(MaxBufferAge) | 
utils::flatMap(&core::TimePeriodValue::fromString)) {
+  if (auto max_buffer_age = context->getProperty(MaxBufferAge) | 
utils::andThen(&core::TimePeriodValue::fromString)) {
     max_age_ = max_buffer_age->getMilliseconds();
     setTriggerWhenEmpty(true);
     logger_->log_trace("The Buffer maximum age is configured to be %" PRId64 " 
ms", int64_t{max_buffer_age->getMilliseconds().count()});
diff --git a/extensions/standard-processors/processors/ListFile.cpp 
b/extensions/standard-processors/processors/ListFile.cpp
index ee22a0781..724eade4e 100644
--- a/extensions/standard-processors/processors/ListFile.cpp
+++ b/extensions/standard-processors/processors/ListFile.cpp
@@ -59,7 +59,7 @@ void ListFile::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context,
     minimum_file_age_ =  minimum_file_age->getMilliseconds();
   }
 
-  if (auto maximum_file_age = context->getProperty(MaximumFileAge) | 
utils::flatMap(&core::TimePeriodValue::fromString)) {
+  if (auto maximum_file_age = context->getProperty(MaximumFileAge) | 
utils::andThen(&core::TimePeriodValue::fromString)) {
     maximum_file_age_ =  maximum_file_age->getMilliseconds();
   }
 
diff --git a/extensions/standard-processors/processors/PutUDP.cpp 
b/extensions/standard-processors/processors/PutUDP.cpp
index 19b6899e4..d3702abb1 100644
--- a/extensions/standard-processors/processors/PutUDP.cpp
+++ b/extensions/standard-processors/processors/PutUDP.cpp
@@ -124,8 +124,8 @@ void PutUDP::onTrigger(core::ProcessContext* context, 
core::ProcessSession* cons
   };
 
   resolve_hostname()
-      | utils::flatMap(send_data_to_endpoint)
-      | utils::map(transfer_to_success)
+      | utils::andThen(send_data_to_endpoint)
+      | utils::transform(transfer_to_success)
       | utils::orElse(transfer_to_failure);
 }
 
diff --git a/extensions/standard-processors/processors/RouteText.cpp 
b/extensions/standard-processors/processors/RouteText.cpp
index 6092e0530..8d7007302 100644
--- a/extensions/standard-processors/processors/RouteText.cpp
+++ b/extensions/standard-processors/processors/RouteText.cpp
@@ -50,7 +50,7 @@ void RouteText::onSchedule(core::ProcessContext* context, 
core::ProcessSessionFa
   matching_ = utils::parseEnumProperty<route_text::Matching>(*context, 
MatchingStrategy);
   context->getProperty(TrimWhitespace, trim_);
   case_policy_ = context->getProperty<bool>(IgnoreCase).value_or(false) ? 
route_text::CasePolicy::IGNORE_CASE : route_text::CasePolicy::CASE_SENSITIVE;
-  group_regex_ = context->getProperty(GroupingRegex) | utils::map([] (const 
auto& str) {return utils::Regex(str);});
+  group_regex_ = context->getProperty(GroupingRegex) | utils::transform([] 
(const auto& str) {return utils::Regex(str);});
   segmentation_ = utils::parseEnumProperty<route_text::Segmentation>(*context, 
SegmentationStrategy);
   context->getProperty(GroupingFallbackValue, group_fallback_);
 }
diff --git a/extensions/systemd/ConsumeJournald.cpp 
b/extensions/systemd/ConsumeJournald.cpp
index ae23b90f0..7655a91e2 100644
--- a/extensions/systemd/ConsumeJournald.cpp
+++ b/extensions/systemd/ConsumeJournald.cpp
@@ -68,11 +68,11 @@ void ConsumeJournald::onSchedule(core::ProcessContext* 
const context, core::Proc
     return std::nullopt;
   };
   batch_size_ = context->getProperty<size_t>(BatchSize).value();
-  payload_format_ = (context->getProperty(PayloadFormat) | 
utils::flatMap(parse_payload_format)
+  payload_format_ = (context->getProperty(PayloadFormat) | 
utils::andThen(parse_payload_format)
       | utils::orElse([]{ throw Exception{ExceptionType::PROCESSOR_EXCEPTION, 
"invalid payload format"}; }))
       .value();
   include_timestamp_ = context->getProperty<bool>(IncludeTimestamp).value();
-  const auto journal_type = (context->getProperty(JournalType) | 
utils::flatMap(parse_journal_type)
+  const auto journal_type = (context->getProperty(JournalType) | 
utils::andThen(parse_journal_type)
       | utils::orElse([]{ throw Exception{ExceptionType::PROCESSOR_EXCEPTION, 
"invalid journal type"}; }))
       .value();
   const auto process_old_messages = 
context->getProperty<bool>(ProcessOldMessages).value_or(false);
@@ -93,7 +93,7 @@ void ConsumeJournald::onSchedule(core::ProcessContext* const 
context, core::Proc
     return process_old_messages ? journal.seekHead() : journal.seekTail();
   };
   worker_->enqueue([this, &seek_default] {
-    const auto cursor = state_manager_->get() | 
utils::map([](std::unordered_map<std::string, std::string>&& m) { return 
m.at(CURSOR_KEY); });
+    const auto cursor = state_manager_->get() | 
utils::transform([](std::unordered_map<std::string, std::string>&& m) { return 
m.at(CURSOR_KEY); });
     if (!cursor) {
       seek_default(*journal_);
     } else {
@@ -147,7 +147,7 @@ std::optional<std::span<const char>> 
ConsumeJournald::enumerateJournalEntry(libw
 }
 
 std::optional<ConsumeJournald::journal_field> 
ConsumeJournald::getNextField(libwrapper::Journal& journal) {
-  return enumerateJournalEntry(journal) | utils::map([](std::span<const char> 
field) {
+  return enumerateJournalEntry(journal) | utils::transform([](std::span<const 
char> field) {
     const auto eq_pos = std::find(std::begin(field), std::end(field), '=');
     gsl_Ensures(eq_pos != std::end(field) && "field string must contain an 
equals sign");
     const auto eq_idx = gsl::narrow<size_t>(eq_pos - std::begin(field));
@@ -205,12 +205,12 @@ std::string ConsumeJournald::formatSyslogMessage(const 
journal_message& msg) con
 
   const auto pid_string = utils::optional_from_ptr(syslog_pid)
       | utils::orElse([&] { return utils::optional_from_ptr(systemd_pid); })
-      | utils::map([](const std::string* const pid) { return 
fmt::format("[{}]", *pid); });
+      | utils::transform([](const std::string* const pid) { return 
fmt::format("[{}]", *pid); });
 
   return fmt::format("{} {} {}{}: {}",
       date::format(timestamp_format_, 
chr::floor<chr::microseconds>(msg.timestamp)),
-      (utils::optional_from_ptr(systemd_hostname) | 
utils::map(utils::dereference)).value_or("unknown_host"),
-      (utils::optional_from_ptr(syslog_identifier) | 
utils::map(utils::dereference)).value_or("unknown_process"),
+      (utils::optional_from_ptr(systemd_hostname) | 
utils::transform(utils::dereference)).value_or("unknown_host"),
+      (utils::optional_from_ptr(syslog_identifier) | 
utils::transform(utils::dereference)).value_or("unknown_process"),
       pid_string.value_or(std::string{}),
       *message);
 }
diff --git a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp 
b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
index afe7b85d6..036fbcb8b 100644
--- a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
+++ b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
@@ -110,7 +110,7 @@ void CollectorInitiatedSubscription::onTrigger(const 
std::shared_ptr<core::Proce
   if (flowFileCount > 0) {
     lastActivityTimestamp_ = now;
   } else if (auto inactive_duration_to_reconnect_ms = 
context->getProperty<core::TimePeriodValue>(InactiveDurationToReconnect)
-             | utils::map([](const auto& time_period_value) { return 
time_period_value.getMilliseconds().count(); });
+             | utils::transform([](const auto& time_period_value) { return 
time_period_value.getMilliseconds().count(); });
              inactive_duration_to_reconnect_ms && 
*inactive_duration_to_reconnect_ms > 0) {
     if ((now - lastActivityTimestamp_) > *inactive_duration_to_reconnect_ms) {
       logger_->log_info("Exceeds configured 'inactive duration to reconnect' 
%lld ms. Unsubscribe to reconnect..", *inactive_duration_to_reconnect_ms);
diff --git a/libminifi/include/DiskSpaceWatchdog.h 
b/libminifi/include/DiskSpaceWatchdog.h
index 384a9fe2e..93895b15c 100644
--- a/libminifi/include/DiskSpaceWatchdog.h
+++ b/libminifi/include/DiskSpaceWatchdog.h
@@ -25,17 +25,12 @@
 
 #include "utils/IntervalSwitch.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 class Configure;
-namespace core {
-namespace logging {
+namespace core::logging {
 class Logger;
-}  // namespace logging
-}  // namespace core
+}  // namespace core::logging
 
 namespace disk_space_watchdog {
 struct Config {
@@ -54,7 +49,4 @@ inline utils::IntervalSwitch<std::uintmax_t> 
disk_space_interval_switch(Config c
 std::vector<std::uintmax_t> check_available_space(const 
std::vector<std::string>& paths, core::logging::Logger* logger = nullptr);
 
 }  // namespace disk_space_watchdog
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/EventDrivenSchedulingAgent.h 
b/libminifi/include/EventDrivenSchedulingAgent.h
index f8664cc25..352212179 100644
--- a/libminifi/include/EventDrivenSchedulingAgent.h
+++ b/libminifi/include/EventDrivenSchedulingAgent.h
@@ -42,7 +42,7 @@ class EventDrivenSchedulingAgent : public 
ThreadedSchedulingAgent {
     using namespace std::literals::chrono_literals;
 
     time_slice_ = 
configuration->get(Configure::nifi_flow_engine_event_driven_time_slice)
-        | 
utils::flatMap(utils::timeutils::StringToDuration<std::chrono::milliseconds>)
+        | 
utils::andThen(utils::timeutils::StringToDuration<std::chrono::milliseconds>)
         | utils::valueOrElse([] { return DEFAULT_TIME_SLICE; });
 
     if (time_slice_ < 10ms || 1000ms < time_slice_) {
diff --git a/libminifi/include/SchedulingAgent.h 
b/libminifi/include/SchedulingAgent.h
index 4911f5e90..b24b74d97 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -66,7 +66,7 @@ class SchedulingAgent {
     flow_repo_ = flow_repo;
 
     alert_time_ = configuration->get(Configure::nifi_flow_engine_alert_period)
-        | 
utils::flatMap(utils::timeutils::StringToDuration<std::chrono::milliseconds>)
+        | 
utils::andThen(utils::timeutils::StringToDuration<std::chrono::milliseconds>)
         | utils::valueOrElse([] { return 
SCHEDULING_WATCHDOG_DEFAULT_ALERT_PERIOD; });
 
     if (alert_time_ > std::chrono::milliseconds(0)) {
diff --git a/libminifi/include/core/logging/LoggerConfiguration.h 
b/libminifi/include/core/logging/LoggerConfiguration.h
index c62ee74b4..a34d68bd0 100644
--- a/libminifi/include/core/logging/LoggerConfiguration.h
+++ b/libminifi/include/core/logging/LoggerConfiguration.h
@@ -62,7 +62,7 @@ struct LoggerNamespace {
 };
 
 inline std::optional<std::string> formatId(std::optional<utils::Identifier> 
opt_id) {
-  return opt_id | utils::map([](auto id) { return " (" + 
std::string(id.to_string()) + ")"; });
+  return opt_id | utils::transform([](auto id) { return " (" + 
std::string(id.to_string()) + ")"; });
 }
 
 inline constexpr std::string_view UNLIMITED_LOG_ENTRY_LENGTH = "unlimited";
diff --git a/libminifi/include/properties/Decryptor.h 
b/libminifi/include/properties/Decryptor.h
index 359e20738..822d338e0 100644
--- a/libminifi/include/properties/Decryptor.h
+++ b/libminifi/include/properties/Decryptor.h
@@ -43,7 +43,7 @@ class Decryptor {
 
   static std::optional<Decryptor> create(const std::filesystem::path& 
minifi_home) {
     return utils::crypto::EncryptionProvider::create(minifi_home)
-        | utils::map([](const utils::crypto::EncryptionProvider& provider) 
{return Decryptor{provider};});
+        | utils::transform([](const utils::crypto::EncryptionProvider& 
provider) {return Decryptor{provider};});
   }
 
  private:
diff --git a/libminifi/include/utils/OptionalUtils.h 
b/libminifi/include/utils/OptionalUtils.h
index 9f0b92acd..b11d42b65 100644
--- a/libminifi/include/utils/OptionalUtils.h
+++ b/libminifi/include/utils/OptionalUtils.h
@@ -41,9 +41,9 @@ template<typename T>
 struct is_optional<std::optional<T>, void> : std::true_type {};
 
 namespace detail {
-// map implementation
+// transform implementation
 template<typename SourceType, typename F>
-auto operator|(std::optional<SourceType> o, map_wrapper<F> f) 
noexcept(noexcept(std::invoke(std::forward<F>(f.function), *std::move(o)))) {
+auto operator|(std::optional<SourceType> o, transform_wrapper<F> f) 
noexcept(noexcept(std::invoke(std::forward<F>(f.function), *std::move(o)))) {
   using cb_result = std::decay_t<std::invoke_result_t<F, SourceType>>;
   if constexpr(std::is_same_v<cb_result, void>) {
     if (o.has_value()) {
@@ -59,9 +59,9 @@ auto operator|(std::optional<SourceType> o, map_wrapper<F> f) 
noexcept(noexcept(
   }
 }
 
-// flatMap implementation
+// andThen implementation
 template<typename SourceType, typename F>
-auto operator|(const std::optional<SourceType>& o, flat_map_wrapper<F> f) 
noexcept(noexcept(std::invoke(std::forward<F>(f.function), *o)))
+auto operator|(const std::optional<SourceType>& o, and_then_wrapper<F> f) 
noexcept(noexcept(std::invoke(std::forward<F>(f.function), *o)))
     -> std::optional<typename 
std::decay<decltype(*std::invoke(std::forward<F>(f.function), *o))>::type> {
   static_assert(is_optional<decltype(std::invoke(std::forward<F>(f.function), 
*o))>::value, "flatMap expects a function returning optional");
   if (o.has_value()) {
@@ -71,7 +71,7 @@ auto operator|(const std::optional<SourceType>& o, 
flat_map_wrapper<F> f) noexce
   }
 }
 template<typename SourceType, typename F>
-auto operator|(std::optional<SourceType>&& o, flat_map_wrapper<F> f) 
noexcept(noexcept(std::invoke(std::forward<F>(f.function), std::move(*o))))
+auto operator|(std::optional<SourceType>&& o, and_then_wrapper<F> f) 
noexcept(noexcept(std::invoke(std::forward<F>(f.function), std::move(*o))))
     -> std::optional<typename 
std::decay<decltype(*std::invoke(std::forward<F>(f.function), 
std::move(*o)))>::type> {
   static_assert(is_optional<decltype(std::invoke(std::forward<F>(f.function), 
std::move(*o)))>::value, "flatMap expects a function returning optional");
   if (o.has_value()) {
diff --git a/libminifi/include/utils/detail/MonadicOperationWrappers.h 
b/libminifi/include/utils/detail/MonadicOperationWrappers.h
index f8e088621..e973f90fe 100644
--- a/libminifi/include/utils/detail/MonadicOperationWrappers.h
+++ b/libminifi/include/utils/detail/MonadicOperationWrappers.h
@@ -21,12 +21,12 @@
 namespace org::apache::nifi::minifi::utils {
 namespace detail {
 template<typename T>
-struct map_wrapper {
+struct transform_wrapper {
   T function;
 };
 
 template<typename T>
-struct flat_map_wrapper {
+struct and_then_wrapper {
   T function;
 };
 
@@ -44,6 +44,11 @@ template<typename T>
 struct filter_wrapper {
   T function;
 };
+
+template<typename T>
+struct transform_error_wrapper {
+  T function;
+};
 }  // namespace detail
 
 /**
@@ -53,7 +58,7 @@ struct filter_wrapper {
  * @return Wrapped result of func
  */
 template<typename T>
-detail::map_wrapper<T&&> map(T&& func) noexcept { return 
{std::forward<T>(func)}; }
+detail::transform_wrapper<T&&> transform(T&& func) noexcept { return 
{std::forward<T>(func)}; }
 
 /**
  * Transforms a wrapped value of T by calling the provided function T -> 
wrapped U, returning wrapped U.
@@ -61,7 +66,7 @@ detail::map_wrapper<T&&> map(T&& func) noexcept { return 
{std::forward<T>(func)}
  * @return Transformed value
  */
 template<typename T>
-detail::flat_map_wrapper<T&&> flatMap(T&& func) noexcept { return 
{std::forward<T>(func)}; }
+detail::and_then_wrapper<T&&> andThen(T&& func) noexcept { return 
{std::forward<T>(func)}; }
 
 /**
  * For optional-like types, possibly provides a value for an empty object to 
be replaced with.
@@ -89,4 +94,7 @@ detail::value_or_else_wrapper<T&&> valueOrElse(T&& func) 
noexcept { return {std:
  */
 template<typename T>
 detail::filter_wrapper<T&&> filter(T&& func) noexcept { return 
{std::forward<T>(func)}; }
+
+template<typename T>
+detail::transform_error_wrapper<T&&> transformError(T&& func) noexcept { 
return {std::forward<T>(func)}; }
 }  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/expected.h 
b/libminifi/include/utils/expected.h
index c3bdce50f..f4aa742d6 100644
--- a/libminifi/include/utils/expected.h
+++ b/libminifi/include/utils/expected.h
@@ -19,8 +19,6 @@
 #include <utility>
 #include "nonstd/expected.hpp"
 #include "utils/detail/MonadicOperationWrappers.h"
-#include "utils/GeneralUtils.h"
-#include "utils/meta/detected.h"
 
 namespace org::apache::nifi::minifi::utils {
 namespace detail {
@@ -31,28 +29,45 @@ inline constexpr bool is_expected_v = false;
 template<typename V, typename E>
 inline constexpr bool is_expected_v<nonstd::expected<V, E>> = true;
 
-// map implementation
-template<typename Expected, typename F, typename = 
std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>>
-auto operator|(Expected&& object, map_wrapper<F> f) {
-  using value_type = typename utils::remove_cvref_t<Expected>::value_type;
-  using error_type = typename utils::remove_cvref_t<Expected>::error_type;
-  if constexpr (std::is_same_v<value_type, void>) {
-    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>;
+template<typename T>
+concept expected = is_expected_v<std::remove_cvref_t<T>>;
+
+// from cppreference.com: The type must not be an array type, a non-object 
type, a specialization of std::unexpected, or a cv-qualified type.
+// base template: not cv-qualified, must be an object type, not an array type
+template<typename T>
+inline constexpr bool is_valid_unexpected_type_v = std::is_same_v<T, 
std::remove_cvref_t<T>> && std::is_object_v<T> && !std::is_array_v<T>;
+
+// specialization: the type must not be a specialization of std::unexpected
+template<typename E>
+inline constexpr bool is_valid_unexpected_type_v<nonstd::unexpected_type<E>> = 
false;
+
+template<typename T>
+concept valid_unexpected_type = is_valid_unexpected_type_v<T>;
+
+
+// transform implementation
+template<expected Expected, typename F>
+auto operator|(Expected&& object, transform_wrapper<F> f) {
+  using value_type = typename std::remove_cvref_t<Expected>::value_type;
+  using error_type = typename std::remove_cvref_t<Expected>::error_type;
+  static_assert(valid_unexpected_type<error_type>, "transform expects a valid 
unexpected type");
+  if constexpr (std::is_void_v<value_type>) {
+    using function_return_type = std::remove_cvref_t<std::invoke_result_t<F>>;
     if (!object.has_value()) {
       return nonstd::expected<function_return_type, 
error_type>{nonstd::unexpect, std::forward<Expected>(object).error()};
     }
-    if constexpr (std::is_same_v<function_return_type, void>) {
+    if constexpr (std::is_void_v<function_return_type>) {
       std::invoke(std::forward<F>(f.function));
       return nonstd::expected<void, error_type>{};
     } else {
       return nonstd::expected<function_return_type, 
error_type>{std::invoke(std::forward<F>(f.function))};
     }
   } else {
-    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function), 
*std::forward<Expected>(object)))>;
+    using function_return_type = std::remove_cvref_t<std::invoke_result_t<F, 
decltype(*std::forward<Expected>(object))>>;
     if (!object.has_value()) {
       return nonstd::expected<function_return_type, 
error_type>{nonstd::unexpect, std::forward<Expected>(object).error()};
     }
-    if constexpr (std::is_same_v<function_return_type, void>) {
+    if constexpr (std::is_void_v<function_return_type>) {
       std::invoke(std::forward<F>(f.function), 
*std::forward<Expected>(object));
       return nonstd::expected<void, error_type>{};
     } else {
@@ -61,12 +76,12 @@ auto operator|(Expected&& object, map_wrapper<F> f) {
   }
 }
 
-// flatMap
-template<typename Expected, typename F, typename = 
std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>>
-auto operator|(Expected&& object, flat_map_wrapper<F> f) {
-  using value_type = typename utils::remove_cvref_t<Expected>::value_type;
-  if constexpr (std::is_same_v<value_type, void>) {
-    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>;
+// andThen
+template<expected Expected, typename F>
+auto operator|(Expected&& object, and_then_wrapper<F> f) {
+  using value_type = typename std::remove_cvref_t<Expected>::value_type;
+  if constexpr (std::is_void_v<value_type>) {
+    using function_return_type = std::remove_cvref_t<std::invoke_result_t<F>>;
     static_assert(is_expected_v<function_return_type>, "flatMap expects a 
function returning expected");
     if (object.has_value()) {
       return std::invoke(std::forward<F>(f.function));
@@ -74,7 +89,7 @@ auto operator|(Expected&& object, flat_map_wrapper<F> f) {
       return function_return_type{nonstd::unexpect, 
std::forward<Expected>(object).error()};
     }
   } else {
-    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function), 
*std::forward<Expected>(object)))>;
+    using function_return_type = std::remove_cvref_t<std::invoke_result_t<F, 
decltype(*std::forward<Expected>(object))>>;
     static_assert(is_expected_v<function_return_type>, "flatMap expects a 
function returning expected");
     if (object.has_value()) {
       return std::invoke(std::forward<F>(f.function), 
*std::forward<Expected>(object));
@@ -84,23 +99,21 @@ auto operator|(Expected&& object, flat_map_wrapper<F> f) {
   }
 }
 
-template<typename Func, typename... Args>
-using invocable_detector = decltype(std::invoke(std::declval<Func>(), 
std::declval<Args>()...));
-
 // orElse
-template<typename Expected, typename F, typename = 
std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>>
+template<expected Expected, typename F>
 auto operator|(Expected&& object, or_else_wrapper<F> f) {
-  using error_type = typename utils::remove_cvref_t<Expected>::error_type;
+  using error_type = typename std::remove_cvref_t<Expected>::error_type;
+  static_assert(valid_unexpected_type<error_type>, "orElse expects a valid 
unexpected type");
   if (object.has_value()) {
     return std::forward<Expected>(object);
   }
-  constexpr bool invocable_with_argument = 
meta::is_detected_v<invocable_detector, F, error_type>;
-  if constexpr (std::is_same_v<error_type, void> || !invocable_with_argument) {
-    constexpr bool invocable_with_no_argument = 
meta::is_detected_v<invocable_detector, F>;
+  constexpr bool invocable_with_argument = std::invocable<F, error_type>;
+  if constexpr (std::is_void_v<error_type> || !invocable_with_argument) {
+    constexpr bool invocable_with_no_argument = std::invocable<F>;
     static_assert(invocable_with_no_argument);
-    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>;
-    static_assert(is_expected_v<function_return_type> || 
std::is_same_v<function_return_type, void>, "orElse expects a function 
returning expected or void");
-    if constexpr (std::is_same_v<function_return_type, void>) {
+    using function_return_type = std::remove_cvref_t<std::invoke_result_t<F>>;
+    static_assert(is_expected_v<function_return_type> || 
std::is_void_v<function_return_type>, "orElse expects a function returning 
expected or void");
+    if constexpr (std::is_void_v<function_return_type>) {
       std::invoke(std::forward<F>(f.function));
       return std::forward<Expected>(object);
     } else {
@@ -108,9 +121,9 @@ auto operator|(Expected&& object, or_else_wrapper<F> f) {
     }
   } else {
     static_assert(invocable_with_argument);
-    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function), 
std::forward<Expected>(object).error()))>;
-    static_assert(is_expected_v<function_return_type> || 
std::is_same_v<function_return_type, void>, "orElse expects a function 
returning expected or void");
-    if constexpr (std::is_same_v<function_return_type, void>) {
+    using function_return_type = std::remove_cvref_t<std::invoke_result_t<F, 
decltype(std::forward<Expected>(object).error())>>;
+    static_assert(is_expected_v<function_return_type> || 
std::is_void_v<function_return_type>, "orElse expects a function returning 
expected or void");
+    if constexpr (std::is_void_v<function_return_type>) {
       std::invoke(std::forward<F>(f.function), 
std::forward<Expected>(object).error());
       return std::forward<Expected>(object);
     } else {
@@ -120,21 +133,22 @@ auto operator|(Expected&& object, or_else_wrapper<F> f) {
 }
 
 // valueOrElse
-template<typename Expected, typename F, typename = 
std::enable_if_t<is_expected_v<utils::remove_cvref_t<Expected>>>>
-typename utils::remove_cvref_t<Expected>::value_type operator|(Expected&& 
object, value_or_else_wrapper<F> f) {
-  using value_type = typename utils::remove_cvref_t<Expected>::value_type;
-  using error_type = typename utils::remove_cvref_t<Expected>::error_type;
+template<expected Expected, typename F>
+typename std::remove_cvref_t<Expected>::value_type operator|(Expected&& 
object, value_or_else_wrapper<F> f) {
+  using value_type = typename std::remove_cvref_t<Expected>::value_type;
+  using error_type = typename std::remove_cvref_t<Expected>::error_type;
+  static_assert(valid_unexpected_type<error_type>, "valueOrElse expects a 
valid unexpected type");
   if (object.has_value()) {
     return *std::forward<Expected>(object);
   }
-  constexpr bool invocable_with_argument = 
meta::is_detected_v<invocable_detector, F, error_type>;
-  if constexpr (std::is_same_v<error_type, void> || !invocable_with_argument) {
-    constexpr bool invocable_with_no_argument = 
meta::is_detected_v<invocable_detector, F>;
+  constexpr bool invocable_with_argument = std::invocable<F, error_type>;
+  if constexpr (std::is_void_v<error_type> || !invocable_with_argument) {
+    constexpr bool invocable_with_no_argument = std::invocable<F>;
     static_assert(invocable_with_no_argument);
-    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function)))>;
-    static_assert((std::is_same_v<function_return_type, void> && 
std::is_default_constructible_v<value_type>) || 
std::is_constructible_v<value_type, function_return_type>,
+    using function_return_type = std::remove_cvref_t<std::invoke_result_t<F>>;
+    static_assert((std::is_void_v<function_return_type> && 
std::is_default_constructible_v<value_type>) || 
std::is_constructible_v<value_type, function_return_type>,
         "valueOrElse expects a function returning value_type or void");
-    if constexpr (std::is_same_v<function_return_type, void>) {
+    if constexpr (std::is_void_v<function_return_type>) {
       std::invoke(std::forward<F>(f.function));
       return value_type{};
     } else {
@@ -142,10 +156,10 @@ typename utils::remove_cvref_t<Expected>::value_type 
operator|(Expected&& object
     }
   } else {
     static_assert(invocable_with_argument);
-    using function_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(f.function), 
std::forward<Expected>(object).error()))>;
-    static_assert((std::is_same_v<function_return_type, void> && 
std::is_default_constructible_v<value_type>) || 
std::is_constructible_v<value_type, function_return_type>,
+    using function_return_type = std::remove_cvref_t<std::invoke_result_t<F, 
error_type>>;
+    static_assert((std::is_void_v<function_return_type> && 
std::is_default_constructible_v<value_type>) || 
std::is_constructible_v<value_type, function_return_type>,
         "valueOrElse expects a function returning value_type or void");
-    if constexpr (std::is_same_v<function_return_type, void>) {
+    if constexpr (std::is_void_v<function_return_type>) {
       std::invoke(std::forward<F>(f.function), 
std::forward<Expected>(object).error());
       return value_type{};
     } else {
@@ -153,14 +167,28 @@ typename utils::remove_cvref_t<Expected>::value_type 
operator|(Expected&& object
     }
   }
 }
+
+// transformError
+template<expected Expected, 
std::invocable<decltype(std::declval<Expected&&>().error())> F>
+auto operator|(Expected&& object, transform_error_wrapper<F> f) {
+  using value_type = typename std::remove_cvref_t<Expected>::value_type;
+  using transformed_error_type = std::remove_cv_t<std::invoke_result_t<F, 
decltype(std::forward<Expected>(object).error())>>;
+  static_assert(valid_unexpected_type<transformed_error_type>, "transformError 
expects a function returning a valid unexpected type");
+  using transformed_expected_type = nonstd::expected<value_type, 
transformed_error_type>;
+  if (object.has_value()) {
+    return transformed_expected_type{std::forward<Expected>(object)};
+  }
+  return transformed_expected_type{nonstd::unexpect, 
std::invoke(std::forward<F>(f.function), 
std::forward<Expected>(object).error())};
+}
+
 }  // namespace detail
 
 template<typename F, typename... Args>
 auto try_expression(F&& action, Args&&... args) noexcept {
-  using action_return_type = 
std::decay_t<decltype(std::invoke(std::forward<F>(action), 
std::forward<Args>(args)...))>;
+  using action_return_type = std::remove_cvref_t<std::invoke_result_t<F, 
Args&&...>>;
   using return_type = nonstd::expected<action_return_type, std::exception_ptr>;
   try {
-    if constexpr (std::is_same_v<action_return_type, void>) {
+    if constexpr (std::is_void_v<action_return_type>) {
       std::invoke(std::forward<F>(action), std::forward<Args>(args)...);
       return return_type{};
     } else {
diff --git a/libminifi/src/DiskSpaceWatchdog.cpp 
b/libminifi/src/DiskSpaceWatchdog.cpp
index fc930b588..a8770584e 100644
--- a/libminifi/src/DiskSpaceWatchdog.cpp
+++ b/libminifi/src/DiskSpaceWatchdog.cpp
@@ -23,10 +23,7 @@
 #include "utils/file/PathUtils.h"
 #include "utils/TimeUtil.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
 
 namespace {
 namespace chr = std::chrono;
@@ -40,14 +37,13 @@ std::optional<T> data_size_string_to_int(const std::string& 
str) {
   return std::make_optional(result);
 }
 
-
 }  // namespace
 
 namespace disk_space_watchdog {
 Config read_config(const Configure& conf) {
-  const auto interval_ms = 
conf.get(Configure::minifi_disk_space_watchdog_interval) | 
utils::flatMap(utils::timeutils::StringToDuration<chr::milliseconds>);
-  const auto stop_bytes = 
conf.get(Configure::minifi_disk_space_watchdog_stop_threshold) | 
utils::flatMap(data_size_string_to_int<std::uintmax_t>);
-  const auto restart_bytes = 
conf.get(Configure::minifi_disk_space_watchdog_restart_threshold) | 
utils::flatMap(data_size_string_to_int<std::uintmax_t>);
+  const auto interval_ms = 
conf.get(Configure::minifi_disk_space_watchdog_interval) | 
utils::andThen(utils::timeutils::StringToDuration<chr::milliseconds>);
+  const auto stop_bytes = 
conf.get(Configure::minifi_disk_space_watchdog_stop_threshold) | 
utils::andThen(data_size_string_to_int<std::uintmax_t>);
+  const auto restart_bytes = 
conf.get(Configure::minifi_disk_space_watchdog_restart_threshold) | 
utils::andThen(data_size_string_to_int<std::uintmax_t>);
   if (restart_bytes < stop_bytes) { throw std::runtime_error{"disk space 
watchdog stop threshold must be <= restart threshold"}; }
   constexpr auto mebibytes = 1024 * 1024;
   return {
@@ -75,7 +71,4 @@ std::vector<std::uintmax_t> check_available_space(const 
std::vector<std::string>
 }
 
 }  // namespace disk_space_watchdog
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/RootProcessGroupWrapper.cpp 
b/libminifi/src/RootProcessGroupWrapper.cpp
index f36667d6c..b1108487a 100644
--- a/libminifi/src/RootProcessGroupWrapper.cpp
+++ b/libminifi/src/RootProcessGroupWrapper.cpp
@@ -162,9 +162,9 @@ state::StateController* 
RootProcessGroupWrapper::getProcessorController(const st
   }
 
   return utils::Identifier::parse(id_or_name)
-    | utils::flatMap([this](utils::Identifier id) { return 
utils::optional_from_ptr(root_->findProcessorById(id)); })
+    | utils::andThen([this](utils::Identifier id) { return 
utils::optional_from_ptr(root_->findProcessorById(id)); })
     | utils::orElse([this, &id_or_name] { return 
utils::optional_from_ptr(root_->findProcessorByName(id_or_name)); })
-    | utils::map([this, &controllerFactory](gsl::not_null<core::Processor*> 
proc) -> gsl::not_null<state::ProcessorController*> {
+    | utils::transform([this, 
&controllerFactory](gsl::not_null<core::Processor*> proc) -> 
gsl::not_null<state::ProcessorController*> {
       return 
utils::optional_from_ptr(processor_to_controller_[proc->getUUID()].get())
           | utils::valueOrElse([this, proc, &controllerFactory] {
             return 
gsl::make_not_null((processor_to_controller_[proc->getUUID()] = 
controllerFactory(*proc)).get());
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 64464a602..c32273f44 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -631,8 +631,8 @@ void C2Agent::handlePropertyUpdate(const C2ContentResponse 
&resp) {
   for (const auto& [name, value] : resp.operation_arguments) {
     bool persist = (
         value.getAnnotation("persist")
-        | utils::map(&AnnotatedValue::to_string)
-        | utils::flatMap(utils::StringUtils::toBool)).value_or(true);
+        | utils::transform(&AnnotatedValue::to_string)
+        | utils::andThen(utils::StringUtils::toBool)).value_or(true);
     PropertyChangeLifetime lifetime = persist ? 
PropertyChangeLifetime::PERSISTENT : PropertyChangeLifetime::TRANSIENT;
     changeUpdateState(update_property(name, value.to_string(), lifetime));
   }
@@ -969,7 +969,7 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& 
resp) {
 
   // output file
   std::filesystem::path file_path;
-  if (auto file_rel = resp.getArgument("file") | utils::map(make_path)) {
+  if (auto file_rel = resp.getArgument("file") | utils::transform(make_path)) {
     if (auto error = validateFilePath(file_rel.value())) {
       send_error(error.value());
       return;
diff --git a/libminifi/src/c2/C2MetricsPublisher.cpp 
b/libminifi/src/c2/C2MetricsPublisher.cpp
index bc9cd8ad1..1d62d0171 100644
--- a/libminifi/src/c2/C2MetricsPublisher.cpp
+++ b/libminifi/src/c2/C2MetricsPublisher.cpp
@@ -166,7 +166,7 @@ std::optional<state::response::NodeReporter::ReportedNode> 
C2MetricsPublisher::g
 std::vector<state::response::NodeReporter::ReportedNode> 
C2MetricsPublisher::getHeartbeatNodes(bool include_manifest) const {
   gsl_Expects(configuration_);
   bool full_heartbeat = 
configuration_->get(minifi::Configuration::nifi_c2_full_heartbeat)
-      | utils::flatMap(utils::StringUtils::toBool)
+      | utils::andThen(utils::StringUtils::toBool)
       | utils::valueOrElse([] {return true;});
 
   bool include = include_manifest || full_heartbeat;
diff --git a/libminifi/src/core/FlowConfiguration.cpp 
b/libminifi/src/core/FlowConfiguration.cpp
index 999e43f58..0ed058f31 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -126,7 +126,7 @@ bool FlowConfiguration::persist(const std::string 
&configuration) {
   auto config_file_backup = *config_path_;
   config_file_backup += ".bak";
   bool backup_file = 
(configuration_->get(minifi::Configure::nifi_flow_configuration_file_backup_update)
-                      | 
utils::flatMap(utils::StringUtils::toBool)).value_or(false);
+                      | 
utils::andThen(utils::StringUtils::toBool)).value_or(false);
 
   if (backup_file) {
     if (utils::file::FileUtils::copy_file(*config_path_, config_file_backup) 
!= 0) {
diff --git a/libminifi/src/utils/crypto/EncryptionManager.cpp 
b/libminifi/src/utils/crypto/EncryptionManager.cpp
index fb31334bf..351c53565 100644
--- a/libminifi/src/utils/crypto/EncryptionManager.cpp
+++ b/libminifi/src/utils/crypto/EncryptionManager.cpp
@@ -33,7 +33,7 @@ std::shared_ptr<core::logging::Logger> 
EncryptionManager::logger_{core::logging:
 
 std::optional<XSalsa20Cipher> EncryptionManager::createXSalsa20Cipher(const 
std::string &key_name) const {
   return readKey(key_name)
-         | utils::map([] (const Bytes& key) {return XSalsa20Cipher{key};});
+         | utils::transform([] (const Bytes& key) {return 
XSalsa20Cipher{key};});
 }
 
 std::optional<Aes256EcbCipher> EncryptionManager::createAes256EcbCipher(const 
std::string &key_name) const {
@@ -61,7 +61,7 @@ std::optional<Bytes> EncryptionManager::readKey(const 
std::string& key_name) con
   bootstrap_conf.setHome(key_dir_);
   bootstrap_conf.loadConfigureFile(DEFAULT_NIFI_BOOTSTRAP_FILE);
   return bootstrap_conf.getString(key_name)
-         | utils::map([](const std::string &encryption_key_hex) { return 
utils::StringUtils::from_hex(encryption_key_hex); });
+         | utils::transform([](const std::string &encryption_key_hex) { return 
utils::StringUtils::from_hex(encryption_key_hex); });
 }
 
 bool EncryptionManager::writeKey(const std::string &key_name, const Bytes& 
key) const {
diff --git a/libminifi/src/utils/crypto/EncryptionProvider.cpp 
b/libminifi/src/utils/crypto/EncryptionProvider.cpp
index 01493bd00..1c3048645 100644
--- a/libminifi/src/utils/crypto/EncryptionProvider.cpp
+++ b/libminifi/src/utils/crypto/EncryptionProvider.cpp
@@ -26,7 +26,7 @@ constexpr const char* CONFIG_ENCRYPTION_KEY_PROPERTY_NAME = 
"nifi.bootstrap.sens
 
 std::optional<EncryptionProvider> EncryptionProvider::create(const 
std::filesystem::path& home_path) {
   return 
EncryptionManager{home_path}.createXSalsa20Cipher(CONFIG_ENCRYPTION_KEY_PROPERTY_NAME)
-    | utils::map([] (const XSalsa20Cipher& cipher) {return 
EncryptionProvider{cipher};});
+    | utils::transform([] (const XSalsa20Cipher& cipher) {return 
EncryptionProvider{cipher};});
 }
 
 }  // namespace org::apache::nifi::minifi::utils::crypto
diff --git a/libminifi/test/integration/IntegrationBase.h 
b/libminifi/test/integration/IntegrationBase.h
index d48670a77..aca63326b 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -166,7 +166,7 @@ void IntegrationBase::run(const 
std::optional<std::filesystem::path>& test_file_
     running = false;  // Stop running after this iteration, unless restart is 
explicitly requested
 
     bool should_encrypt_flow_config = 
(configuration->get(minifi::Configure::nifi_flow_configuration_encrypt)
-        | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
+        | utils::andThen(utils::StringUtils::toBool)).value_or(false);
 
     std::shared_ptr<utils::file::FileSystem> filesystem;
     if (home_path) {
diff --git a/libminifi/test/unit/ExpectedTest.cpp 
b/libminifi/test/unit/ExpectedTest.cpp
index 26ac4dbe7..281c09469 100644
--- a/libminifi/test/unit/ExpectedTest.cpp
+++ b/libminifi/test/unit/ExpectedTest.cpp
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 #define EXTENSION_LIST ""
+#include <memory>
 #include <string_view>
 #include "../TestBase.h"
 #include "../Catch.h"
@@ -25,69 +26,69 @@
 namespace utils = org::apache::nifi::minifi::utils;
 
 // shamelessly copied from 
https://github.com/TartanLlama/expected/blob/master/tests/extensions.cpp 
(License: CC0)
-TEST_CASE("expected map", "[expected][map]") {
+TEST_CASE("expected transform", "[expected][transform]") {
   auto mul2 = [](int a) { return a * 2; };
   auto ret_void = [](int) {};
 
   {
     nonstd::expected<int, int> e = 21;
-    auto ret = e | utils::map(mul2);
+    auto ret = e | utils::transform(mul2);
     REQUIRE(ret);
     REQUIRE(*ret == 42);
   }
 
   {
     const nonstd::expected<int, int> e = 21;
-    auto ret = e | utils::map(mul2);
+    auto ret = e | utils::transform(mul2);
     REQUIRE(ret);
     REQUIRE(*ret == 42);
   }
 
   {
     nonstd::expected<int, int> e = 21;
-    auto ret = std::move(e) | utils::map(mul2);
+    auto ret = std::move(e) | utils::transform(mul2);
     REQUIRE(ret);
     REQUIRE(*ret == 42);
   }
 
   {
     const nonstd::expected<int, int> e = 21;
-    auto ret = std::move(e) | utils::map(mul2);  // 
NOLINT(performance-move-const-arg)
+    auto ret = std::move(e) | utils::transform(mul2);  // 
NOLINT(performance-move-const-arg)
     REQUIRE(ret);
     REQUIRE(*ret == 42);
   }
 
   {
     nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = e | utils::map(mul2);
+    auto ret = e | utils::transform(mul2);
     REQUIRE(!ret);
     REQUIRE(ret.error() == 21);
   }
 
   {
     const nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = e | utils::map(mul2);
+    auto ret = e | utils::transform(mul2);
     REQUIRE(!ret);
     REQUIRE(ret.error() == 21);
   }
 
   {
     nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = std::move(e) | utils::map(mul2);
+    auto ret = std::move(e) | utils::transform(mul2);
     REQUIRE(!ret);
     REQUIRE(ret.error() == 21);
   }
 
   {
     const nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = std::move(e) | utils::map(mul2);  // 
NOLINT(performance-move-const-arg)
+    auto ret = std::move(e) | utils::transform(mul2);  // 
NOLINT(performance-move-const-arg)
     REQUIRE(!ret);
     REQUIRE(ret.error() == 21);
   }
 
   {
     nonstd::expected<int, int> e = 21;
-    auto ret = e | utils::map(ret_void);
+    auto ret = e | utils::transform(ret_void);
     REQUIRE(ret);
     STATIC_REQUIRE(
         (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
@@ -95,7 +96,7 @@ TEST_CASE("expected map", "[expected][map]") {
 
   {
     const nonstd::expected<int, int> e = 21;
-    auto ret = e | utils::map(ret_void);
+    auto ret = e | utils::transform(ret_void);
     REQUIRE(ret);
     STATIC_REQUIRE(
         (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
@@ -103,7 +104,7 @@ TEST_CASE("expected map", "[expected][map]") {
 
   {
     nonstd::expected<int, int> e = 21;
-    auto ret = std::move(e) | utils::map(ret_void);
+    auto ret = std::move(e) | utils::transform(ret_void);
     REQUIRE(ret);
     STATIC_REQUIRE(
         (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
@@ -111,7 +112,7 @@ TEST_CASE("expected map", "[expected][map]") {
 
   {
     const nonstd::expected<int, int> e = 21;
-    auto ret = std::move(e) | utils::map(ret_void);  // 
NOLINT(performance-move-const-arg)
+    auto ret = std::move(e) | utils::transform(ret_void);  // 
NOLINT(performance-move-const-arg)
     REQUIRE(ret);
     STATIC_REQUIRE(
         (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
@@ -119,7 +120,7 @@ TEST_CASE("expected map", "[expected][map]") {
 
   {
     nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = e | utils::map(ret_void);
+    auto ret = e | utils::transform(ret_void);
     REQUIRE(!ret);
     STATIC_REQUIRE(
         (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
@@ -127,7 +128,7 @@ TEST_CASE("expected map", "[expected][map]") {
 
   {
     const nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = e | utils::map(ret_void);
+    auto ret = e | utils::transform(ret_void);
     REQUIRE(!ret);
     STATIC_REQUIRE(
         (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
@@ -135,7 +136,7 @@ TEST_CASE("expected map", "[expected][map]") {
 
   {
     nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = std::move(e) | utils::map(ret_void);
+    auto ret = std::move(e) | utils::transform(ret_void);
     REQUIRE(!ret);
     STATIC_REQUIRE(
         (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
@@ -143,7 +144,7 @@ TEST_CASE("expected map", "[expected][map]") {
 
   {
     const nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = std::move(e) | utils::map(ret_void);  // 
NOLINT(performance-move-const-arg)
+    auto ret = std::move(e) | utils::transform(ret_void);  // 
NOLINT(performance-move-const-arg)
     REQUIRE(!ret);
     STATIC_REQUIRE(
         (std::is_same<decltype(ret), nonstd::expected<void, int>>::value));
@@ -153,141 +154,139 @@ TEST_CASE("expected map", "[expected][map]") {
   // mapping functions which return references
   {
     nonstd::expected<int, int> e(42);
-    auto ret = e | utils::map([](int& i) -> int& { return i; });
+    auto ret = e | utils::transform([](int& i) -> int& { return i; });
     REQUIRE(ret);
     REQUIRE(ret == 42);
   }
 }
 
-TEST_CASE("expected flatMap", "[expected][flatMap]") {
+TEST_CASE("expected andThen", "[expected][andThen]") {
   auto succeed = [](int) { return nonstd::expected<int, int>(21 * 2); };
   auto fail = [](int) { return nonstd::expected<int, int>(nonstd::unexpect, 
17); };
 
   {
     nonstd::expected<int, int> e = 21;
-    auto ret = e | utils::flatMap(succeed);
+    auto ret = e | utils::andThen(succeed);
     REQUIRE(ret);
     REQUIRE(*ret == 42);
   }
 
   {
     const nonstd::expected<int, int> e = 21;
-    auto ret = e | utils::flatMap(succeed);
+    auto ret = e | utils::andThen(succeed);
     REQUIRE(ret);
     REQUIRE(*ret == 42);
   }
 
   {
     nonstd::expected<int, int> e = 21;
-    auto ret = std::move(e) | utils::flatMap(succeed);
+    auto ret = std::move(e) | utils::andThen(succeed);
     REQUIRE(ret);
     REQUIRE(*ret == 42);
   }
 
   {
     const nonstd::expected<int, int> e = 21;
-    auto ret = std::move(e) | utils::flatMap(succeed);  // 
NOLINT(performance-move-const-arg)
+    auto ret = std::move(e) | utils::andThen(succeed);  // 
NOLINT(performance-move-const-arg)
     REQUIRE(ret);
     REQUIRE(*ret == 42);
   }
 
   {
     nonstd::expected<int, int> e = 21;
-    auto ret = e | utils::flatMap(fail);
+    auto ret = e | utils::andThen(fail);
     REQUIRE(!ret);
     REQUIRE(ret.error() == 17);
   }
 
   {
     const nonstd::expected<int, int> e = 21;
-    auto ret = e | utils::flatMap(fail);
+    auto ret = e | utils::andThen(fail);
     REQUIRE(!ret);
     REQUIRE(ret.error() == 17);
   }
 
   {
     nonstd::expected<int, int> e = 21;
-    auto ret = std::move(e) | utils::flatMap(fail);
+    auto ret = std::move(e) | utils::andThen(fail);
     REQUIRE(!ret);
     REQUIRE(ret.error() == 17);
   }
 
   {
     const nonstd::expected<int, int> e = 21;
-    auto ret = std::move(e) | utils::flatMap(fail);  // 
NOLINT(performance-move-const-arg)
+    auto ret = std::move(e) | utils::andThen(fail);  // 
NOLINT(performance-move-const-arg)
     REQUIRE(!ret);
     REQUIRE(ret.error() == 17);
   }
 
   {
     nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = e | utils::flatMap(succeed);
+    auto ret = e | utils::andThen(succeed);
     REQUIRE(!ret);
     REQUIRE(ret.error() == 21);
   }
 
   {
     const nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = e | utils::flatMap(succeed);
+    auto ret = e | utils::andThen(succeed);
     REQUIRE(!ret);
     REQUIRE(ret.error() == 21);
   }
 
   {
     nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = std::move(e) | utils::flatMap(succeed);
+    auto ret = std::move(e) | utils::andThen(succeed);
     REQUIRE(!ret);
     REQUIRE(ret.error() == 21);
   }
 
   {
     const nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = std::move(e) | utils::flatMap(succeed);  // 
NOLINT(performance-move-const-arg)
+    auto ret = std::move(e) | utils::andThen(succeed);  // 
NOLINT(performance-move-const-arg)
     REQUIRE(!ret);
     REQUIRE(ret.error() == 21);
   }
 
   {
     nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = e | utils::flatMap(fail);
+    auto ret = e | utils::andThen(fail);
     REQUIRE(!ret);
     REQUIRE(ret.error() == 21);
   }
 
   {
     const nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = e | utils::flatMap(fail);
+    auto ret = e | utils::andThen(fail);
     REQUIRE(!ret);
     REQUIRE(ret.error() == 21);
   }
 
   {
     nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = std::move(e) | utils::flatMap(fail);
+    auto ret = std::move(e) | utils::andThen(fail);
     REQUIRE(!ret);
     REQUIRE(ret.error() == 21);
   }
 
   {
     const nonstd::expected<int, int> e(nonstd::unexpect, 21);
-    auto ret = std::move(e) | utils::flatMap(fail);  // 
NOLINT(performance-move-const-arg)
+    auto ret = std::move(e) | utils::andThen(fail);  // 
NOLINT(performance-move-const-arg)
     REQUIRE(!ret);
     REQUIRE(ret.error() == 21);
   }
 }
 
 TEST_CASE("expected orElse", "[expected][orElse]") {
-  // commented out parts depend on 
https://github.com/martinmoene/expected-lite/pull/40 (waiting for release)
-  // using eptr = std::unique_ptr<int>;
+  using eptr = std::unique_ptr<int>;
   auto succeed = [](int) { return nonstd::expected<int, int>(21 * 2); };
-  // auto succeedptr = [](eptr e) { return nonstd::expected<int, eptr>(21*2);};
-  auto fail =    [](int) { return nonstd::expected<int, int>(nonstd::unexpect, 
17);};
-  // auto efail =   [](eptr e) { *e = 17;return nonstd::expected<int, 
eptr>(nonstd::unexpect, std::move(e));};
-  // auto failptr = [](eptr e) { return nonstd::expected<int, 
eptr>(nonstd::unexpect, std::move(e));};
+  auto succeedptr = [](eptr) { return nonstd::expected<int, eptr>(21*2); };
+  auto fail =    [](int) { return nonstd::expected<int, int>(nonstd::unexpect, 
17); };
+  auto efail =   [](eptr e) { *e = 17; return nonstd::expected<int, 
eptr>(nonstd::unexpect, std::move(e)); };
   auto failvoid = [](int) {};
-  // auto failvoidptr = [](const eptr&) { /* don't consume */};
-  // auto consumeptr = [](eptr) {};
-  // auto make_u_int = [](int n) { return std::unique_ptr<int>(new int(n));};
+  auto failvoidptr = [](const eptr&) { /* don't consume */};
+  auto consumeptr = [](eptr) {};
+  auto make_u_int = [](int n) { return std::make_unique<int>(n); };
 
   {
     nonstd::expected<int, int> e = 21;
@@ -310,14 +309,12 @@ TEST_CASE("expected orElse", "[expected][orElse]") {
     REQUIRE(*ret == 21);
   }
 
-  /*
   {
     nonstd::expected<int, eptr> e = 21;
     auto ret = std::move(e) | utils::orElse(succeedptr);
     REQUIRE(ret);
     REQUIRE(*ret == 21);
   }
-   */
 
   {
     const nonstd::expected<int, int> e = 21;
@@ -348,14 +345,12 @@ TEST_CASE("expected orElse", "[expected][orElse]") {
   }
 
 
-  /*
   {
     nonstd::expected<int, eptr> e = 21;
     auto ret = std::move(e) | utils::orElse(efail);
     REQUIRE(ret);
     REQUIRE(ret == 21);
   }
-   */
 
   {
     const nonstd::expected<int, int> e = 21;
@@ -385,14 +380,12 @@ TEST_CASE("expected orElse", "[expected][orElse]") {
     REQUIRE(*ret == 42);
   }
 
-  /*
   {
     nonstd::expected<int, eptr> e(nonstd::unexpect, make_u_int(21));
     auto ret = std::move(e) | utils::orElse(succeedptr);
     REQUIRE(ret);
     REQUIRE(*ret == 42);
   }
-   */
 
   {
     const nonstd::expected<int, int> e(nonstd::unexpect, 21);
@@ -443,7 +436,6 @@ TEST_CASE("expected orElse", "[expected][orElse]") {
     REQUIRE(ret.error() == 21);
   }
 
-  /*
   {
     nonstd::expected<int, eptr> e(nonstd::unexpect, make_u_int(21));
     auto ret = std::move(e) | utils::orElse(failvoidptr);
@@ -457,7 +449,6 @@ TEST_CASE("expected orElse", "[expected][orElse]") {
     REQUIRE(!ret);
     REQUIRE(ret.error() == nullptr);
   }
-   */
 
   {
     const nonstd::expected<int, int> e(nonstd::unexpect, 21);
@@ -484,3 +475,63 @@ TEST_CASE("expected valueOrElse", 
"[expected][valueOrElse]") {
   REQUIRE_THROWS_AS(ex | utils::valueOrElse([](const std::string&) -> int { 
throw std::exception(); }), std::exception);
   REQUIRE_THROWS_AS(std::move(ex) | utils::valueOrElse([](std::string&&) -> 
int { throw std::exception(); }), std::exception);
 }
+
+TEST_CASE("expected transformError", "[expected][transformError]") {
+  auto mul2 = [](int a) { return a * 2; };
+
+  {
+    nonstd::expected<int, int> e = nonstd::make_unexpected(21);
+    auto ret = e | utils::transformError(mul2);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 42);
+  }
+
+  {
+    const nonstd::expected<int, int> e = nonstd::make_unexpected(21);
+    auto ret = e | utils::transformError(mul2);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 42);
+  }
+
+  {
+    nonstd::expected<int, int> e = nonstd::make_unexpected(21);
+    auto ret = std::move(e) | utils::transformError(mul2);
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 42);
+  }
+
+  {
+    const nonstd::expected<int, int> e = nonstd::make_unexpected(21);
+    auto ret = std::move(e) | utils::transformError(mul2);  // 
NOLINT(performance-move-const-arg)
+    REQUIRE(!ret);
+    REQUIRE(ret.error() == 42);
+  }
+
+  {
+    nonstd::expected<int, int> e = 21;
+    auto ret = e | utils::transformError(mul2);
+    REQUIRE(ret);
+    REQUIRE(*ret == 21);
+  }
+
+  {
+    const nonstd::expected<int, int> e = 21;
+    auto ret = e | utils::transformError(mul2);
+    REQUIRE(ret);
+    REQUIRE(*ret == 21);
+  }
+
+  {
+    const auto mutable_rvalue_fn = []{ return nonstd::expected<int, int>{21}; 
};
+    auto ret = mutable_rvalue_fn() | utils::transformError(mul2);
+    REQUIRE(ret);
+    REQUIRE(*ret == 21);
+  }
+
+  {
+    const nonstd::expected<int, int> e = 21;
+    auto ret = std::move(e) | utils::transformError(mul2);  // 
NOLINT(performance-move-const-arg)
+    REQUIRE(ret);
+    REQUIRE(*ret == 21);
+  }
+}
diff --git a/libminifi/test/unit/OptionalTest.cpp 
b/libminifi/test/unit/OptionalTest.cpp
index 93dd31879..044694d11 100644
--- a/libminifi/test/unit/OptionalTest.cpp
+++ b/libminifi/test/unit/OptionalTest.cpp
@@ -22,28 +22,28 @@
 
 namespace utils = org::apache::nifi::minifi::utils;
 
-TEST_CASE("optional map", "[optional map]") {
-  const auto test1 = std::make_optional(6) | utils::map([](const int i) { 
return i * 2; });
+TEST_CASE("optional transform", "[optional transform]") {
+  const auto test1 = std::make_optional(6) | utils::transform([](const int i) 
{ return i * 2; });
   REQUIRE(12 == test1.value());
 
-  const auto test2 = std::optional<int>{} | utils::map([](const int i) { 
return i * 2; });
+  const auto test2 = std::optional<int>{} | utils::transform([](const int i) { 
return i * 2; });
   REQUIRE(!test2.has_value());
 }
 
-TEST_CASE("optional flatMap", "[optional flat map]") {
+TEST_CASE("optional andThen", "[optional and then]") {
   const auto make_intdiv_noremainder = [](const int denom) {
     return [denom](const int num) { return num % denom == 0 ? 
std::make_optional(num / denom) : std::optional<int>{}; };
   };
 
-  const auto test1 = std::make_optional(6) | 
utils::flatMap(make_intdiv_noremainder(3));
+  const auto test1 = std::make_optional(6) | 
utils::andThen(make_intdiv_noremainder(3));
   REQUIRE(2 == test1.value());
 
   const auto const_lval_func = make_intdiv_noremainder(4);
-  const auto test2 = std::optional<int>{} | utils::flatMap(const_lval_func);
+  const auto test2 = std::optional<int>{} | utils::andThen(const_lval_func);
   REQUIRE(!test2.has_value());
 
   auto mutable_lval_func = make_intdiv_noremainder(3);
-  const auto test3 = std::make_optional(7) | utils::flatMap(mutable_lval_func);
+  const auto test3 = std::make_optional(7) | utils::andThen(mutable_lval_func);
   REQUIRE(!test3.has_value());
 }
 
diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp
index 56fa28bc4..6e34b9360 100644
--- a/minifi_main/MiNiFiMain.cpp
+++ b/minifi_main/MiNiFiMain.cpp
@@ -355,7 +355,7 @@ int main(int argc, char **argv) {
     writeSchemaIfRequested(argument_parser, configure);
 
     std::chrono::milliseconds stop_wait_time = 
configure->get(minifi::Configure::nifi_graceful_shutdown_seconds)
-        | 
utils::flatMap(utils::timeutils::StringToDuration<std::chrono::milliseconds>)
+        | 
utils::andThen(utils::timeutils::StringToDuration<std::chrono::milliseconds>)
         | utils::valueOrElse([] { return 
std::chrono::milliseconds(STOP_WAIT_TIME_MS);});
 
     configure->get(minifi::Configure::nifi_provenance_repository_class_name, 
prov_repo_class);
@@ -400,7 +400,7 @@ int main(int argc, char **argv) {
     configure->get(minifi::Configure::nifi_configuration_class_name, 
nifi_configuration_class_name);
 
     bool should_encrypt_flow_config = 
(configure->get(minifi::Configure::nifi_flow_configuration_encrypt)
-        | utils::flatMap(utils::StringUtils::toBool)).value_or(false);
+        | utils::andThen(utils::StringUtils::toBool)).value_or(false);
 
     auto filesystem = std::make_shared<utils::file::FileSystem>(
         should_encrypt_flow_config,
@@ -421,7 +421,7 @@ int main(int argc, char **argv) {
       prov_repo, flow_repo, configure, std::move(flow_configuration), 
content_repo, std::move(metrics_publisher_store), filesystem, request_restart);
 
     const bool disk_space_watchdog_enable = 
configure->get(minifi::Configure::minifi_disk_space_watchdog_enable)
-        | utils::flatMap(utils::StringUtils::toBool)
+        | utils::andThen(utils::StringUtils::toBool)
         | utils::valueOrElse([] { return true; });
 
     std::unique_ptr<utils::CallBackTimer> disk_space_watchdog;

Reply via email to