hunyadi-dev commented on a change in pull request #776: URL: https://github.com/apache/nifi-minifi-cpp/pull/776#discussion_r428478229
########## File path: libminifi/src/c2/C2Agent.cpp ########## @@ -75,54 +78,55 @@ C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvid c2_producer_ = [&]() { // place priority on messages to send to the c2 server - if (protocol_.load() != nullptr && request_mutex.try_lock_for(std::chrono::seconds(1))) { - std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock); - if (!requests.empty()) { - int count = 0; - do { - const C2Payload payload(std::move(requests.back())); - requests.pop_back(); - try { - C2Payload && response = protocol_.load()->consumePayload(payload); - enqueue_c2_server_response(std::move(response)); - } - catch(const std::exception &e) { - logger_->log_error("Exception occurred while consuming payload. error: %s", e.what()); - } - catch(...) { - logger_->log_error("Unknonwn exception occurred while consuming payload."); - } - }while(!requests.empty() && ++count < max_c2_responses); + if (protocol_.load() != nullptr) { + std::vector<C2Payload> payload_batch; + payload_batch.reserve(max_c2_responses); + auto getRequestPayload = [&payload_batch] (C2Payload&& payload) { payload_batch.emplace_back(std::move(payload)); }; + const std::chrono::system_clock::time_point timeout_point = std::chrono::system_clock::now() + std::chrono::milliseconds(1); + for (std::size_t attempt_num = 0; attempt_num < max_c2_responses; ++attempt_num) { + if (!requests.consumeWaitUntil(getRequestPayload, timeout_point)) { + break; } } - try { - performHeartBeat(); - } - catch(const std::exception &e) { - logger_->log_error("Exception occurred while performing heartbeat. error: %s", e.what()); - } - catch(...) 
{ - logger_->log_error("Unknonwn exception occurred while performing heartbeat."); - } + std::for_each( + std::make_move_iterator(payload_batch.begin()), + std::make_move_iterator(payload_batch.end()), + [&] (C2Payload&& payload) { + try { + C2Payload && response = protocol_.load()->consumePayload(std::move(payload)); + enqueue_c2_server_response(std::move(response)); + } + catch(const std::exception &e) { + logger_->log_error("Exception occurred while consuming payload. error: %s", e.what()); + } + catch(...) { + logger_->log_error("Unknonwn exception occurred while consuming payload."); + } + }); - checkTriggers(); + try { + performHeartBeat(); + } + catch (const std::exception &e) { + logger_->log_error("Exception occurred while performing heartbeat. error: %s", e.what()); + } + catch (...) { + logger_->log_error("Unknonwn exception occurred while performing heartbeat."); + } + } + + checkTriggers(); + + return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_)); + }; - return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_)); - }; functions_.push_back(c2_producer_); - c2_consumer_ = [&]() { - if ( queue_mutex.try_lock_for(std::chrono::seconds(1)) ) { - C2Payload payload(Operation::HEARTBEAT); - { - std::lock_guard<std::timed_mutex> lock(queue_mutex, std::adopt_lock); - if (responses.empty()) { - return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS)); - } - payload = std::move(responses.back()); - responses.pop_back(); + c2_consumer_ = [&] { + if (responses.size()) { + if (!responses.consumeWaitFor([this](C2Payload&& e) { extractPayload(std::move(e)); }, std::chrono::seconds(1))) { Review comment: 1. > Call empty instead of checking size() against zero. Of course I would do that, my problem is with the hidden bangs when reading code :) Unfortunately, the current linter setting warns on `! empty()` which would make it at least stand out. 
Maybe it is just personal, because I have committed incorrect code due to not noticing a `!`, but I have always read `if(vector.size())` as `if vector has elements`. TL;DR: will change it. 2. Will extract the logic out. 3. I don't want to commit that comment; can I somehow edit the suggestion? :) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org