This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 542924e677200c67cc764878151b53f95ac357f9
Author: Martin Zink <martinz...@apache.org>
AuthorDate: Mon Apr 15 18:25:07 2024 +0200

    MINIFICPP-2323 ListenTCP custom delimiter support
    
    Closes #1753
    
    Signed-off-by: Marton Szasz <sza...@apache.org>
---
 PROCESSORS.md                                      |  4 +-
 .../processors/ListenSyslog.cpp                    |  2 +-
 .../standard-processors/processors/ListenTCP.cpp   | 10 ++-
 .../standard-processors/processors/ListenTCP.h     | 24 +++++--
 .../processors/NetworkListenerProcessor.cpp        |  8 ++-
 .../processors/NetworkListenerProcessor.h          | 10 ++-
 .../tests/unit/ListenSyslogTests.cpp               | 14 ++--
 .../tests/unit/ListenTcpTests.cpp                  | 75 ++++++++++++----------
 .../standard-processors/tests/unit/PutTCPTests.cpp |  6 +-
 libminifi/include/utils/IntegrationTestUtils.h     |  9 +++
 libminifi/include/utils/StringUtils.h              |  2 +
 libminifi/include/utils/net/TcpServer.h            | 11 +++-
 libminifi/src/utils/StringUtils.cpp                | 59 ++++++++++++++++-
 libminifi/src/utils/net/TcpServer.cpp              |  5 +-
 libminifi/test/Utils.h                             |  5 +-
 libminifi/test/unit/StringUtilsTests.cpp           | 19 +++++-
 16 files changed, 198 insertions(+), 65 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index a11083896..1d1f5eea7 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -1548,7 +1548,7 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 
 ### Description
 
-Listens for incoming TCP connections and reads data from each connection using 
a line separator as the message demarcator. For each message the processor 
produces a single FlowFile.
+Listens for incoming TCP connections and reads data from each connection using 
a configurable message delimiter. For each message the processor produces a 
single FlowFile.
 
 ### Properties
 
@@ -1561,6 +1561,8 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | **Max Size of Message Queue** | 10000         |                            | 
Maximum number of messages allowed to be buffered before processing them when 
the processor is triggered. If the buffer is full, the message is ignored. If 
set to zero the buffer is unlimited. |
 | SSL Context Service           |               |                            | 
The Controller Service to use in order to obtain an SSL Context. If this 
property is set, messages will be received over a secure connection.            
                                        |
 | Client Auth                   | NONE          | NONE<br/>WANT<br/>REQUIRED | 
The client authentication policy to use for the SSL Context. Only used if an 
SSL Context Service is provided.                                                
                                    |
+| **Message Delimiter**         | \n            |                            | 
The delimiter is used to divide the stream into flowfiles.                      
                                                                                
                                 |
+| **Consume Delimiter**         | true          | true<br/>false             | 
If set to true then the delimiter won't be included at the end of the resulting 
flowfiles.                                                                      
                                 |
 
 ### Relationships
 
