adam-markovics commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r921169394


##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -37,48 +34,64 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
+void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, 
MQTTMessageDeleter> message) {
   if (queue_.size_approx() >= maxQueueSize_) {
     logger_->log_warn("MQTT queue full");
-    return false;
-  } else {
-    if (gsl::narrow<uint64_t>(message->payloadlen) > maxSegSize_)
-      message->payloadlen = maxSegSize_;
-    queue_.enqueue(message);
-    logger_->log_debug("enqueue MQTT message length %d", message->payloadlen);
-    return true;
+    return;
+  }
+
+  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
+    logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
+    message->payloadlen = gsl::narrow<int>(max_seg_size_);
   }
+
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  queue_.enqueue(std::move(message));
 }
 
 void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
-  AbstractMQTTProcessor::onSchedule(context, factory);
   std::string value;
   int64_t valInt;
   value = "";
+  if (context->getProperty(CleanSession.getName(), value)) {
+    cleanSession_ = utils::StringUtils::toBool(value).value_or(cleanSession_);
+    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+  }
+  value = "";
   if (context->getProperty(QueueBufferMaxMessage.getName(), value) && 
!value.empty() && core::Property::StringToInt(value, valInt)) {
     maxQueueSize_ = valInt;
     logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
   }
-  value = "";
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() 
&& core::Property::StringToInt(value, valInt)) {
-    maxSegSize_ = valInt;
-    logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%" PRIu64 "]", 
maxSegSize_);
+
+  AbstractMQTTProcessor::onSchedule(context, factory);

Review Comment:
   Because in parent `onSchedule` we connect to the broker and also messages 
can arrive from that point. So the properties before the call (`CleanSession` 
and `QueueBufferMaxMessage`) need to be read before connecting.



##########
docker/test/integration/minifi/core/SingleNodeDockerCluster.py:
##########
@@ -142,6 +142,30 @@ def deploy(self, name):
 
         self.containers[name].deploy()
 
-    def deploy_flow(self):
+    def deploy_flow(self, container_name=None):
+        if container_name is not None:
+            if container_name not in self.containers:
+                logging.error('Could not start container because it is not 
found: \'%s\'', container_name)
+                return
+            self.containers[container_name].deploy()
+            return
         for container in self.containers.values():
             container.deploy()
+
+    def stop_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not stop container because it is not found: 
\'%s\'', container_name)
+            return
+        self.containers[container_name].stop()
+
+    def kill_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not kill container because it is not found: 
\'%s\'', container_name)
+            return
+        self.containers[container_name].kill()
+
+    def restart_flow(self, container_name):
+        if container_name not in self.containers:
+            logging.error('Could not stop restart because it is not found: 
\'%s\'', container_name)

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to