martinzink commented on code in PR #1294: URL: https://github.com/apache/nifi-minifi-cpp/pull/1294#discussion_r844844652
########## extensions/standard-processors/processors/ListenSyslog.cpp: ########## @@ -17,318 +14,283 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + + #include "ListenSyslog.h" -#include <stdio.h> -#include <memory> -#include <string> -#include <vector> -#include <set> -#include <queue> -#include "utils/TimeUtil.h" -#include "utils/StringUtils.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" -#include "core/TypedValues.h" #include "core/Resource.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { -#ifndef WIN32 -core::Property ListenSyslog::RecvBufSize( - core::PropertyBuilder::createProperty("Receive Buffer Size")->withDescription("The size of each buffer used to receive Syslog messages.")-> - withDefaultValue<core::DataSizeValue>("65507 B")->build()); - -core::Property ListenSyslog::MaxSocketBufSize( - core::PropertyBuilder::createProperty("Max Size of Socket Buffer")->withDescription("The maximum size of the socket buffer that should be used.")->withDefaultValue<core::DataSizeValue>("1 MB") - ->build()); +namespace org::apache::nifi::minifi::processors { -core::Property ListenSyslog::MaxConnections( - core::PropertyBuilder::createProperty("Max Number of TCP Connections")->withDescription("The maximum number of concurrent connections to accept Syslog messages in TCP mode.") - ->withDefaultValue<int>(2)->build()); +const core::Property ListenSyslog::Port( + core::PropertyBuilder::createProperty("Listening Port") + ->withDescription("The port for Syslog communication.") + ->isRequired(true) + ->withDefaultValue<int>(514, core::StandardValidators::get().LISTEN_PORT_VALIDATOR)->build()); -core::Property ListenSyslog::MaxBatchSize( - core::PropertyBuilder::createProperty("Max Batch Size")->withDescription("The maximum number of Syslog events to add to a single FlowFile.")->withDefaultValue<int>(1)->build()); +const core::Property 
ListenSyslog::ProtocolProperty( + core::PropertyBuilder::createProperty("Protocol") + ->withDescription("The protocol for Syslog communication.") + ->isRequired(true) + ->withAllowableValues(Protocol::values()) + ->withDefaultValue(toString(Protocol::UDP)) + ->build()); -core::Property ListenSyslog::MessageDelimiter( - core::PropertyBuilder::createProperty("Message Delimiter")->withDescription("Specifies the delimiter to place between Syslog messages when multiple " - "messages are bundled together (see <Max Batch Size> core::Property).")->withDefaultValue("\n")->build()); +const core::Property ListenSyslog::MaxBatchSize( + core::PropertyBuilder::createProperty("Max Batch Size") + ->withDescription("The maximum number of Syslog events to process at a time.") + ->withDefaultValue<uint64_t>(500) + ->build()); -core::Property ListenSyslog::ParseMessages( - core::PropertyBuilder::createProperty("Parse Messages")->withDescription("Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.") +const core::Property ListenSyslog::ParseMessages( + core::PropertyBuilder::createProperty("Parse Messages") + ->withDescription("Indicates if the processor should parse the Syslog messages. " + "If set to false, each outgoing FlowFile will only contain the sender, protocol, and port, and no additional attributes.") ->withDefaultValue<bool>(false)->build()); -core::Property ListenSyslog::Protocol( - core::PropertyBuilder::createProperty("Protocol")->withDescription("The protocol for Syslog communication.")->withAllowableValue<std::string>("UDP")->withAllowableValue("TCP")->withDefaultValue( - "UDP")->build()); +const core::Property ListenSyslog::MaxQueueSize( + core::PropertyBuilder::createProperty("Max Size of Message Queue") + ->withDescription("Maximum number of Syslog messages allowed to be buffered before processing them when the processor is triggered. " + "If the buffer full, the message is ignored. 
If set to zero the buffer is unlimited.") + ->withDefaultValue<uint64_t>(0)->build()); -core::Property ListenSyslog::Port( - core::PropertyBuilder::createProperty("Port")->withDescription("The port for Syslog communication")->withDefaultValue<int64_t>(514, core::StandardValidators::get().PORT_VALIDATOR)->build()); +const core::Relationship ListenSyslog::Success("success", "Incoming messages that match the expected format when parsing will be sent to this relationship. " + "When Parse Messages is set to false, all incoming message will be sent to this relationship."); +const core::Relationship ListenSyslog::Invalid("invalid", "Incoming messages that do not match the expected format when parsing will be sent to this relationship."); -core::Relationship ListenSyslog::Success("success", "All files are routed to success"); -core::Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid"); - -void ListenSyslog::initialize() { - // Set the supported properties - std::set<core::Property> properties; - properties.insert(RecvBufSize); - properties.insert(MaxSocketBufSize); - properties.insert(MaxConnections); - properties.insert(MaxBatchSize); - properties.insert(MessageDelimiter); - properties.insert(ParseMessages); - properties.insert(Protocol); - properties.insert(Port); - setSupportedProperties(properties); - // Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(Success); - relationships.insert(Invalid); - setSupportedRelationships(relationships); -} -void ListenSyslog::startSocketThread() { - if (_thread != NULL) - return; +const std::regex ListenSyslog::SyslogMessage::rfc5424_pattern_( + R"(^<(?:(\d|\d{2}|1[1-8]\d|19[01]))>)" // priority + R"((?:(\d{1,2}))\s)" // version + R"((?:(\d{4}[-]\d{2}[-]\d{2}[T]\d{2}[:]\d{2}[:]\d{2}(?:\.\d{1,6})?(?:[+-]\d{2}[:]\d{2}|Z)?)|-)\s)" // timestamp + R"((?:([\S]{1,255}))\s)" // hostname + R"((?:([\S]{1,48}))\s)" // app_name + R"((?:([\S]{1,128}))\s)" // proc_id + 
R"((?:([\S]{1,32}))\s)" // msg_id + R"((?:(-|(?:\[.+?\])+))\s?)" // structured_data + R"((?:((?:.+)))?$)", std::regex::ECMAScript); // msg - logger_->log_trace("ListenSysLog Socket Thread Start"); - _serverTheadRunning = true; - _thread = new std::thread(run, this); - _thread->detach(); -} +const std::regex ListenSyslog::SyslogMessage::rfc3164_pattern_( + R"((?:\<(\d{1,3})\>))" // priority + R"(([A-Z][a-z][a-z]\s{1,2}\d{1,2}\s\d{2}[:]\d{2}[:]\d{2})\s)" // timestamp + R"(([\w][\w\d(\.|\:)@-]*)\s)" // hostname + R"((.*)$)", std::regex::ECMAScript); // msg -void ListenSyslog::run(ListenSyslog *process) { - process->runThread(); +void ListenSyslog::initialize() { + setSupportedProperties({Port, ProtocolProperty, MaxBatchSize, ParseMessages, MaxQueueSize}); + setSupportedRelationships({Success, Invalid}); } -void ListenSyslog::runThread() { - while (_serverTheadRunning) { - if (_resetServerSocket) { - _resetServerSocket = false; - // need to reset the socket - std::vector<int>::iterator it; - for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) { - int clientSocket = *it; - close(clientSocket); - } - _clientSockets.clear(); - if (_serverSocket > 0) { - close(_serverSocket); - _serverSocket = 0; - } - } +void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) { + gsl_Expects(context && !server_thread_.joinable() && !server_); - if (_serverSocket <= 0) { - uint16_t portno = _port; - struct sockaddr_in serv_addr; - int sockfd; - if (_protocol == "TCP") - sockfd = socket(AF_INET, SOCK_STREAM, 0); - else - sockfd = socket(AF_INET, SOCK_DGRAM, 0); - if (sockfd < 0) { - logger_->log_error("ListenSysLog Server socket creation failed"); - break; - } - bzero(reinterpret_cast<char *>(&serv_addr), sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; - serv_addr.sin_port = htons(portno); - if (bind(sockfd, (struct sockaddr *) &serv_addr, 
sizeof(serv_addr)) < 0) { - logger_->log_error("ListenSysLog Server socket bind failed"); - break; - } - if (_protocol == "TCP") - listen(sockfd, 5); - _serverSocket = sockfd; - logger_->log_info("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno); - } - FD_ZERO(&_readfds); - FD_SET(_serverSocket, &_readfds); - _maxFds = _serverSocket; - std::vector<int>::iterator it; - for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) { - int clientSocket = *it; - if (clientSocket >= _maxFds) - _maxFds = clientSocket; - FD_SET(clientSocket, &_readfds); - } - fd_set fds; - struct timeval tv; - int retval; - fds = _readfds; - tv.tv_sec = 0; - // 100 msec - tv.tv_usec = 100000; - retval = select(_maxFds + 1, &fds, NULL, NULL, &tv); - if (retval < 0) - break; - if (retval == 0) - continue; - if (FD_ISSET(_serverSocket, &fds)) { - // server socket, either we have UDP datagram or TCP connection request - if (_protocol == "TCP") { - socklen_t clilen; - struct sockaddr_in cli_addr; - clilen = sizeof(cli_addr); - int newsockfd = accept(_serverSocket, reinterpret_cast<struct sockaddr *>(&cli_addr), &clilen); - if (newsockfd > 0) { - if (_clientSockets.size() < (uint64_t) _maxConnections) { - _clientSockets.push_back(newsockfd); - logger_->log_info("ListenSysLog new client socket %d connection", newsockfd); - continue; - } else { - close(newsockfd); - } - } - } else { - socklen_t clilen; - struct sockaddr_in cli_addr; - clilen = sizeof(cli_addr); - int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0, (struct sockaddr *) &cli_addr, &clilen); - if (recvlen > 0 && (uint64_t) (recvlen + getEventQueueByteSize()) <= static_cast<uint64_t>(_recvBufSize)) { - uint8_t *payload = new uint8_t[recvlen]; - memcpy(payload, _buffer, recvlen); - putEvent(payload, recvlen); - } - } - } - it = _clientSockets.begin(); - while (it != _clientSockets.end()) { - int clientSocket = *it; - if (FD_ISSET(clientSocket, &fds)) { - int recvlen = readline(clientSocket, 
_buffer, sizeof(_buffer)); - if (recvlen <= 0) { - close(clientSocket); - logger_->log_debug("ListenSysLog client socket %d close", clientSocket); - it = _clientSockets.erase(it); - } else { - if ((uint64_t) (recvlen + getEventQueueByteSize()) <= static_cast<uint64_t>(_recvBufSize)) { - uint8_t *payload = new uint8_t[recvlen]; - memcpy(payload, _buffer, recvlen); - putEvent(payload, recvlen); - } - ++it; - } - } - } + if (!context->getProperty(MaxBatchSize.getName(), max_batch_size_) || max_batch_size_ <= 0) { + max_batch_size_ = MaxBatchSize.getDefaultValue(); + logger_->log_debug("Missing or invalid attribute %s", MaxBatchSize.getName()); } - return; -} -int ListenSyslog::readline(int fd, char *bufptr, size_t len) { - char *bufx = bufptr; - static char *bp; - static int cnt = 0; - static char b[2048]; - char c; - - while (--len > 0) { - if (--cnt <= 0) { - cnt = recv(fd, b, sizeof(b), 0); - if (cnt < 0) { - if (errno == EINTR) { - len++; /* the while will decrement */ - continue; - } - return -1; - } - if (cnt == 0) - return 0; - bp = b; - } - c = *bp++; - *bufptr++ = c; - if (c == '\n') { - *bufptr = '\n'; - return bufptr - bufx + 1; - } + if (!context->getProperty(ParseMessages.getName(), parse_messages_)) { + parse_messages_ = ParseMessages.getDefaultValue(); Review Comment: Didn't know that, but you are right. Changed it in https://github.com/apache/nifi-minifi-cpp/pull/1294/commits/027fea1d65bd423940edd81d598cef68946e2202#diff-a22ae2c37869a184bd936ac719f98ae8bbdb1b8d2ad5b3e6fc48ea9207fe530bR88-R93 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org