diff --git a/extensions/standard-processors/processors/ListenSyslog.cpp 
b/extensions/standard-processors/processors/ListenSyslog.cpp
index 8f4dc846a..3bfc7e4ed 100644
--- a/extensions/standard-processors/processors/ListenSyslog.cpp
+++ b/extensions/standard-processors/processors/ListenSyslog.cpp
@@ -54,7 +54,7 @@ void ListenSyslog::onSchedule(core::ProcessContext& context, 
core::ProcessSessio
   auto protocol = utils::enumCast<utils::net::IpProtocol>(protocol_name);
 
   if (protocol == utils::net::IpProtocol::TCP) {
-    startTcpServer(context, SSLContextService, ClientAuth);
+    startTcpServer(context, SSLContextService, ClientAuth, true, "\n");
   } else if (protocol == utils::net::IpProtocol::UDP) {
     startUdpServer(context);
   } else {
diff --git a/extensions/standard-processors/processors/ListenTCP.cpp 
b/extensions/standard-processors/processors/ListenTCP.cpp
index 7cbeb90d5..0394008be 100644
--- a/extensions/standard-processors/processors/ListenTCP.cpp
+++ b/extensions/standard-processors/processors/ListenTCP.cpp
@@ -28,7 +28,15 @@ void ListenTCP::initialize() {
 }
 
 void ListenTCP::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
-  startTcpServer(context, SSLContextService, ClientAuth);
+  auto delimiter_str = context.getProperty(MessageDelimiter).value_or("\n");
+  delimiter_str = utils::string::replaceEscapedCharacters(delimiter_str);
+  if (delimiter_str.empty()) {
+    logger_->log_warn("{} cannot be an empty string, using \\n as the 
delimiter", MessageDelimiter.name);
+    delimiter_str = "\n";
+  }
+
+  const auto consume_delimiter = 
context.getProperty<bool>(ConsumeDelimiter).value_or(true);
+  startTcpServer(context, SSLContextService, ClientAuth, consume_delimiter, 
std::move(delimiter_str));
 }
 
 void ListenTCP::transferAsFlowFile(const utils::net::Message& message, 
core::ProcessSession& session) {
diff --git a/extensions/standard-processors/processors/ListenTCP.h 
b/extensions/standard-processors/processors/ListenTCP.h
index dc5e5f9ab..af26f81a7 100644
--- a/extensions/standard-processors/processors/ListenTCP.h
+++ b/extensions/standard-processors/processors/ListenTCP.h
@@ -36,10 +36,10 @@ namespace org::apache::nifi::minifi::processors {
 class ListenTCP : public NetworkListenerProcessor {
  public:
   explicit ListenTCP(std::string_view name, const utils::Identifier& uuid = {})
-    : NetworkListenerProcessor(name, uuid, 
core::logging::LoggerFactory<ListenTCP>::getLogger(uuid)) {
+      : NetworkListenerProcessor(name, uuid, 
core::logging::LoggerFactory<ListenTCP>::getLogger(uuid)) {
   }
 
-  EXTENSIONAPI static constexpr const char* Description = "Listens for 
incoming TCP connections and reads data from each connection using a line 
separator as the message demarcator. "
+  EXTENSIONAPI static constexpr const char* Description = "Listens for 
incoming TCP connections and reads data from each connection using a 
configurable message delimiter. "
                                                           "For each message 
the processor produces a single FlowFile.";
 
   EXTENSIONAPI static constexpr auto Port = 
core::PropertyDefinitionBuilder<>::createProperty("Listening Port")
@@ -69,15 +69,29 @@ class ListenTCP : public NetworkListenerProcessor {
       
.withDefaultValue(magic_enum::enum_name(utils::net::ClientAuthOption::NONE))
       
.withAllowedValues(magic_enum::enum_names<utils::net::ClientAuthOption>())
       .build();
-  EXTENSIONAPI static constexpr auto Properties = 
std::array<core::PropertyReference, 5>{
+  EXTENSIONAPI static constexpr auto MessageDelimiter = 
core::PropertyDefinitionBuilder<>::createProperty("Message Delimiter")
+      .withDescription("The delimiter is used to divide the stream into 
flowfiles.")
+      .isRequired(true)
+      .withDefaultValue("\n")
+      .supportsExpressionLanguage(false)
+      .build();
+  EXTENSIONAPI static constexpr auto ConsumeDelimiter = 
core::PropertyDefinitionBuilder<>::createProperty("Consume Delimiter")
+      .withDescription("If set to true then the delimiter won't be included at 
the end of the resulting flowfiles.")
+      .withDefaultValue("true")
+      .withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
+      .isRequired(true)
+      .build();
+
+  EXTENSIONAPI static constexpr auto Properties = 
std::array<core::PropertyReference, 7>{
       Port,
       MaxBatchSize,
       MaxQueueSize,
       SSLContextService,
-      ClientAuth
+      ClientAuth,
+      MessageDelimiter,
+      ConsumeDelimiter
   };
 
-
   EXTENSIONAPI static constexpr auto Success = 
core::RelationshipDefinition{"success", "Messages received successfully will be 
sent out this relationship."};
   EXTENSIONAPI static constexpr auto Relationships = std::array{Success};
 
diff --git 
a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp 
b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
index d04a88f69..a7154dd0b 100644
--- a/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.cpp
@@ -61,7 +61,11 @@ void NetworkListenerProcessor::startServer(const 
ServerOptions& options, utils::
                      max_batch_size_);
 }
 
-void NetworkListenerProcessor::startTcpServer(const core::ProcessContext& 
context, const core::PropertyReference& ssl_context_property, const 
core::PropertyReference& client_auth_property) {
+void NetworkListenerProcessor::startTcpServer(const core::ProcessContext& 
context,
+    const core::PropertyReference& ssl_context_property,
+    const core::PropertyReference& client_auth_property,
+    bool consume_delimiter,
+    std::string delimiter) {
   gsl_Expects(!server_thread_.joinable() && !server_);
   auto options = readServerOptions(context);
 
@@ -75,7 +79,7 @@ void NetworkListenerProcessor::startTcpServer(const 
core::ProcessContext& contex
     auto client_auth = 
utils::parseEnumProperty<utils::net::ClientAuthOption>(context, 
client_auth_property);
     ssl_options.emplace(std::move(*ssl_data), client_auth);
   }
-  server_ = std::make_unique<utils::net::TcpServer>(options.max_queue_size, 
options.port, logger_, ssl_options);
+  server_ = std::make_unique<utils::net::TcpServer>(options.max_queue_size, 
options.port, logger_, ssl_options, consume_delimiter, std::move(delimiter));
 
   startServer(options, utils::net::IpProtocol::TCP);
 }
diff --git 
a/extensions/standard-processors/processors/NetworkListenerProcessor.h 
b/extensions/standard-processors/processors/NetworkListenerProcessor.h
index 1dad893cc..4f15637fa 100644
--- a/extensions/standard-processors/processors/NetworkListenerProcessor.h
+++ b/extensions/standard-processors/processors/NetworkListenerProcessor.h
@@ -57,9 +57,16 @@ class NetworkListenerProcessor : public core::Processor {
   }
 
  protected:
-  void startTcpServer(const core::ProcessContext& context, const 
core::PropertyReference& ssl_context_property, const core::PropertyReference& 
client_auth_property);
+  void startTcpServer(const core::ProcessContext& context,
+      const core::PropertyReference& ssl_context_property,
+      const core::PropertyReference& client_auth_property,
+      bool consume_delimiter,
+      std::string delimiter);
   void startUdpServer(const core::ProcessContext& context);
 
+ protected:
+  std::shared_ptr<core::logging::Logger> logger_;
+
  private:
   struct ServerOptions {
     std::optional<uint64_t> max_queue_size;
@@ -78,7 +85,6 @@ class NetworkListenerProcessor : public core::Processor {
   uint64_t max_batch_size_{500};
   std::unique_ptr<utils::net::Server> server_;
   std::thread server_thread_;
-  std::shared_ptr<core::logging::Logger> logger_;
 };
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp 
b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
index b1539030d..565df98d8 100644
--- a/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenSyslogTests.cpp
@@ -293,8 +293,8 @@ TEST_CASE("ListenSyslog without parsing test", 
"[ListenSyslog]") {
     SECTION("sending through IPv4", "[IPv4]") {
       endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), 
port);
     }
-    CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, 
endpoint), MatchesSuccess());
-    CHECK_THAT(utils::sendMessagesViaTCP({invalid_syslog}, endpoint), 
MatchesSuccess());
+    CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint, 
"\n"), MatchesSuccess());
+    CHECK_THAT(utils::sendMessagesViaTCP({invalid_syslog}, endpoint, "\n"), 
MatchesSuccess());
   }
   std::unordered_map<core::Relationship, 
std::vector<std::shared_ptr<core::FlowFile>>> result;
   REQUIRE(controller.triggerUntil({{ListenSyslog::Success, 2}}, result, 300ms, 
50ms));
@@ -364,15 +364,15 @@ TEST_CASE("ListenSyslog with parsing test", 
"[ListenSyslog][NetworkListenerProce
     CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_,
                                           rfc5424_doc_example_2.unparsed_,
                                           rfc5424_doc_example_3.unparsed_,
-                                          rfc5424_doc_example_4.unparsed_}, 
endpoint), MatchesSuccess());
+                                          rfc5424_doc_example_4.unparsed_}, 
endpoint, "\n"), MatchesSuccess());
 
     CHECK_THAT(utils::sendMessagesViaTCP({rfc3164_doc_example_1.unparsed_,
                                           rfc3164_doc_example_2.unparsed_,
                                           rfc3164_doc_example_3.unparsed_,
-                                          rfc3164_doc_example_4.unparsed_}, 
endpoint), MatchesSuccess());
+                                          rfc3164_doc_example_4.unparsed_}, 
endpoint, "\n"), MatchesSuccess());
 
