fgerlits commented on a change in pull request #1013:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1013#discussion_r580934162
##########
File path: extensions/standard-processors/tests/unit/ProcessorTests.cpp
##########
@@ -614,3 +614,109 @@ TEST_CASE("TestRPGWithoutHostInvalidPort", "[TestRPG5]") {
TEST_CASE("TestRPGValid", "[TestRPG6]") {
testRPGBypass("", "8080", "8080", false);
}
+
+class TestProcessorProcessIncomingQueue : public minifi::core::Processor {
+ public:
+ explicit TestProcessorProcessIncomingQueue(std::string name,
utils::Identifier uuid = {}) : Processor(name, uuid) {
+ setSupportedProperties({NumFlowFiles});
+ }
+
+ void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>&) override {
+ context->getProperty(NumFlowFiles.getName(), num_flow_files_);
+ }
+
+ void onTrigger(const std::shared_ptr<core::ProcessContext>&, const
std::shared_ptr<core::ProcessSession>& session) override {
+ for (uint32_t i = 0; i < num_flow_files_; ++i) {
+ auto flow_file = session->get();
+ if (flow_file) {
+ session->remove(flow_file);
+ }
+ }
+ }
+
+ static core::Property NumFlowFiles;
+ uint32_t num_flow_files_ = 0;
+};
+
+core::Property TestProcessorProcessIncomingQueue::NumFlowFiles =
core::PropertyBuilder::createProperty("Number of Flow Files to Process")
+ ->withDefaultValue<uint32_t>(0)
+ ->build();
+
+REGISTER_RESOURCE(TestProcessorProcessIncomingQueue, "A mock processor that
processes a configurable number of incoming flow files");
+
+bool testAutomaticYieldWhenNoIncomingFlowFilesAreProcessed(uint32_t
num_incoming_, uint32_t num_processed_) {
+ LogTestController::getInstance().setDebug<core::Processor>();
+ LogTestController::getInstance().setDebug<processors::GenerateFlowFile>();
+
+ const auto repo = std::make_shared<TestRepository>();
+ const auto content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
+ content_repo->initialize(std::make_shared<minifi::Configure>());
+
+ const std::shared_ptr<core::Processor> generate_flow_file =
std::make_shared<processors::GenerateFlowFile>("generate_flow_file");
+ const auto generate_flow_file_node =
std::make_shared<core::ProcessorNode>(generate_flow_file);
+ generate_flow_file->initialize();
+ const auto gff_context =
std::make_shared<core::ProcessContext>(generate_flow_file_node, nullptr, repo,
repo, content_repo);
+ gff_context->setProperty(processors::GenerateFlowFile::BatchSize,
std::to_string(num_incoming_));
+
+ const std::shared_ptr<core::Processor> process_incoming_queue =
std::make_shared<TestProcessorProcessIncomingQueue>("process_incoming_queue");
+ const auto process_incoming_queue_node =
std::make_shared<core::ProcessorNode>(process_incoming_queue);
+ process_incoming_queue->initialize();
+ const auto piq_context =
std::make_shared<core::ProcessContext>(process_incoming_queue_node, nullptr,
repo, repo, content_repo);
+ piq_context->setProperty(TestProcessorProcessIncomingQueue::NumFlowFiles,
std::to_string(num_processed_));
+
+ const auto connection = std::make_shared<minifi::Connection>(repo,
content_repo, "ggf_to_piq");
+ connection->addRelationship(core::Relationship{"success", ""});
+ connection->setSourceUUID(generate_flow_file->getUUID());
+ connection->setDestinationUUID(process_incoming_queue->getUUID());
+ generate_flow_file->addConnection(connection);
+ process_incoming_queue->addConnection(connection);
+
+ const auto gff_session_factory =
std::make_shared<core::ProcessSessionFactory>(gff_context);
+ generate_flow_file->setScheduledState(core::ScheduledState::RUNNING);
+ generate_flow_file->onSchedule(gff_context, gff_session_factory);
+ generate_flow_file->onTrigger(gff_context, gff_session_factory);
+
+ const auto piq_session_factory =
std::make_shared<core::ProcessSessionFactory>(piq_context);
+ process_incoming_queue->setScheduledState(core::ScheduledState::RUNNING);
+ process_incoming_queue->onSchedule(piq_context, piq_session_factory);
+ process_incoming_queue->onTrigger(piq_context, piq_session_factory);
+
+ return process_incoming_queue->isYield();
+}
+
+TEST_CASE("If there are no incoming flow files, then there is no automatic
yield", "[AutomaticYield]") {
+ SECTION("0 flow files in the queue, we don't try to process any") {
+ auto is_yield = testAutomaticYieldWhenNoIncomingFlowFilesAreProcessed(0,
0);
+ REQUIRE_FALSE(is_yield);
+ }
+ SECTION("0 flow files in the queue, we try to process one") {
Review comment:
We call `session->get()` once, which is what I meant by "try to process
one".
Which function do you mean? Probably best to discuss offline.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]