This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit f27e349b508c502218ca702b78fad2ef931f4f83 Author: Martin Zink <martinz...@apache.org> AuthorDate: Tue Feb 28 13:11:23 2023 +0100 MINIFICPP-1887 Add default connection size limits default limits: 2000 flow files / 100MB data size Closes #1501 Signed-off-by: Marton Szasz <sza...@apache.org> --- .../tests/unit/FlowJsonTests.cpp | 4 +- .../tests/unit/ProcessorTests.cpp | 10 +- .../tests/unit/YamlConnectionParserTest.cpp | 4 +- libminifi/include/Connection.h | 110 ++++++++------------- libminifi/include/core/state/ConnectionStore.h | 4 +- .../include/core/state/nodes/FlowInformation.h | 4 +- libminifi/include/core/state/nodes/QueueMetrics.h | 4 +- libminifi/src/Connection.cpp | 19 ++-- libminifi/src/core/ProcessSession.cpp | 2 +- libminifi/src/core/Processor.cpp | 8 +- .../src/core/flow/StructuredConfiguration.cpp | 4 +- .../src/core/flow/StructuredConnectionParser.cpp | 4 +- .../test/persistence-tests/PersistenceTests.cpp | 4 +- libminifi/test/rocksdb-tests/RepoTests.cpp | 2 +- libminifi/test/unit/ConnectionTests.cpp | 41 ++++++++ libminifi/test/unit/MetricsTests.cpp | 4 +- libminifi/test/unit/ResponseNodeLoaderTests.cpp | 2 +- 17 files changed, 119 insertions(+), 111 deletions(-) diff --git a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp index b7d1a47f8..495965122 100644 --- a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp +++ b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp @@ -150,8 +150,8 @@ TEST_CASE("NiFi flow json format is correctly parsed") { REQUIRE(connection1->getSource() == proc); REQUIRE(connection1->getDestination() == funnel); REQUIRE(connection1->getRelationships() == (std::set<core::Relationship>{{"a", ""}, {"b", ""}})); - REQUIRE(connection1->getMaxQueueSize() == 7); - REQUIRE(connection1->getMaxQueueDataSize() == 11_KiB); + REQUIRE(connection1->getBackpressureThresholdCount() == 7); + REQUIRE(connection1->getBackpressureThresholdDataSize() == 11_KiB); REQUIRE(13s == connection1->getFlowExpirationDuration()); auto connection2 = connection_map.at("00000000-0000-0000-0000-000000000008"); diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp index b7afc647c..c3b43856a 100644 --- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp +++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp @@ -245,7 +245,7 @@ TEST_CASE("TestConnectionFull", "[ConnectionFull]") { std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, content_repo, "GFF2Connection"); - connection->setMaxQueueSize(5); + connection->setBackpressureThresholdCount(5); connection->addRelationship(core::Relationship("success", "description")); @@ -270,8 +270,8 @@ TEST_CASE("TestConnectionFull", "[ConnectionFull]") { auto session = std::make_shared<core::ProcessSession>(context); - REQUIRE(session->outgoingConnectionsFull("success") == false); - REQUIRE(connection->isFull() == false); + CHECK_FALSE(session->outgoingConnectionsFull("success")); + CHECK_FALSE(connection->backpressureThresholdReached()); processor->incrementActiveTasks(); processor->setScheduledState(core::ScheduledState::RUNNING); @@ -279,8 +279,8 @@ TEST_CASE("TestConnectionFull", "[ConnectionFull]") { session->commit(); - REQUIRE(connection->isFull()); - REQUIRE(session->outgoingConnectionsFull("success")); + CHECK(connection->backpressureThresholdReached()); + CHECK(session->outgoingConnectionsFull("success")); } TEST_CASE("LogAttributeTest", "[getfileCreate3]") { diff --git a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp index 44d346ed4..0444c71e6 100644 --- a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp +++ b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp @@ -183,8 +183,8 @@ TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]") "drop empty: \n"}); flow::Node connection_node{std::make_shared<YamlNode>(yaml_node)}; StructuredConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger); - CHECK(0 == yaml_connection_parser.getWorkQueueSize()); - CHECK(0 == yaml_connection_parser.getWorkQueueDataSize()); + CHECK(minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_COUNT == yaml_connection_parser.getWorkQueueSize()); + CHECK(minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE == yaml_connection_parser.getWorkQueueDataSize()); CHECK(0 == yaml_connection_parser.getSwapThreshold()); CHECK(0s == yaml_connection_parser.getFlowFileExpiration()); CHECK(0 == yaml_connection_parser.getDropEmpty()); diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h index 81e1a7a25..154958ba6 100644 --- a/libminifi/include/Connection.h +++ b/libminifi/include/Connection.h @@ -27,6 +27,7 @@ #include <mutex> #include <atomic> #include <algorithm> +#include <utility> #include "core/Core.h" #include "core/Connectable.h" #include "core/logging/Logger.h" @@ -52,86 +53,81 @@ class Connection : public core::Connectable { const utils::Identifier &srcUUID, const utils::Identifier &destUUID); explicit Connection(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<SwapManager> swap_manager, std::string name, const utils::Identifier& uuid); - // Destructor ~Connection() override = default; - // Set Source Processor UUID + Connection(const Connection &parent) = delete; + Connection &operator=(const Connection &parent) = delete; + + static constexpr uint64_t DEFAULT_BACKPRESSURE_THRESHOLD_COUNT = 2000; + static constexpr uint64_t DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE = 100_MB; + void setSourceUUID(const utils::Identifier &uuid) { src_uuid_ = uuid; } - // Set Destination Processor UUID + void setDestinationUUID(const utils::Identifier &uuid) { dest_uuid_ = uuid; } - // Get Source Processor UUID + utils::Identifier getSourceUUID() const { return src_uuid_; } - // Get Destination Processor UUID + utils::Identifier getDestinationUUID() const { return dest_uuid_; } - // Set Connection Source Processor void setSource(core::Connectable* source) { source_connectable_ = source; } - // ! Get Connection Source Processor + core::Connectable* getSource() const { return source_connectable_; } - // Set Connection Destination Processor + void setDestination(core::Connectable* dest) { dest_connectable_ = dest; } - // ! Get Connection Destination Processor - core::Connectable* getDestination() { - return dest_connectable_; - } - /** - * Deprecated function - * Please use addRelationship. - */ - void setRelationship(core::Relationship relationship) { - relationships_.insert(relationship); + core::Connectable* getDestination() const { + return dest_connectable_; } - // Set Connection relationship void addRelationship(core::Relationship relationship) { - relationships_.insert(relationship); + relationships_.insert(std::move(relationship)); } - // ! Get Connection relationship + const std::set<core::Relationship> &getRelationships() const { return relationships_; } - // Set Max Queue Size - void setMaxQueueSize(uint64_t size) { - max_queue_size_ = size; + + void setBackpressureThresholdCount(uint64_t size) { + backpressure_threshold_count_ = size; } - // Get Max Queue Size - uint64_t getMaxQueueSize() { - return max_queue_size_; + + uint64_t getBackpressureThresholdCount() const { + return backpressure_threshold_count_; } - // Set Max Queue Data Size - void setMaxQueueDataSize(uint64_t size) { - max_data_queue_size_ = size; + + void setBackpressureThresholdDataSize(uint64_t size) { + backpressure_threshold_data_size_ = size; + } + + uint64_t getBackpressureThresholdDataSize() const { + return backpressure_threshold_data_size_; } + void setSwapThreshold(uint64_t size) { queue_.setTargetSize(size); queue_.setMinSize(size / 2); queue_.setMaxSize(size * 3 / 2); } - // Get Max Queue Data Size - uint64_t getMaxQueueDataSize() { - return max_data_queue_size_; - } - // Set Flow expiration duration in millisecond + void setFlowExpirationDuration(std::chrono::milliseconds duration) { expired_duration_ = duration; } - // Get Flow expiration duration in millisecond - std::chrono::milliseconds getFlowExpirationDuration() { + + std::chrono::milliseconds getFlowExpirationDuration() const { return expired_duration_; } @@ -143,28 +139,25 @@ class Connection : public core::Connectable { return drop_empty_; } - // Check whether the queue is empty bool isEmpty() const; - // Check whether the queue is full to apply back pressure - bool isFull() const; - // Get queue size - uint64_t getQueueSize() { + + bool backpressureThresholdReached() const; + + uint64_t getQueueSize() const { std::lock_guard<std::mutex> lock(mutex_); return queue_.size(); } - // Get queue data size + uint64_t getQueueDataSize() { return queued_data_size_; } - // Put the flow file into queue void put(const std::shared_ptr<core::FlowFile>& flow) override; - // Put multiple flowfiles into the queue void multiPut(std::vector<std::shared_ptr<core::FlowFile>>& flows); - // Poll the flow file from queue, the expired flow file record also being returned + std::shared_ptr<core::FlowFile> poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords); - // Drain the flow records + void drain(bool delete_permanently); void yield() override {} @@ -179,41 +172,22 @@ class Connection : public core::Connectable { } protected: - // Source Processor UUID utils::Identifier src_uuid_; - // Destination Processor UUID utils::Identifier dest_uuid_; - // Relationship for this connection std::set<core::Relationship> relationships_; - // Source Processor (ProcessNode/Port) core::Connectable* source_connectable_ = nullptr; - // Destination Processor (ProcessNode/Port) core::Connectable* dest_connectable_ = nullptr; - // Max queue size to apply back pressure - std::atomic<uint64_t> max_queue_size_ = 0; - // Max queue data size to apply back pressure - std::atomic<uint64_t> max_data_queue_size_ = 0; - // Flow File Expiration Duration in= MilliSeconds + std::atomic<uint64_t> backpressure_threshold_count_ = DEFAULT_BACKPRESSURE_THRESHOLD_COUNT; + std::atomic<uint64_t> backpressure_threshold_data_size_ = DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE; std::atomic<std::chrono::milliseconds> expired_duration_ = std::chrono::milliseconds(0); - // flow file repository std::shared_ptr<core::Repository> flow_repository_; - // content repository reference. std::shared_ptr<core::ContentRepository> content_repo_; private: bool drop_empty_ = false; - // Mutex for protection mutable std::mutex mutex_; - // Queued data size std::atomic<uint64_t> queued_data_size_ = 0; - // Queue for the Flow File utils::FlowFileQueue queue_; - // flow repository - // Logger std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<Connection>::getLogger(); - // Prevent default copy constructor and assignment operation - // Only support pass by reference or pointer - Connection(const Connection &parent); - Connection &operator=(const Connection &parent); }; } // namespace org::apache::nifi::minifi diff --git a/libminifi/include/core/state/ConnectionStore.h b/libminifi/include/core/state/ConnectionStore.h index cd7ed2082..a5cfa4eff 100644 --- a/libminifi/include/core/state/ConnectionStore.h +++ b/libminifi/include/core/state/ConnectionStore.h @@ -40,11 +40,11 @@ class ConnectionStore { for (const auto& [_, connection] : connections_) { metrics.push_back({"queue_data_size", static_cast<double>(connection->getQueueDataSize()), {{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}}); - metrics.push_back({"queue_data_size_max", static_cast<double>(connection->getMaxQueueDataSize()), + metrics.push_back({"queue_data_size_max", static_cast<double>(connection->getBackpressureThresholdDataSize()), {{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}}); metrics.push_back({"queue_size", static_cast<double>(connection->getQueueSize()), {{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}}); - metrics.push_back({"queue_size_max", static_cast<double>(connection->getMaxQueueSize()), + metrics.push_back({"queue_size_max", static_cast<double>(connection->getBackpressureThresholdCount()), {{"connection_uuid", connection->getUUIDStr()}, {"connection_name", connection->getName()}, {"metric_class", metric_class}}}); } diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h index e08eb6684..894d938a0 100644 --- a/libminifi/include/core/state/nodes/FlowInformation.h +++ b/libminifi/include/core/state/nodes/FlowInformation.h @@ -198,7 +198,7 @@ class FlowInformation : public FlowMonitor { SerializedResponseNode queuesizemax; queuesizemax.name = "sizeMax"; - queuesizemax.value = queue.second->getMaxQueueSize(); + queuesizemax.value = queue.second->getBackpressureThresholdCount(); SerializedResponseNode datasize; datasize.name = "dataSize"; @@ -206,7 +206,7 @@ class FlowInformation : public FlowMonitor { SerializedResponseNode datasizemax; datasizemax.name = "dataSizeMax"; - datasizemax.value = queue.second->getMaxQueueDataSize(); + datasizemax.value = queue.second->getBackpressureThresholdDataSize(); repoNode.children.push_back(queuesize); repoNode.children.push_back(queuesizemax); diff --git a/libminifi/include/core/state/nodes/QueueMetrics.h b/libminifi/include/core/state/nodes/QueueMetrics.h index f89d34da6..3b6a832d6 100644 --- a/libminifi/include/core/state/nodes/QueueMetrics.h +++ b/libminifi/include/core/state/nodes/QueueMetrics.h @@ -64,7 +64,7 @@ class QueueMetrics : public ResponseNode, public ConnectionStore { SerializedResponseNode datasizemax; datasizemax.name = "datasizemax"; - datasizemax.value = std::to_string(connection->getMaxQueueDataSize()); + datasizemax.value = std::to_string(connection->getBackpressureThresholdDataSize()); SerializedResponseNode queuesize; queuesize.name = "queued"; @@ -72,7 +72,7 @@ class QueueMetrics : public ResponseNode, public ConnectionStore { SerializedResponseNode queuesizemax; queuesizemax.name = "queuedmax"; - queuesizemax.value = std::to_string(connection->getMaxQueueSize()); + queuesizemax.value = std::to_string(connection->getBackpressureThresholdCount()); parent.children.push_back(datasize); parent.children.push_back(datasizemax); diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp index cddd0463c..7c3145c39 100644 --- a/libminifi/src/Connection.cpp +++ b/libminifi/src/Connection.cpp @@ -18,20 +18,15 @@ * limitations under the License. */ #include "Connection.h" -#include <ctime> #include <vector> -#include <queue> #include <memory> #include <string> -#include <map> #include <set> #include <chrono> #include <thread> -#include <iostream> #include <list> #include "core/FlowFile.h" -#include "core/Processor.h" -#include "core/logging/LoggerConfiguration.h" +#include "core/Connectable.h" using namespace std::literals::chrono_literals; @@ -85,17 +80,15 @@ bool Connection::isEmpty() const { return queue_.empty(); } -bool Connection::isFull() const { +bool Connection::backpressureThresholdReached() const { std::lock_guard<std::mutex> lock(mutex_); + auto backpressure_threshold_count = backpressure_threshold_count_.load(); + auto backpressure_threshold_data_size = backpressure_threshold_data_size_.load(); - if (max_queue_size_ <= 0 && max_data_queue_size_ <= 0) - // No back pressure setting - return false; - - if (max_queue_size_ > 0 && queue_.size() >= max_queue_size_) + if (backpressure_threshold_count != 0 && queue_.size() >= backpressure_threshold_count) return true; - if (max_data_queue_size_ > 0 && queued_data_size_ >= max_data_queue_size_) + if (backpressure_threshold_data_size != 0 && queued_data_size_ >= backpressure_threshold_data_size) return true; return false; diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 1d06b8da7..8bf30dc1c 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -1130,7 +1130,7 @@ bool ProcessSession::outgoingConnectionsFull(const std::string& relationship) { Connection * connection = nullptr; for (const auto conn : connections) { connection = dynamic_cast<Connection*>(conn); - if (connection && connection->isFull()) { + if (connection && connection->backpressureThresholdReached()) { return true; } } diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index ffc186a96..fdb15f4a3 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -172,7 +172,7 @@ bool Processor::flowFilesOutGoingFull() const { std::set<Connectable*> existedConnection = connection_pair.second; const bool has_full_connection = std::any_of(begin(existedConnection), end(existedConnection), [](const Connectable* conn) { auto connection = dynamic_cast<const Connection*>(conn); - return connection && connection->isFull(); + return connection && connection->backpressureThresholdReached(); }); if (has_full_connection) { return true; } } @@ -312,12 +312,12 @@ bool Processor::isThrottledByBackpressure() const { bool isThrottledByOutgoing = ranges::any_of(outgoing_connections_, [](auto& name_connection_set_pair) { return ranges::any_of(name_connection_set_pair.second, [](auto& connectable) { auto connection = dynamic_cast<Connection*>(connectable); - return connection && connection->isFull(); + return connection && connection->backpressureThresholdReached(); }); }); bool isForcedByIncomingCycle = ranges::any_of(incoming_connections_, [](auto& connectable) { auto connection = dynamic_cast<Connection*>(connectable); - return connection && partOfCycle(connection) && connection->isFull(); + return connection && partOfCycle(connection) && connection->backpressureThresholdReached(); }); return isThrottledByOutgoing && !isForcedByIncomingCycle; } @@ -333,7 +333,7 @@ Connectable* Processor::pickIncomingConnection() { if (!connection) { continue; } - if (partOfCycle(connection) && connection->isFull()) { + if (partOfCycle(connection) && connection->backpressureThresholdReached()) { return inConn; } } while (incoming_connections_Iter != beginIt); diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp index 02624a4dc..847258399 100644 --- a/libminifi/src/core/flow/StructuredConfiguration.cpp +++ b/libminifi/src/core/flow/StructuredConfiguration.cpp @@ -554,8 +554,8 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c logger_->log_debug("Created connection with UUID %s and name %s", id, name); const StructuredConnectionParser connectionParser(connection_node, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_, schema_); connectionParser.configureConnectionSourceRelationships(*connection); - connection->setMaxQueueSize(connectionParser.getWorkQueueSize()); - connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSize()); + connection->setBackpressureThresholdCount(connectionParser.getWorkQueueSize()); + connection->setBackpressureThresholdDataSize(connectionParser.getWorkQueueDataSize()); connection->setSwapThreshold(connectionParser.getSwapThreshold()); connection->setSourceUUID(connectionParser.getSourceUUID()); connection->setDestinationUUID(connectionParser.getDestinationUUID()); diff --git a/libminifi/src/core/flow/StructuredConnectionParser.cpp b/libminifi/src/core/flow/StructuredConnectionParser.cpp index a6521884b..9ac23ac89 100644 --- a/libminifi/src/core/flow/StructuredConnectionParser.cpp +++ b/libminifi/src/core/flow/StructuredConnectionParser.cpp @@ -76,7 +76,7 @@ uint64_t StructuredConnectionParser::getWorkQueueSize() const { } logger_->log_error("Invalid max queue size value: %s.", max_work_queue_str); } - return 0; + return Connection::DEFAULT_BACKPRESSURE_THRESHOLD_COUNT; } uint64_t StructuredConnectionParser::getWorkQueueDataSize() const { @@ -90,7 +90,7 @@ uint64_t StructuredConnectionParser::getWorkQueueDataSize() const { } logger_->log_error("Invalid max queue data size value: %s.", max_work_queue_str); } - return 0; + return Connection::DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE; } uint64_t StructuredConnectionParser::getSwapThreshold() const { diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp index e417ae417..748775fcd 100644 --- a/libminifi/test/persistence-tests/PersistenceTests.cpp +++ b/libminifi/test/persistence-tests/PersistenceTests.cpp @@ -77,7 +77,7 @@ struct TestFlow{ auto input = std::make_unique<Connection>(ff_repository, content_repo, "Input", inputConnUUID()); { input_ = input.get(); - input->setRelationship({"input", "d"}); + input->addRelationship({"input", "d"}); input->setDestinationUUID(mainProcUUID()); input->setSourceUUID(inputProcUUID()); inputProcessor->addConnection(input.get()); @@ -87,7 +87,7 @@ struct TestFlow{ auto output = std::make_unique<Connection>(ff_repository, content_repo, "Output", outputConnUUID()); { output_ = output.get(); - output->setRelationship(relationshipToOutput); + output->addRelationship(relationshipToOutput); output->setSourceUUID(mainProcUUID()); } diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp index ef4d073bf..7a646b545 100644 --- a/libminifi/test/rocksdb-tests/RepoTests.cpp +++ b/libminifi/test/rocksdb-tests/RepoTests.cpp @@ -274,7 +274,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") { core::Relationship inputRel{"Input", "dummy"}; auto input = std::make_unique<minifi::Connection>(ff_repository, content_repo, "Input"); - input->setRelationship(inputRel); + input->addRelationship(inputRel); auto root = std::make_unique<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root"); auto inputPtr = input.get(); diff --git a/libminifi/test/unit/ConnectionTests.cpp b/libminifi/test/unit/ConnectionTests.cpp index f018ce1f6..9a82a893f 100644 --- a/libminifi/test/unit/ConnectionTests.cpp +++ b/libminifi/test/unit/ConnectionTests.cpp @@ -84,3 +84,44 @@ TEST_CASE("Connection::poll() works correctly", "[poll]") { REQUIRE(nullptr == connection->poll(expired_flow_files)); } } + +TEST_CASE("Connection backpressure tests", "[Connection]") { + const auto flow_repo = std::make_shared<TestRepository>(); + const auto content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + content_repo->initialize(std::make_shared<minifi::Configure>()); + + const auto id_generator = utils::IdGenerator::getIdGenerator(); + const auto connection = std::make_shared<minifi::Connection>(flow_repo, content_repo, "test_connection", id_generator->generate(), id_generator->generate(), id_generator->generate()); + + CHECK(connection->getBackpressureThresholdDataSize() == minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_DATA_SIZE); + CHECK(connection->getBackpressureThresholdCount() == minifi::Connection::DEFAULT_BACKPRESSURE_THRESHOLD_COUNT); + + SECTION("The number of flowfiles can be limited") { + connection->setBackpressureThresholdCount(2); + CHECK_FALSE(connection->backpressureThresholdReached()); + connection->put(std::make_shared<core::FlowFile>()); + CHECK_FALSE(connection->backpressureThresholdReached()); + connection->put(std::make_shared<core::FlowFile>()); + CHECK(connection->backpressureThresholdReached()); + connection->setBackpressureThresholdCount(0); + CHECK_FALSE(connection->backpressureThresholdReached()); + } + SECTION("The size of the data can be limited") { + connection->setBackpressureThresholdDataSize(3_KB); + CHECK_FALSE(connection->backpressureThresholdReached()); + { + auto flow_file = std::make_shared<core::FlowFile>(); + flow_file->setSize(2_KB); + connection->put(flow_file); + } + CHECK_FALSE(connection->backpressureThresholdReached()); + { + auto flow_file = std::make_shared<core::FlowFile>(); + flow_file->setSize(2_KB); + connection->put(flow_file); + } + CHECK(connection->backpressureThresholdReached()); + connection->setBackpressureThresholdDataSize(0); + CHECK_FALSE(connection->backpressureThresholdReached()); + } +} diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp index a73cecae5..972365fd4 100644 --- a/libminifi/test/unit/MetricsTests.cpp +++ b/libminifi/test/unit/MetricsTests.cpp @@ -53,8 +53,8 @@ TEST_CASE("QueueMetricsTestConnections", "[c2m3]") { auto connection = std::make_unique<minifi::Connection>(repo, content_repo, "testconnection"); - connection->setMaxQueueDataSize(1024); - connection->setMaxQueueSize(1024); + connection->setBackpressureThresholdDataSize(1024); + connection->setBackpressureThresholdCount(1024); metrics.updateConnection(connection.get()); diff --git a/libminifi/test/unit/ResponseNodeLoaderTests.cpp b/libminifi/test/unit/ResponseNodeLoaderTests.cpp index 222346764..52f9238f1 100644 --- a/libminifi/test/unit/ResponseNodeLoaderTests.cpp +++ b/libminifi/test/unit/ResponseNodeLoaderTests.cpp @@ -58,7 +58,7 @@ class ResponseNodeLoaderTestFixture { void addConnection(const std::string& connection_name, const std::string& relationship_name, const minifi::utils::Identifier& src_uuid, const minifi::utils::Identifier& dst_uuid) { auto connection = std::make_unique<minifi::Connection>(ff_repository_, content_repo_, connection_name); - connection->setRelationship({relationship_name, "d"}); + connection->addRelationship({relationship_name, "d"}); connection->setDestinationUUID(src_uuid); connection->setSourceUUID(dst_uuid); root_->addConnection(std::move(connection));