-    CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, 
endpoint), MatchesSuccess());
-    CHECK_THAT(utils::sendMessagesViaTCP({invalid_syslog}, endpoint), 
MatchesSuccess());
+    CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_logger_example_1}, endpoint, 
"\n"), MatchesSuccess());
+    CHECK_THAT(utils::sendMessagesViaTCP({invalid_syslog}, endpoint, "\n"), 
MatchesSuccess());
   }
 
   std::unordered_map<core::Relationship, 
std::vector<std::shared_ptr<core::FlowFile>>> result;
@@ -480,7 +480,7 @@ TEST_CASE("ListenSyslog max queue and max batch size test", 
"[ListenSyslog][Netw
     }
 
     for (auto i = 0; i < 100; ++i) {
-      CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, 
endpoint), MatchesSuccess());
+      CHECK_THAT(utils::sendMessagesViaTCP({rfc5424_doc_example_1.unparsed_}, 
endpoint, "\n"), MatchesSuccess());
     }
     CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message 
ignored.", 50, 300ms, 50ms));
   }
diff --git a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp 
b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
index b1afd5f13..e65b3fd62 100644
--- a/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListenTcpTests.cpp
@@ -23,11 +23,13 @@
 #include "controllers/SSLContextService.h"
 #include "range/v3/algorithm/contains.hpp"
 #include "utils/IntegrationTestUtils.h"
