szaszm commented on a change in pull request #776:
URL: https://github.com/apache/nifi-minifi-cpp/pull/776#discussion_r428322626



##########
File path: libminifi/src/c2/C2Agent.cpp
##########
@@ -75,54 +78,55 @@ C2Agent::C2Agent(const 
std::shared_ptr<core::controller::ControllerServiceProvid
 
   c2_producer_ = [&]() {
     // place priority on messages to send to the c2 server
-      if (protocol_.load() != nullptr && 
request_mutex.try_lock_for(std::chrono::seconds(1))) {
-        std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock);
-        if (!requests.empty()) {
-          int count = 0;
-          do {
-            const C2Payload payload(std::move(requests.back()));
-            requests.pop_back();
-            try {
-              C2Payload && response = 
protocol_.load()->consumePayload(payload);
-              enqueue_c2_server_response(std::move(response));
-            }
-            catch(const std::exception &e) {
-              logger_->log_error("Exception occurred while consuming payload. 
error: %s", e.what());
-            }
-            catch(...) {
-              logger_->log_error("Unknonwn exception occurred while consuming 
payload.");
-            }
-          }while(!requests.empty() && ++count < max_c2_responses);
+    if (protocol_.load() != nullptr) {
+      std::vector<C2Payload> payload_batch;
+      payload_batch.reserve(max_c2_responses);
+      auto getRequestPayload = [&payload_batch] (C2Payload&& payload) { 
payload_batch.emplace_back(std::move(payload)); };
+      const std::chrono::system_clock::time_point timeout_point = 
std::chrono::system_clock::now() + std::chrono::milliseconds(1);
+      for (std::size_t attempt_num = 0; attempt_num < max_c2_responses; 
++attempt_num) {
+        if (!requests.consumeWaitUntil(getRequestPayload, timeout_point)) {
+          break;
         }
       }
-      try {
-        performHeartBeat();
-      }
-      catch(const std::exception &e) {
-        logger_->log_error("Exception occurred while performing heartbeat. 
error: %s", e.what());
-      }
-      catch(...) {
-        logger_->log_error("Unknonwn exception occurred while performing 
heartbeat.");
-      }
+      std::for_each(
+        std::make_move_iterator(payload_batch.begin()),
+        std::make_move_iterator(payload_batch.end()),
+        [&] (C2Payload&& payload) {
+          try {
+            C2Payload && response = 
protocol_.load()->consumePayload(std::move(payload));
+            enqueue_c2_server_response(std::move(response));
+          }
+          catch(const std::exception &e) {
+            logger_->log_error("Exception occurred while consuming payload. 
error: %s", e.what());
+          }
+          catch(...) {
+            logger_->log_error("Unknonwn exception occurred while consuming 
payload.");
+          }
+        });
 
-      checkTriggers();
+        try {
+          performHeartBeat();
+        }
+        catch (const std::exception &e) {
+          logger_->log_error("Exception occurred while performing heartbeat. 
error: %s", e.what());
+        }
+        catch (...) {
+          logger_->log_error("Unknonwn exception occurred while performing 
heartbeat.");
+        }
+    }
+
+    checkTriggers();
+
+    return 
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
+  };
 
-      return 
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
-    };
   functions_.push_back(c2_producer_);
 
-  c2_consumer_ = [&]() {
-    if ( queue_mutex.try_lock_for(std::chrono::seconds(1)) ) {
-      C2Payload payload(Operation::HEARTBEAT);
-      {
-        std::lock_guard<std::timed_mutex> lock(queue_mutex, std::adopt_lock);
-        if (responses.empty()) {
-          return 
utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
-        }
-        payload = std::move(responses.back());
-        responses.pop_back();
+  c2_consumer_ = [&] {
+    if (responses.size()) {
+      if (!responses.consumeWaitFor([this](C2Payload&& e) { 
extractPayload(std::move(e)); }, std::chrono::seconds(1))) {

Review comment:
       1. My preference is `!empty()`, because it reads as "not empty", which 
is the intention here. On the other hand I'm ok with using `size() > 0`, as 
that's only one logical step away from the intention, but not plain `size()` 
because that implicit int -> bool conversion is surprising to the reader, like 
every implicit conversion that converts to something that's not logically "the 
same thing". I think the reader's mind goes like this:
       1. "if the size of responses" - wtf, that doesn't make sense
       2. Ah, implicit int -> bool conversion, so "if the size of the responses 
is not zero".
       3. That's "if the responses are not empty", i.e. "if there are responses"
   
       Sorry for the long comment, I hope this makes sense.
   
   2. Sorry for the accusation, you're right about the second point. I must 
have been salty because of something when writing this. :(
   
       However, this raises another problem, which is that I, as the reader, 
misunderstood the code, so it's probably too complex. I suggest extracting the 
lambda and maybe even the consume call, so that the identifiers can guide the 
reader.
   
   3. \-
   
   ```suggestion
       if (responses.size() > 0) {  // or (!responses.empty()), or (not 
responses.empty())
         const auto on_dequeue = [this](C2Payload&& payload) { 
extractPayload(std::move(payload)); };
         const auto dequeue_success = responses.consumeWaitFor(on_dequeue, 
std::chrono::seconds{1});
         if (!dequeue_success) {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to