martinzink commented on code in PR #1294: URL: https://github.com/apache/nifi-minifi-cpp/pull/1294#discussion_r849460551
########## extensions/standard-processors/processors/ListenSyslog.cpp: ########## @@ -17,318 +14,272 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + + #include "ListenSyslog.h" -#include <stdio.h> -#include <memory> -#include <string> -#include <vector> -#include <set> -#include <queue> -#include "utils/TimeUtil.h" -#include "utils/StringUtils.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" -#include "core/TypedValues.h" #include "core/Resource.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { -#ifndef WIN32 -core::Property ListenSyslog::RecvBufSize( - core::PropertyBuilder::createProperty("Receive Buffer Size")->withDescription("The size of each buffer used to receive Syslog messages.")-> - withDefaultValue<core::DataSizeValue>("65507 B")->build()); - -core::Property ListenSyslog::MaxSocketBufSize( - core::PropertyBuilder::createProperty("Max Size of Socket Buffer")->withDescription("The maximum size of the socket buffer that should be used.")->withDefaultValue<core::DataSizeValue>("1 MB") - ->build()); +namespace org::apache::nifi::minifi::processors { -core::Property ListenSyslog::MaxConnections( - core::PropertyBuilder::createProperty("Max Number of TCP Connections")->withDescription("The maximum number of concurrent connections to accept Syslog messages in TCP mode.") - ->withDefaultValue<int>(2)->build()); +const core::Property ListenSyslog::Port( + core::PropertyBuilder::createProperty("Listening Port") + ->withDescription("The port for Syslog communication.") + ->isRequired(true) + ->withDefaultValue<int>(514, core::StandardValidators::get().LISTEN_PORT_VALIDATOR)->build()); -core::Property ListenSyslog::MaxBatchSize( - core::PropertyBuilder::createProperty("Max Batch Size")->withDescription("The maximum number of Syslog events to add to a single FlowFile.")->withDefaultValue<int>(1)->build()); +const core::Property 
ListenSyslog::ProtocolProperty( + core::PropertyBuilder::createProperty("Protocol") + ->withDescription("The protocol for Syslog communication.") + ->isRequired(true) + ->withAllowableValues(Protocol::values()) + ->withDefaultValue(toString(Protocol::UDP)) + ->build()); -core::Property ListenSyslog::MessageDelimiter( - core::PropertyBuilder::createProperty("Message Delimiter")->withDescription("Specifies the delimiter to place between Syslog messages when multiple " - "messages are bundled together (see <Max Batch Size> core::Property).")->withDefaultValue("\n")->build()); +const core::Property ListenSyslog::MaxBatchSize( + core::PropertyBuilder::createProperty("Max Batch Size") + ->withDescription("The maximum number of Syslog events to process at a time.") + ->withDefaultValue<uint64_t>(500, std::make_shared<core::UnsignedLongValidator>("Greater or equal than 1 validator", 1)) + ->build()); -core::Property ListenSyslog::ParseMessages( - core::PropertyBuilder::createProperty("Parse Messages")->withDescription("Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.") +const core::Property ListenSyslog::ParseMessages( + core::PropertyBuilder::createProperty("Parse Messages") + ->withDescription("Indicates if the processor should parse the Syslog messages. 
" + "If set to false, each outgoing FlowFile will only contain the sender, protocol, and port, and no additional attributes.") ->withDefaultValue<bool>(false)->build()); -core::Property ListenSyslog::Protocol( - core::PropertyBuilder::createProperty("Protocol")->withDescription("The protocol for Syslog communication.")->withAllowableValue<std::string>("UDP")->withAllowableValue("TCP")->withDefaultValue( - "UDP")->build()); +const core::Property ListenSyslog::MaxQueueSize( + core::PropertyBuilder::createProperty("Max Size of Message Queue") + ->withDescription("Maximum number of Syslog messages allowed to be buffered before processing them when the processor is triggered. " + "If the buffer full, the message is ignored. If set to zero the buffer is unlimited.") + ->withDefaultValue<uint64_t>(0)->build()); + +const core::Relationship ListenSyslog::Success("success", "Incoming messages that match the expected format when parsing will be sent to this relationship. " + "When Parse Messages is set to false, all incoming message will be sent to this relationship."); +const core::Relationship ListenSyslog::Invalid("invalid", "Incoming messages that do not match the expected format when parsing will be sent to this relationship."); + -core::Property ListenSyslog::Port( - core::PropertyBuilder::createProperty("Port")->withDescription("The port for Syslog communication")->withDefaultValue<int64_t>(514, core::StandardValidators::get().PORT_VALIDATOR)->build()); +const std::regex ListenSyslog::SyslogMessage::rfc5424_pattern_( + R"(^<(?:(\d|\d{2}|1[1-8]\d|19[01]))>)" // priority + R"((?:(\d{1,2}))\s)" // version + R"((?:(\d{4}[-]\d{2}[-]\d{2}[T]\d{2}[:]\d{2}[:]\d{2}(?:\.\d{1,6})?(?:[+-]\d{2}[:]\d{2}|Z)?)|-)\s)" // timestamp + R"((?:([\S]{1,255}))\s)" // hostname + R"((?:([\S]{1,48}))\s)" // app_name + R"((?:([\S]{1,128}))\s)" // proc_id + R"((?:([\S]{1,32}))\s)" // msg_id + R"((?:(-|(?:\[.+?\])+))\s?)" // structured_data + R"((?:((?:.+)))?$)", std::regex::ECMAScript); // msg 
-core::Relationship ListenSyslog::Success("success", "All files are routed to success"); -core::Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid"); +const std::regex ListenSyslog::SyslogMessage::rfc3164_pattern_( + R"((?:\<(\d{1,3})\>))" // priority + R"(([A-Z][a-z][a-z]\s{1,2}\d{1,2}\s\d{2}[:]\d{2}[:]\d{2})\s)" // timestamp + R"(([\w][\w\d(\.|\:)@-]*)\s)" // hostname + R"((.*)$)", std::regex::ECMAScript); // msg void ListenSyslog::initialize() { - // Set the supported properties - std::set<core::Property> properties; - properties.insert(RecvBufSize); - properties.insert(MaxSocketBufSize); - properties.insert(MaxConnections); - properties.insert(MaxBatchSize); - properties.insert(MessageDelimiter); - properties.insert(ParseMessages); - properties.insert(Protocol); - properties.insert(Port); - setSupportedProperties(properties); - // Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(Success); - relationships.insert(Invalid); - setSupportedRelationships(relationships); + setSupportedProperties({Port, ProtocolProperty, MaxBatchSize, ParseMessages, MaxQueueSize}); + setSupportedRelationships({Success, Invalid}); } -void ListenSyslog::startSocketThread() { - if (_thread != NULL) - return; +void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + gsl_Expects(context && !server_thread_.joinable() && !server_); - logger_->log_trace("ListenSysLog Socket Thread Start"); - _serverTheadRunning = true; - _thread = new std::thread(run, this); - _thread->detach(); -} + context->getProperty(MaxBatchSize.getName(), max_batch_size_); + context->getProperty(ParseMessages.getName(), parse_messages_); -void ListenSyslog::run(ListenSyslog *process) { - process->runThread(); -} + uint64_t max_queue_size = 0; + context->getProperty(MaxQueueSize.getName(), max_queue_size); + max_queue_size_ = max_queue_size > 0 ? 
std::optional<uint64_t>(max_queue_size) : std::nullopt; -void ListenSyslog::runThread() { - while (_serverTheadRunning) { - if (_resetServerSocket) { - _resetServerSocket = false; - // need to reset the socket - std::vector<int>::iterator it; - for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) { - int clientSocket = *it; - close(clientSocket); - } - _clientSockets.clear(); - if (_serverSocket > 0) { - close(_serverSocket); - _serverSocket = 0; - } - } + Protocol protocol = Protocol(ProtocolProperty.getDefaultValue()); + if (!context->getProperty(ProtocolProperty.getName(), protocol)) { + logger_->log_error("Missing or invalid Protocol: defaulting to %s", protocol.toString()); Review Comment: Good point. I checked, and since these are required properties, context->getProperty will already throw if it can't find the property, so these branches are unreachable. I removed them in https://github.com/apache/nifi-minifi-cpp/pull/1294/commits/251865e006f1e53e45c33fdaa184a9259f7bf13d#diff-a22ae2c37869a184bd936ac719f98ae8bbdb1b8d2ad5b3e6fc48ea9207fe530bR95-R99 ########## extensions/standard-processors/processors/ListenSyslog.cpp: ########## @@ -17,318 +14,272 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + + #include "ListenSyslog.h" -#include <stdio.h> -#include <memory> -#include <string> -#include <vector> -#include <set> -#include <queue> -#include "utils/TimeUtil.h" -#include "utils/StringUtils.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" -#include "core/TypedValues.h" #include "core/Resource.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { -#ifndef WIN32 -core::Property ListenSyslog::RecvBufSize( - core::PropertyBuilder::createProperty("Receive Buffer Size")->withDescription("The size of each buffer used to receive Syslog messages.")-> - withDefaultValue<core::DataSizeValue>("65507 B")->build()); - -core::Property ListenSyslog::MaxSocketBufSize( - core::PropertyBuilder::createProperty("Max Size of Socket Buffer")->withDescription("The maximum size of the socket buffer that should be used.")->withDefaultValue<core::DataSizeValue>("1 MB") - ->build()); +namespace org::apache::nifi::minifi::processors { -core::Property ListenSyslog::MaxConnections( - core::PropertyBuilder::createProperty("Max Number of TCP Connections")->withDescription("The maximum number of concurrent connections to accept Syslog messages in TCP mode.") - ->withDefaultValue<int>(2)->build()); +const core::Property ListenSyslog::Port( + core::PropertyBuilder::createProperty("Listening Port") + ->withDescription("The port for Syslog communication.") + ->isRequired(true) + ->withDefaultValue<int>(514, core::StandardValidators::get().LISTEN_PORT_VALIDATOR)->build()); -core::Property ListenSyslog::MaxBatchSize( - core::PropertyBuilder::createProperty("Max Batch Size")->withDescription("The maximum number of Syslog events to add to a single FlowFile.")->withDefaultValue<int>(1)->build()); +const core::Property ListenSyslog::ProtocolProperty( + core::PropertyBuilder::createProperty("Protocol") + ->withDescription("The protocol for Syslog communication.") + ->isRequired(true) + ->withAllowableValues(Protocol::values()) + 
->withDefaultValue(toString(Protocol::UDP)) + ->build()); -core::Property ListenSyslog::MessageDelimiter( - core::PropertyBuilder::createProperty("Message Delimiter")->withDescription("Specifies the delimiter to place between Syslog messages when multiple " - "messages are bundled together (see <Max Batch Size> core::Property).")->withDefaultValue("\n")->build()); +const core::Property ListenSyslog::MaxBatchSize( + core::PropertyBuilder::createProperty("Max Batch Size") + ->withDescription("The maximum number of Syslog events to process at a time.") + ->withDefaultValue<uint64_t>(500, std::make_shared<core::UnsignedLongValidator>("Greater or equal than 1 validator", 1)) + ->build()); -core::Property ListenSyslog::ParseMessages( - core::PropertyBuilder::createProperty("Parse Messages")->withDescription("Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.") +const core::Property ListenSyslog::ParseMessages( + core::PropertyBuilder::createProperty("Parse Messages") + ->withDescription("Indicates if the processor should parse the Syslog messages. " + "If set to false, each outgoing FlowFile will only contain the sender, protocol, and port, and no additional attributes.") ->withDefaultValue<bool>(false)->build()); -core::Property ListenSyslog::Protocol( - core::PropertyBuilder::createProperty("Protocol")->withDescription("The protocol for Syslog communication.")->withAllowableValue<std::string>("UDP")->withAllowableValue("TCP")->withDefaultValue( - "UDP")->build()); +const core::Property ListenSyslog::MaxQueueSize( + core::PropertyBuilder::createProperty("Max Size of Message Queue") + ->withDescription("Maximum number of Syslog messages allowed to be buffered before processing them when the processor is triggered. " + "If the buffer full, the message is ignored. 
If set to zero the buffer is unlimited.") + ->withDefaultValue<uint64_t>(0)->build()); + +const core::Relationship ListenSyslog::Success("success", "Incoming messages that match the expected format when parsing will be sent to this relationship. " + "When Parse Messages is set to false, all incoming message will be sent to this relationship."); +const core::Relationship ListenSyslog::Invalid("invalid", "Incoming messages that do not match the expected format when parsing will be sent to this relationship."); + -core::Property ListenSyslog::Port( - core::PropertyBuilder::createProperty("Port")->withDescription("The port for Syslog communication")->withDefaultValue<int64_t>(514, core::StandardValidators::get().PORT_VALIDATOR)->build()); +const std::regex ListenSyslog::SyslogMessage::rfc5424_pattern_( + R"(^<(?:(\d|\d{2}|1[1-8]\d|19[01]))>)" // priority + R"((?:(\d{1,2}))\s)" // version + R"((?:(\d{4}[-]\d{2}[-]\d{2}[T]\d{2}[:]\d{2}[:]\d{2}(?:\.\d{1,6})?(?:[+-]\d{2}[:]\d{2}|Z)?)|-)\s)" // timestamp + R"((?:([\S]{1,255}))\s)" // hostname + R"((?:([\S]{1,48}))\s)" // app_name + R"((?:([\S]{1,128}))\s)" // proc_id + R"((?:([\S]{1,32}))\s)" // msg_id + R"((?:(-|(?:\[.+?\])+))\s?)" // structured_data + R"((?:((?:.+)))?$)", std::regex::ECMAScript); // msg -core::Relationship ListenSyslog::Success("success", "All files are routed to success"); -core::Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid"); +const std::regex ListenSyslog::SyslogMessage::rfc3164_pattern_( + R"((?:\<(\d{1,3})\>))" // priority + R"(([A-Z][a-z][a-z]\s{1,2}\d{1,2}\s\d{2}[:]\d{2}[:]\d{2})\s)" // timestamp + R"(([\w][\w\d(\.|\:)@-]*)\s)" // hostname + R"((.*)$)", std::regex::ECMAScript); // msg void ListenSyslog::initialize() { - // Set the supported properties - std::set<core::Property> properties; - properties.insert(RecvBufSize); - properties.insert(MaxSocketBufSize); - properties.insert(MaxConnections); - properties.insert(MaxBatchSize); - 
properties.insert(MessageDelimiter); - properties.insert(ParseMessages); - properties.insert(Protocol); - properties.insert(Port); - setSupportedProperties(properties); - // Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(Success); - relationships.insert(Invalid); - setSupportedRelationships(relationships); + setSupportedProperties({Port, ProtocolProperty, MaxBatchSize, ParseMessages, MaxQueueSize}); + setSupportedRelationships({Success, Invalid}); } -void ListenSyslog::startSocketThread() { - if (_thread != NULL) - return; +void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + gsl_Expects(context && !server_thread_.joinable() && !server_); - logger_->log_trace("ListenSysLog Socket Thread Start"); - _serverTheadRunning = true; - _thread = new std::thread(run, this); - _thread->detach(); -} + context->getProperty(MaxBatchSize.getName(), max_batch_size_); + context->getProperty(ParseMessages.getName(), parse_messages_); -void ListenSyslog::run(ListenSyslog *process) { - process->runThread(); -} + uint64_t max_queue_size = 0; + context->getProperty(MaxQueueSize.getName(), max_queue_size); + max_queue_size_ = max_queue_size > 0 ? 
std::optional<uint64_t>(max_queue_size) : std::nullopt; -void ListenSyslog::runThread() { - while (_serverTheadRunning) { - if (_resetServerSocket) { - _resetServerSocket = false; - // need to reset the socket - std::vector<int>::iterator it; - for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) { - int clientSocket = *it; - close(clientSocket); - } - _clientSockets.clear(); - if (_serverSocket > 0) { - close(_serverSocket); - _serverSocket = 0; - } - } + Protocol protocol = Protocol(ProtocolProperty.getDefaultValue()); + if (!context->getProperty(ProtocolProperty.getName(), protocol)) { + logger_->log_error("Missing or invalid Protocol: defaulting to %s", protocol.toString()); + } - if (_serverSocket <= 0) { - uint16_t portno = _port; - struct sockaddr_in serv_addr; - int sockfd; - if (_protocol == "TCP") - sockfd = socket(AF_INET, SOCK_STREAM, 0); - else - sockfd = socket(AF_INET, SOCK_DGRAM, 0); - if (sockfd < 0) { - logger_->log_error("ListenSysLog Server socket creation failed"); - break; - } - bzero(reinterpret_cast<char *>(&serv_addr), sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; - serv_addr.sin_port = htons(portno); - if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - logger_->log_error("ListenSysLog Server socket bind failed"); - break; - } - if (_protocol == "TCP") - listen(sockfd, 5); - _serverSocket = sockfd; - logger_->log_info("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno); - } - FD_ZERO(&_readfds); - FD_SET(_serverSocket, &_readfds); - _maxFds = _serverSocket; - std::vector<int>::iterator it; - for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) { - int clientSocket = *it; - if (clientSocket >= _maxFds) - _maxFds = clientSocket; - FD_SET(clientSocket, &_readfds); - } - fd_set fds; - struct timeval tv; - int retval; - fds = _readfds; - tv.tv_sec = 0; - // 100 msec - tv.tv_usec = 100000; - retval = select(_maxFds + 1, 
&fds, NULL, NULL, &tv); - if (retval < 0) + int port = Port.getDefaultValue(); + if (!context->getProperty(Port.getName(), port)) { + logger_->log_error("Missing or invalid Port: defaulting to %s", port); Review Comment: Good point. I checked, and since these are required properties, context->getProperty will already throw if it can't find the property, so these branches are unreachable. I removed them in https://github.com/apache/nifi-minifi-cpp/pull/1294/commits/251865e006f1e53e45c33fdaa184a9259f7bf13d#diff-a22ae2c37869a184bd936ac719f98ae8bbdb1b8d2ad5b3e6fc48ea9207fe530bR95-R99 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org