+#include "utils/StringUtils.h"
+#include "catch2/generators/catch_generators.hpp"
 
 using ListenTCP = org::apache::nifi::minifi::processors::ListenTCP;
 
 using namespace std::literals::chrono_literals;
-using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+using org::apache::nifi::minifi::utils::verifyLogLineVariantPresenceInPollTime;
 
 namespace org::apache::nifi::minifi::test {
 
@@ -54,8 +56,8 @@ TEST_CASE("ListenTCP test multiple messages", 
"[ListenTCP][NetworkListenerProces
     endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
   }
 
-  CHECK_THAT(utils::sendMessagesViaTCP({"test_message_1"}, endpoint), 
MatchesSuccess());
-  CHECK_THAT(utils::sendMessagesViaTCP({"another_message"}, endpoint), 
MatchesSuccess());
+  CHECK_THAT(utils::sendMessagesViaTCP({"test_message_1\n"}, endpoint), 
MatchesSuccess());
+  CHECK_THAT(utils::sendMessagesViaTCP({"another_message\n"}, endpoint), 
MatchesSuccess());
   ProcessorTriggerResult result;
   REQUIRE(controller.triggerUntil({{ListenTCP::Success, 2}}, result, 300s, 
50ms));
   CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[0]) == 
"test_message_1");
@@ -98,7 +100,7 @@ TEST_CASE("ListenTCP max queue and max batch size test", 
"[ListenTCP][NetworkLis
   LogTestController::getInstance().setWarn<ListenTCP>();
 
   for (auto i = 0; i < 100; ++i) {
-    CHECK_THAT(utils::sendMessagesViaTCP({"test_message"}, endpoint), 
MatchesSuccess());
+    CHECK_THAT(utils::sendMessagesViaTCP({"test_message\n"}, endpoint), 
MatchesSuccess());
   }
 
   CHECK(utils::countLogOccurrencesUntil("Queue is full. TCP message ignored.", 
50, 300ms, 50ms));
@@ -213,8 +215,8 @@ TEST_CASE("Test ListenTCP with SSL connection", 
"[ListenTCP][NetworkListenerProc
       endpoint = asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), 
port);
     }
 
