adam-markovics commented on code in PR #1363: URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r921169394
########## extensions/mqtt/processors/ConsumeMQTT.cpp: ########## @@ -37,48 +34,64 @@ void ConsumeMQTT::initialize() { setSupportedRelationships(relationships()); } -bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) { +void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message) { if (queue_.size_approx() >= maxQueueSize_) { logger_->log_warn("MQTT queue full"); - return false; - } else { - if (gsl::narrow<uint64_t>(message->payloadlen) > maxSegSize_) - message->payloadlen = maxSegSize_; - queue_.enqueue(message); - logger_->log_debug("enqueue MQTT message length %d", message->payloadlen); - return true; + return; + } + + if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) { + logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen); + message->payloadlen = gsl::narrow<int>(max_seg_size_); } + + logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen); + queue_.enqueue(std::move(message)); } void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) { - AbstractMQTTProcessor::onSchedule(context, factory); std::string value; int64_t valInt; value = ""; + if (context->getProperty(CleanSession.getName(), value)) { + cleanSession_ = utils::StringUtils::toBool(value).value_or(cleanSession_); + logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_); + } + value = ""; if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { maxQueueSize_ = valInt; logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_); } - value = ""; - if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) { - maxSegSize_ = valInt; - logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%" PRIu64 "]", maxSegSize_); + + AbstractMQTTProcessor::onSchedule(context, factory); Review Comment: Because in parent `onSchedule` we connect to the broker and also messages can arrive from that point. So the properties before the call (`CleanSession` and `QueueBufferMaxMessage`) need to be read before connecting. ########## docker/test/integration/minifi/core/SingleNodeDockerCluster.py: ########## @@ -142,6 +142,30 @@ def deploy(self, name): self.containers[name].deploy() - def deploy_flow(self): + def deploy_flow(self, container_name=None): + if container_name is not None: + if container_name not in self.containers: + logging.error('Could not start container because it is not found: \'%s\'', container_name) + return + self.containers[container_name].deploy() + return for container in self.containers.values(): container.deploy() + + def stop_flow(self, container_name): + if container_name not in self.containers: + logging.error('Could not stop container because it is not found: \'%s\'', container_name) + return + self.containers[container_name].stop() + + def kill_flow(self, container_name): + if container_name not in self.containers: + logging.error('Could not kill container because it is not found: \'%s\'', container_name) + return + self.containers[container_name].kill() + + def restart_flow(self, container_name): + if container_name not in self.containers: + logging.error('Could not stop restart because it is not found: \'%s\'', container_name) Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org