-    utils::sendMessagesViaSSL({"test_message_1"}, endpoint, executable_dir / 
"resources" / "ca_A.crt");
-    CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "peer did 
not return a certificate (SSL routines)"));
+    [[maybe_unused]] auto send_error = 
utils::sendMessagesViaSSL({"test_message_1"}, endpoint, executable_dir / 
"resources" / "ca_A.crt");
+    CHECK(verifyLogLineVariantPresenceInPollTime(std::chrono::seconds(3), 
"peer did not return a certificate (SSL routines)", "due to unspecified system 
error"));
   }
 
   ProcessorTriggerResult result;
@@ -265,33 +267,12 @@ TEST_CASE("Test ListenTCP SSL/TLS compatibility", 
"[ListenTCP][NetworkListenerPr
   ssl_data.key_pw = "Password12";
 
 
-  asio::ssl::context::method client_method;
-  bool expected_to_work;
-
-  SECTION("sslv2 should be disabled") {
-    client_method = asio::ssl::context::method::sslv2_client;
-    expected_to_work = false;
-  }
-
-  SECTION("sslv3 should be disabled") {
-    client_method = asio::ssl::context::method::sslv3_client;
-    expected_to_work = false;
-  }
-
-  SECTION("tlsv11 should be disabled") {
-    client_method = asio::ssl::context::method::tlsv11_client;
-    expected_to_work = false;
-  }
-
-  SECTION("tlsv12 should be enabled") {
-    client_method = asio::ssl::context::method::tlsv12_client;
-    expected_to_work = true;
-  }
-
-  SECTION("tlsv13 should be enabled") {
-    client_method = asio::ssl::context::method::tlsv13_client;
-    expected_to_work = true;
-  }
+  const auto [client_method, expected_to_work] = GENERATE(
+      std::make_tuple(asio::ssl::context::method::sslv2_client, false),
+      std::make_tuple(asio::ssl::context::method::sslv3_client, false),
+      std::make_tuple(asio::ssl::context::method::tlsv11_client, false),
+      std::make_tuple(asio::ssl::context::method::tlsv12_client, true),
+      std::make_tuple(asio::ssl::context::method::tlsv13_client, true));
 
   if (!isSslMethodAvailable(client_method))
     return;
@@ -308,4 +289,32 @@ TEST_CASE("Test ListenTCP SSL/TLS compatibility", 
"[ListenTCP][NetworkListenerPr
   }
 }
 
+TEST_CASE("Custom delimiter", "[ListenTCP][NetworkListenerProcessor]") {  // Checks that ListenTCP splits incoming data on the configured Message Delimiter and that Consume Delimiter decides whether the delimiter stays in the flow file content.
+  const auto listen_tcp = std::make_shared<ListenTCP>("ListenTCP");
+  SingleProcessorTestController controller{listen_tcp};
+  LogTestController::getInstance().setTrace<ListenTCP>();
+
+  std::string delimiter = GENERATE("\n", "\\n", "foo", "💩", "foo\\nbar");  // property values as they would be configured: a real newline, escaped spellings, plain text and a multi-byte UTF-8 character
+  const auto consume_delimiter = GENERATE(true, false);  // NOTE(review): presumably Catch2 re-runs the body for every delimiter/flag combination - confirm against the Catch2 generator docs
+
+  REQUIRE(listen_tcp->setProperty(ListenTCP::MessageDelimiter, delimiter));
+  REQUIRE(listen_tcp->setProperty(ListenTCP::ConsumeDelimiter, 
fmt::format("{}", consume_delimiter)));
+  auto port = utils::scheduleProcessorOnRandomPort(controller.plan, 
listen_tcp);
+
+  asio::ip::tcp::endpoint endpoint = 
asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), port);
+
+  auto message1 = "test_message_1";
+  auto message2 = "another_message";
+  delimiter = minifi::utils::string::replaceEscapedCharacters(delimiter);  // switch to the unescaped form: these are the bytes appended to the payloads below and expected back when the delimiter is not consumed
+  CHECK_THAT(utils::sendMessagesViaTCP({fmt::format("{}{}", message1, 
delimiter)}, endpoint), MatchesSuccess());
+  CHECK_THAT(utils::sendMessagesViaTCP({fmt::format("{}{}", message2, 
delimiter)}, endpoint), MatchesSuccess());
+  ProcessorTriggerResult result;
+  REQUIRE(controller.triggerUntil({{ListenTCP::Success, 2}}, result, 300s, 
50ms));
+  CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[0]) == 
fmt::format("{}{}", message1, consume_delimiter ? "" : delimiter));
+  CHECK(controller.plan->getContent(result.at(ListenTCP::Success)[1]) == 
fmt::format("{}{}", message2, consume_delimiter ? "" : delimiter));
+
+  check_for_attributes(*result.at(ListenTCP::Success)[0], port);
+  check_for_attributes(*result.at(ListenTCP::Success)[1], port);
+}
+
 }  // namespace org::apache::nifi::minifi::test
diff --git a/extensions/standard-processors/tests/unit/PutTCPTests.cpp 
b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
index cb08ea0c0..f377e0947 100644
--- a/extensions/standard-processors/tests/unit/PutTCPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutTCPTests.cpp
@@ -32,7 +32,7 @@
 #include "IntegrationTestUtils.h"
 
 using namespace std::literals::chrono_literals;
-using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+using org::apache::nifi::minifi::utils::verifyLogLineVariantPresenceInPollTime;
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -236,7 +236,7 @@ class PutTCPTestFixture {
 
     uint16_t startTCPServer(std::optional<utils::net::SslServerOptions> 
ssl_server_options) {
       gsl_Expects(!cancellable_server && !server_thread_.joinable());
-      cancellable_server = 
std::make_unique<CancellableTcpServer>(std::nullopt, 0, 
core::logging::LoggerFactory<utils::net::Server>::getLogger(), 
std::move(ssl_server_options));
+      cancellable_server = 
std::make_unique<CancellableTcpServer>(std::nullopt, 0, 
core::logging::LoggerFactory<utils::net::Server>::getLogger(), 
std::move(ssl_server_options), true, "\n");
       server_thread_ = std::thread([this]() { cancellable_server->run(); });
       REQUIRE(utils::verifyEventHappenedInPollTime(250ms, [this] { return 
cancellable_server->getPort() != 0; }, 20ms));
       return cancellable_server->getPort();
@@ -410,7 +410,7 @@ TEST_CASE("PutTCP test missing client cert", "[PutTCP]") {
 
   test_fixture.trigger("message for invalid-cert server");
 
-  CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "peer did not 
return a certificate (SSL routines)"));
+  CHECK(verifyLogLineVariantPresenceInPollTime(std::chrono::seconds(3), "peer 
did not return a certificate (SSL routines)", "failed due to asio.ssl error"));
 }
 
 TEST_CASE("PutTCP test idle connection expiration", "[PutTCP]") {
diff --git a/libminifi/include/utils/IntegrationTestUtils.h 
b/libminifi/include/utils/IntegrationTestUtils.h
index 1799a6b87..be9fe4544 100644
--- a/libminifi/include/utils/IntegrationTestUtils.h
+++ b/libminifi/include/utils/IntegrationTestUtils.h
@@ -50,4 +50,13 @@ bool verifyLogLinePresenceInPollTime(const 
std::chrono::duration<Rep, Period>& w
   return verifyEventHappenedInPollTime(wait_duration, check);
 }
 
+template <class Rep, class Period, typename ...String>  // Multi-pattern log check: succeeds when at least one of the given patterns is found in the text returned by LogTestController::getLogs().
+bool verifyLogLineVariantPresenceInPollTime(const std::chrono::duration<Rep, 
Period>& wait_duration, String&&... patterns) {
+  auto check = [&patterns...] {  // the pattern pack is captured by reference, not copied
+    const std::string logs = LogTestController::getInstance().getLogs();  // logs are re-read on every invocation of check
+    return ((logs.find(patterns) != std::string::npos) || ...);  // fold over ||: true as soon as any one pattern occurs in logs
+  };
+  return verifyEventHappenedInPollTime(wait_duration, check);  // NOTE(review): presumably re-evaluates check until it returns true or wait_duration elapses - helper body is not visible here
+}
+
 }  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/StringUtils.h 
b/libminifi/include/utils/StringUtils.h
index a577390ee..2b78153e5 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -430,6 +430,8 @@ bool splitToValueAndUnit(std::string_view input, int64_t& 
value, std::string& un
 struct ParseError {};
 
 nonstd::expected<std::optional<char>, ParseError> 
parseCharacter(std::string_view input);
+
+std::string replaceEscapedCharacters(std::string_view input);
 }  // namespace string
 
 }  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/net/TcpServer.h 
b/libminifi/include/utils/net/TcpServer.h
index 5a2532c76..00f4146d3 100644
--- a/libminifi/include/utils/net/TcpServer.h
+++ b/libminifi/include/utils/net/TcpServer.h
@@ -26,8 +26,15 @@ namespace org::apache::nifi::minifi::utils::net {
 
 class TcpServer : public Server {
  public:
-  TcpServer(std::optional<size_t> max_queue_size_, uint16_t port, 
std::shared_ptr<core::logging::Logger> logger, std::optional<SslServerOptions> 
ssl_data)
+  TcpServer(std::optional<size_t> max_queue_size_,
+      uint16_t port,
+      std::shared_ptr<core::logging::Logger> logger,
+      std::optional<SslServerOptions> ssl_data,
+      bool consume_delimiter,
+      std::string delimiter)
       : Server(max_queue_size_, port, std::move(logger)),
+        consume_delimiter_(consume_delimiter),
+        delimiter_(std::move(delimiter)),
         ssl_data_(std::move(ssl_data)) {
   }
 
@@ -39,6 +46,8 @@ class TcpServer : public Server {
 
   asio::awaitable<void> readLoop(auto& socket, const auto& remote_address, 
const auto& local_port);
 
+  bool consume_delimiter_;
+  const std::string delimiter_;
   std::optional<SslServerOptions> ssl_data_;
 };
 
diff --git a/libminifi/src/utils/StringUtils.cpp 
b/libminifi/src/utils/StringUtils.cpp
index 6c7cd752c..68a32028b 100644
--- a/libminifi/src/utils/StringUtils.cpp
+++ b/libminifi/src/utils/StringUtils.cpp
@@ -518,7 +518,7 @@ bool splitToValueAndUnit(std::string_view input, int64_t& 
value, std::string& un
   return true;
 }
 
-nonstd::expected<std::optional<char>, ParseError> 
parseCharacter(std::string_view input) {
+nonstd::expected<std::optional<char>, ParseError> parseCharacter(const 
std::string_view input) {
   if (input.empty()) { return std::nullopt; }
   if (input.size() == 1) { return input[0]; }
 
@@ -532,12 +532,67 @@ nonstd::expected<std::optional<char>, ParseError> 
parseCharacter(std::string_vie
       case 'v': return '\v';  // Vertical Tab
       case 'f': return '\f';  // Form Feed
       case 'r': return '\r';  // Carriage Return
-      default: return input[1];
+      case '\\': return '\\';
+      default: break;
     }
   }
   return nonstd::make_unexpected(ParseError{});
 }
 
+std::string replaceEscapedCharacters(std::string_view input) {  // Returns a copy of input in which every recognised two-character escape (a backslash followed by 0, a, b, t, n, v, f, r or another backslash) is replaced by the single character it denotes; everything else is copied unchanged.
+  std::stringstream result;
+  for (size_t i = 0; i < input.size(); ++i) {
+    char input_char = input[i];
+    if (input_char != '\\' || i == input.size() - 1) {  // ordinary character, or a backslash that is the very last character: copy it through verbatim
+      result << input_char;
+      continue;
+    }
+    char next_char = input[i+1];  // in range: a backslash at the last index was already handled by the guard above
+    switch (next_char) {  // every recognised case increments i so that the loop's own ++i skips the second character of the escape
+      case '0':
+        result << '\0';  // Null
+        ++i;
+        break;
+      case 'a':
+        result << '\a';  // Bell
+        ++i;
+        break;
+      case 'b':
+        result << '\b';  // Backspace
+        ++i;
+        break;
+      case 't':
+        result << '\t';  // Horizontal Tab
+        ++i;
+        break;
+      case 'n':
+        result << '\n';  // Line Feed
+        ++i;
+        break;
+      case 'v':
+        result << '\v';  // Vertical Tab
+        ++i;
+        break;
+      case 'f':
+        result << '\f';  // Form Feed
+        ++i;
+        break;
+      case 'r':
+        result << '\r';  // Carriage Return
+        ++i;
+        break;
+      case '\\':
+        result << '\\';  // escaped backslash collapses to a single backslash
+        ++i;
+        break;
+      default:
+        result << '\\';  // unrecognised escape: emit only the backslash and leave i alone, so the following character is copied by the next iteration
+        break;
+    }
+  }
+  return result.str();
+}
+
 std::string repeat(std::string_view str, size_t count) {
   std::string result;
   result.reserve(count * str.length());
diff --git a/libminifi/src/utils/net/TcpServer.cpp 
b/libminifi/src/utils/net/TcpServer.cpp
index 32a890b75..651c55ed8 100644
--- a/libminifi/src/utils/net/TcpServer.cpp
+++ b/libminifi/src/utils/net/TcpServer.cpp
@@ -50,7 +50,7 @@ asio::awaitable<void> TcpServer::doReceive() {
 asio::awaitable<void> TcpServer::readLoop(auto& socket, const auto& 
remote_address, const auto& local_port) {
   std::string read_message;
   while (true) {
-    auto [read_error, bytes_read] = co_await asio::async_read_until(socket, 
asio::dynamic_buffer(read_message), '\n', use_nothrow_awaitable);  // NOLINT
+    auto [read_error, bytes_read] = co_await asio::async_read_until(socket, 
asio::dynamic_buffer(read_message), delimiter_, use_nothrow_awaitable);  // 
NOLINT
     if (read_error) {
       if (read_error != asio::error::eof) {
         logger_->log_error("Error during reading from socket: {}", 
read_error.message());
@@ -64,7 +64,8 @@ asio::awaitable<void> TcpServer::readLoop(auto& socket, const 
auto& remote_addre
     }
 
     if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) {
-      concurrent_queue_.enqueue(Message(read_message.substr(0, bytes_read - 
1), IpProtocol::TCP, remote_address, local_port));
+      auto message_str = read_message.substr(0, bytes_read - 
(consume_delimiter_ ? delimiter_.size() : 0));
+      concurrent_queue_.enqueue(Message(std::move(message_str), 
IpProtocol::TCP, remote_address, local_port));
     } else {
       logger_->log_warn("Queue is full. TCP message ignored.");
     }
diff --git a/libminifi/test/Utils.h b/libminifi/test/Utils.h
index 4fa777d83..6de35baa7 100644
--- a/libminifi/test/Utils.h
+++ b/libminifi/test/Utils.h
@@ -132,7 +132,7 @@ bool countLogOccurrencesUntil(const std::string& pattern,
   return false;
 }
 
-std::error_code sendMessagesViaTCP(const std::vector<std::string_view>& 
contents, const asio::ip::tcp::endpoint& remote_endpoint) {
+std::error_code sendMessagesViaTCP(const std::vector<std::string_view>& 
contents, const asio::ip::tcp::endpoint& remote_endpoint, const 
std::optional<std::string_view> delimiter = std::nullopt) {
   asio::io_context io_context;
   asio::ip::tcp::socket socket(io_context);
   std::error_code err;
@@ -141,7 +141,8 @@ std::error_code sendMessagesViaTCP(const 
std::vector<std::string_view>& contents
     return err;
   for (auto& content : contents) {
     std::string tcp_message(content);
-    tcp_message += '\n';
+    if (delimiter)
+      tcp_message += *delimiter;
     asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err);
     if (err)
       return err;
diff --git a/libminifi/test/unit/StringUtilsTests.cpp 
b/libminifi/test/unit/StringUtilsTests.cpp
index c48c7460d..b029d8278 100644
--- a/libminifi/test/unit/StringUtilsTests.cpp
+++ b/libminifi/test/unit/StringUtilsTests.cpp
@@ -605,16 +605,29 @@ TEST_CASE("string::parseCharacter tests") {
   CHECK(string::parseCharacter("\\n") == '\n');
   CHECK(string::parseCharacter("\\t") == '\t');
   CHECK(string::parseCharacter("\\r") == '\r');
-  CHECK(string::parseCharacter("\\s") == 's');
-  CHECK(string::parseCharacter("\\'") == '\'');
   CHECK(string::parseCharacter("\\") == '\\');
-  CHECK(string::parseCharacter("\\?") == '\?');
+  CHECK(string::parseCharacter("\\\\") == '\\');
 
+  CHECK_FALSE(string::parseCharacter("\\s").has_value());
+  CHECK_FALSE(string::parseCharacter("\\?").has_value());
   CHECK_FALSE(string::parseCharacter("abc").has_value());
   CHECK_FALSE(string::parseCharacter("\\nd").has_value());
   CHECK(string::parseCharacter("") == std::nullopt);
 }
 
+TEST_CASE("string::replaceEscapedCharacters tests") {  // Inputs are raw string literals, so each backslash below is a literal backslash character; expected values use ordinary C++ escapes.
+  CHECK(string::replaceEscapedCharacters("a") == "a");  // no escape present: returned unchanged
+  CHECK(string::replaceEscapedCharacters(R"(\n)") == "\n");
+  CHECK(string::replaceEscapedCharacters(R"(\t)") == "\t");
+  CHECK(string::replaceEscapedCharacters(R"(\r)") == "\r");
+  CHECK(string::replaceEscapedCharacters(R"(\s)") == "\\s");  // unrecognised escape is left untouched
+  CHECK(string::replaceEscapedCharacters(R"(\\ foo \)") == "\\ foo \\");  // escaped backslash collapses to one; a lone trailing backslash is kept
+  CHECK(string::replaceEscapedCharacters(R"(\\s)") == "\\s");  // escaped backslash followed by a plain s
+  CHECK(string::replaceEscapedCharacters(R"(\r\n)") == "\r\n");  // consecutive escapes are each expanded
+  CHECK(string::replaceEscapedCharacters(R"(foo\nbar)") == "foo\nbar");  // escape embedded in surrounding text
+  CHECK(string::replaceEscapedCharacters(R"(\\n)") == "\\n");  // escaped backslash then n: yields a backslash and an n, not a newline
+}
+
 #ifdef WIN32
 TEST_CASE("Conversion from UTF-8 strings to UTF-16 strings works") {
   using org::apache::nifi::minifi::utils::to_wstring;

Reply via email to