martinzink commented on code in PR #1294:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1294#discussion_r849460551


##########
extensions/standard-processors/processors/ListenSyslog.cpp:
##########
@@ -17,318 +14,272 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+
 #include "ListenSyslog.h"
-#include <stdio.h>
-#include <memory>
-#include <string>
-#include <vector>
-#include <set>
-#include <queue>
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
-#include "core/TypedValues.h"
 #include "core/Resource.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-#ifndef WIN32
-core::Property ListenSyslog::RecvBufSize(
-    core::PropertyBuilder::createProperty("Receive Buffer 
Size")->withDescription("The size of each buffer used to receive Syslog 
messages.")->
-    withDefaultValue<core::DataSizeValue>("65507 B")->build());
-
-core::Property ListenSyslog::MaxSocketBufSize(
-    core::PropertyBuilder::createProperty("Max Size of Socket 
Buffer")->withDescription("The maximum size of the socket buffer that should be 
used.")->withDefaultValue<core::DataSizeValue>("1 MB")
-        ->build());
+namespace org::apache::nifi::minifi::processors {
 
-core::Property ListenSyslog::MaxConnections(
-    core::PropertyBuilder::createProperty("Max Number of TCP 
Connections")->withDescription("The maximum number of concurrent connections to 
accept Syslog messages in TCP mode.")
-        ->withDefaultValue<int>(2)->build());
+const core::Property ListenSyslog::Port(
+    core::PropertyBuilder::createProperty("Listening Port")
+        ->withDescription("The port for Syslog communication.")
+        ->isRequired(true)
+        ->withDefaultValue<int>(514, 
core::StandardValidators::get().LISTEN_PORT_VALIDATOR)->build());
 
-core::Property ListenSyslog::MaxBatchSize(
-    core::PropertyBuilder::createProperty("Max Batch 
Size")->withDescription("The maximum number of Syslog events to add to a single 
FlowFile.")->withDefaultValue<int>(1)->build());
+const core::Property ListenSyslog::ProtocolProperty(
+    core::PropertyBuilder::createProperty("Protocol")
+        ->withDescription("The protocol for Syslog communication.")
+        ->isRequired(true)
+        ->withAllowableValues(Protocol::values())
+        ->withDefaultValue(toString(Protocol::UDP))
+        ->build());
 
-core::Property ListenSyslog::MessageDelimiter(
-    core::PropertyBuilder::createProperty("Message 
Delimiter")->withDescription("Specifies the delimiter to place between Syslog 
messages when multiple "
-                                                                               
 "messages are bundled together (see <Max Batch Size> 
core::Property).")->withDefaultValue("\n")->build());
+const core::Property ListenSyslog::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of Syslog events to process at a 
time.")
+        ->withDefaultValue<uint64_t>(500, 
std::make_shared<core::UnsignedLongValidator>("Greater or equal than 1 
validator", 1))
+        ->build());
 
-core::Property ListenSyslog::ParseMessages(
-    core::PropertyBuilder::createProperty("Parse 
Messages")->withDescription("Indicates if the processor should parse the Syslog 
messages. If set to false, each outgoing FlowFile will only.")
+const core::Property ListenSyslog::ParseMessages(
+    core::PropertyBuilder::createProperty("Parse Messages")
+        ->withDescription("Indicates if the processor should parse the Syslog 
messages. "
+                          "If set to false, each outgoing FlowFile will only 
contain the sender, protocol, and port, and no additional attributes.")
         ->withDefaultValue<bool>(false)->build());
 
-core::Property ListenSyslog::Protocol(
-    core::PropertyBuilder::createProperty("Protocol")->withDescription("The 
protocol for Syslog 
communication.")->withAllowableValue<std::string>("UDP")->withAllowableValue("TCP")->withDefaultValue(
-        "UDP")->build());
+const core::Property ListenSyslog::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of Syslog messages allowed to be 
buffered before processing them when the processor is triggered. "
+                          "If the buffer full, the message is ignored. If set 
to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(0)->build());
+
+const core::Relationship ListenSyslog::Success("success", "Incoming messages 
that match the expected format when parsing will be sent to this relationship. "
+                                                          "When Parse Messages 
is set to false, all incoming message will be sent to this relationship.");
+const core::Relationship ListenSyslog::Invalid("invalid", "Incoming messages 
that do not match the expected format when parsing will be sent to this 
relationship.");
+
 
-core::Property ListenSyslog::Port(
-    core::PropertyBuilder::createProperty("Port")->withDescription("The port 
for Syslog communication")->withDefaultValue<int64_t>(514, 
core::StandardValidators::get().PORT_VALIDATOR)->build());
+const std::regex ListenSyslog::SyslogMessage::rfc5424_pattern_(
+    R"(^<(?:(\d|\d{2}|1[1-8]\d|19[01]))>)"                                     
                               // priority
+    R"((?:(\d{1,2}))\s)"                                                       
                               // version
+    
R"((?:(\d{4}[-]\d{2}[-]\d{2}[T]\d{2}[:]\d{2}[:]\d{2}(?:\.\d{1,6})?(?:[+-]\d{2}[:]\d{2}|Z)?)|-)\s)"
        // timestamp
+    R"((?:([\S]{1,255}))\s)"                                                   
                               // hostname
+    R"((?:([\S]{1,48}))\s)"                                                    
                               // app_name
+    R"((?:([\S]{1,128}))\s)"                                                   
                               // proc_id
+    R"((?:([\S]{1,32}))\s)"                                                    
                               // msg_id
+    R"((?:(-|(?:\[.+?\])+))\s?)"                                               
                               // structured_data
+    R"((?:((?:.+)))?$)", std::regex::ECMAScript);                              
                               // msg
 
-core::Relationship ListenSyslog::Success("success", "All files are routed to 
success");
-core::Relationship ListenSyslog::Invalid("invalid", "SysLog message format 
invalid");
+const std::regex ListenSyslog::SyslogMessage::rfc3164_pattern_(
+    R"((?:\<(\d{1,3})\>))"                                                     
                               // priority
+    R"(([A-Z][a-z][a-z]\s{1,2}\d{1,2}\s\d{2}[:]\d{2}[:]\d{2})\s)"              
                               // timestamp
+    R"(([\w][\w\d(\.|\:)@-]*)\s)"                                              
                               // hostname
+    R"((.*)$)", std::regex::ECMAScript);                                       
                               // msg
 
 void ListenSyslog::initialize() {
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(RecvBufSize);
-  properties.insert(MaxSocketBufSize);
-  properties.insert(MaxConnections);
-  properties.insert(MaxBatchSize);
-  properties.insert(MessageDelimiter);
-  properties.insert(ParseMessages);
-  properties.insert(Protocol);
-  properties.insert(Port);
-  setSupportedProperties(properties);
-  // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Invalid);
-  setSupportedRelationships(relationships);
+  setSupportedProperties({Port, ProtocolProperty, MaxBatchSize, ParseMessages, 
MaxQueueSize});
+  setSupportedRelationships({Success, Invalid});
 }
 
-void ListenSyslog::startSocketThread() {
-  if (_thread != NULL)
-    return;
+void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context && !server_thread_.joinable() && !server_);
 
-  logger_->log_trace("ListenSysLog Socket Thread Start");
-  _serverTheadRunning = true;
-  _thread = new std::thread(run, this);
-  _thread->detach();
-}
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  context->getProperty(ParseMessages.getName(), parse_messages_);
 
-void ListenSyslog::run(ListenSyslog *process) {
-  process->runThread();
-}
+  uint64_t max_queue_size = 0;
+  context->getProperty(MaxQueueSize.getName(), max_queue_size);
+  max_queue_size_ = max_queue_size > 0 ? 
std::optional<uint64_t>(max_queue_size) : std::nullopt;
 
-void ListenSyslog::runThread() {
-  while (_serverTheadRunning) {
-    if (_resetServerSocket) {
-      _resetServerSocket = false;
-      // need to reset the socket
-      std::vector<int>::iterator it;
-      for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
-        int clientSocket = *it;
-        close(clientSocket);
-      }
-      _clientSockets.clear();
-      if (_serverSocket > 0) {
-        close(_serverSocket);
-        _serverSocket = 0;
-      }
-    }
+  Protocol protocol = Protocol(ProtocolProperty.getDefaultValue());
+  if (!context->getProperty(ProtocolProperty.getName(), protocol)) {
+    logger_->log_error("Missing or invalid Protocol: defaulting to %s", 
protocol.toString());

Review Comment:
   Good point. I checked, and since these are required properties, `context->getProperty` already throws if it cannot find the property, so these branches are unreachable. I removed them in https://github.com/apache/nifi-minifi-cpp/pull/1294/commits/251865e006f1e53e45c33fdaa184a9259f7bf13d#diff-a22ae2c37869a184bd936ac719f98ae8bbdb1b8d2ad5b3e6fc48ea9207fe530bR95-R99



##########
extensions/standard-processors/processors/ListenSyslog.cpp:
##########
@@ -17,318 +14,272 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+
 #include "ListenSyslog.h"
-#include <stdio.h>
-#include <memory>
-#include <string>
-#include <vector>
-#include <set>
-#include <queue>
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
-#include "core/TypedValues.h"
 #include "core/Resource.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-#ifndef WIN32
-core::Property ListenSyslog::RecvBufSize(
-    core::PropertyBuilder::createProperty("Receive Buffer 
Size")->withDescription("The size of each buffer used to receive Syslog 
messages.")->
-    withDefaultValue<core::DataSizeValue>("65507 B")->build());
-
-core::Property ListenSyslog::MaxSocketBufSize(
-    core::PropertyBuilder::createProperty("Max Size of Socket 
Buffer")->withDescription("The maximum size of the socket buffer that should be 
used.")->withDefaultValue<core::DataSizeValue>("1 MB")
-        ->build());
+namespace org::apache::nifi::minifi::processors {
 
-core::Property ListenSyslog::MaxConnections(
-    core::PropertyBuilder::createProperty("Max Number of TCP 
Connections")->withDescription("The maximum number of concurrent connections to 
accept Syslog messages in TCP mode.")
-        ->withDefaultValue<int>(2)->build());
+const core::Property ListenSyslog::Port(
+    core::PropertyBuilder::createProperty("Listening Port")
+        ->withDescription("The port for Syslog communication.")
+        ->isRequired(true)
+        ->withDefaultValue<int>(514, 
core::StandardValidators::get().LISTEN_PORT_VALIDATOR)->build());
 
-core::Property ListenSyslog::MaxBatchSize(
-    core::PropertyBuilder::createProperty("Max Batch 
Size")->withDescription("The maximum number of Syslog events to add to a single 
FlowFile.")->withDefaultValue<int>(1)->build());
+const core::Property ListenSyslog::ProtocolProperty(
+    core::PropertyBuilder::createProperty("Protocol")
+        ->withDescription("The protocol for Syslog communication.")
+        ->isRequired(true)
+        ->withAllowableValues(Protocol::values())
+        ->withDefaultValue(toString(Protocol::UDP))
+        ->build());
 
-core::Property ListenSyslog::MessageDelimiter(
-    core::PropertyBuilder::createProperty("Message 
Delimiter")->withDescription("Specifies the delimiter to place between Syslog 
messages when multiple "
-                                                                               
 "messages are bundled together (see <Max Batch Size> 
core::Property).")->withDefaultValue("\n")->build());
+const core::Property ListenSyslog::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of Syslog events to process at a 
time.")
+        ->withDefaultValue<uint64_t>(500, 
std::make_shared<core::UnsignedLongValidator>("Greater or equal than 1 
validator", 1))
+        ->build());
 
-core::Property ListenSyslog::ParseMessages(
-    core::PropertyBuilder::createProperty("Parse 
Messages")->withDescription("Indicates if the processor should parse the Syslog 
messages. If set to false, each outgoing FlowFile will only.")
+const core::Property ListenSyslog::ParseMessages(
+    core::PropertyBuilder::createProperty("Parse Messages")
+        ->withDescription("Indicates if the processor should parse the Syslog 
messages. "
+                          "If set to false, each outgoing FlowFile will only 
contain the sender, protocol, and port, and no additional attributes.")
         ->withDefaultValue<bool>(false)->build());
 
-core::Property ListenSyslog::Protocol(
-    core::PropertyBuilder::createProperty("Protocol")->withDescription("The 
protocol for Syslog 
communication.")->withAllowableValue<std::string>("UDP")->withAllowableValue("TCP")->withDefaultValue(
-        "UDP")->build());
+const core::Property ListenSyslog::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of Syslog messages allowed to be 
buffered before processing them when the processor is triggered. "
+                          "If the buffer full, the message is ignored. If set 
to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(0)->build());
+
+const core::Relationship ListenSyslog::Success("success", "Incoming messages 
that match the expected format when parsing will be sent to this relationship. "
+                                                          "When Parse Messages 
is set to false, all incoming message will be sent to this relationship.");
+const core::Relationship ListenSyslog::Invalid("invalid", "Incoming messages 
that do not match the expected format when parsing will be sent to this 
relationship.");
+
 
-core::Property ListenSyslog::Port(
-    core::PropertyBuilder::createProperty("Port")->withDescription("The port 
for Syslog communication")->withDefaultValue<int64_t>(514, 
core::StandardValidators::get().PORT_VALIDATOR)->build());
+const std::regex ListenSyslog::SyslogMessage::rfc5424_pattern_(
+    R"(^<(?:(\d|\d{2}|1[1-8]\d|19[01]))>)"                                     
                               // priority
+    R"((?:(\d{1,2}))\s)"                                                       
                               // version
+    
R"((?:(\d{4}[-]\d{2}[-]\d{2}[T]\d{2}[:]\d{2}[:]\d{2}(?:\.\d{1,6})?(?:[+-]\d{2}[:]\d{2}|Z)?)|-)\s)"
        // timestamp
+    R"((?:([\S]{1,255}))\s)"                                                   
                               // hostname
+    R"((?:([\S]{1,48}))\s)"                                                    
                               // app_name
+    R"((?:([\S]{1,128}))\s)"                                                   
                               // proc_id
+    R"((?:([\S]{1,32}))\s)"                                                    
                               // msg_id
+    R"((?:(-|(?:\[.+?\])+))\s?)"                                               
                               // structured_data
+    R"((?:((?:.+)))?$)", std::regex::ECMAScript);                              
                               // msg
 
-core::Relationship ListenSyslog::Success("success", "All files are routed to 
success");
-core::Relationship ListenSyslog::Invalid("invalid", "SysLog message format 
invalid");
+const std::regex ListenSyslog::SyslogMessage::rfc3164_pattern_(
+    R"((?:\<(\d{1,3})\>))"                                                     
                               // priority
+    R"(([A-Z][a-z][a-z]\s{1,2}\d{1,2}\s\d{2}[:]\d{2}[:]\d{2})\s)"              
                               // timestamp
+    R"(([\w][\w\d(\.|\:)@-]*)\s)"                                              
                               // hostname
+    R"((.*)$)", std::regex::ECMAScript);                                       
                               // msg
 
 void ListenSyslog::initialize() {
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(RecvBufSize);
-  properties.insert(MaxSocketBufSize);
-  properties.insert(MaxConnections);
-  properties.insert(MaxBatchSize);
-  properties.insert(MessageDelimiter);
-  properties.insert(ParseMessages);
-  properties.insert(Protocol);
-  properties.insert(Port);
-  setSupportedProperties(properties);
-  // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Invalid);
-  setSupportedRelationships(relationships);
+  setSupportedProperties({Port, ProtocolProperty, MaxBatchSize, ParseMessages, 
MaxQueueSize});
+  setSupportedRelationships({Success, Invalid});
 }
 
-void ListenSyslog::startSocketThread() {
-  if (_thread != NULL)
-    return;
+void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context && !server_thread_.joinable() && !server_);
 
-  logger_->log_trace("ListenSysLog Socket Thread Start");
-  _serverTheadRunning = true;
-  _thread = new std::thread(run, this);
-  _thread->detach();
-}
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  context->getProperty(ParseMessages.getName(), parse_messages_);
 
-void ListenSyslog::run(ListenSyslog *process) {
-  process->runThread();
-}
+  uint64_t max_queue_size = 0;
+  context->getProperty(MaxQueueSize.getName(), max_queue_size);
+  max_queue_size_ = max_queue_size > 0 ? 
std::optional<uint64_t>(max_queue_size) : std::nullopt;
 
-void ListenSyslog::runThread() {
-  while (_serverTheadRunning) {
-    if (_resetServerSocket) {
-      _resetServerSocket = false;
-      // need to reset the socket
-      std::vector<int>::iterator it;
-      for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
-        int clientSocket = *it;
-        close(clientSocket);
-      }
-      _clientSockets.clear();
-      if (_serverSocket > 0) {
-        close(_serverSocket);
-        _serverSocket = 0;
-      }
-    }
+  Protocol protocol = Protocol(ProtocolProperty.getDefaultValue());
+  if (!context->getProperty(ProtocolProperty.getName(), protocol)) {
+    logger_->log_error("Missing or invalid Protocol: defaulting to %s", 
protocol.toString());
+  }
 
-    if (_serverSocket <= 0) {
-      uint16_t portno = _port;
-      struct sockaddr_in serv_addr;
-      int sockfd;
-      if (_protocol == "TCP")
-        sockfd = socket(AF_INET, SOCK_STREAM, 0);
-      else
-        sockfd = socket(AF_INET, SOCK_DGRAM, 0);
-      if (sockfd < 0) {
-        logger_->log_error("ListenSysLog Server socket creation failed");
-        break;
-      }
-      bzero(reinterpret_cast<char *>(&serv_addr), sizeof(serv_addr));
-      serv_addr.sin_family = AF_INET;
-      serv_addr.sin_addr.s_addr = INADDR_ANY;
-      serv_addr.sin_port = htons(portno);
-      if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) 
{
-        logger_->log_error("ListenSysLog Server socket bind failed");
-        break;
-      }
-      if (_protocol == "TCP")
-        listen(sockfd, 5);
-      _serverSocket = sockfd;
-      logger_->log_info("ListenSysLog Server socket %d bind OK to port %d", 
_serverSocket, portno);
-    }
-    FD_ZERO(&_readfds);
-    FD_SET(_serverSocket, &_readfds);
-    _maxFds = _serverSocket;
-    std::vector<int>::iterator it;
-    for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
-      int clientSocket = *it;
-      if (clientSocket >= _maxFds)
-        _maxFds = clientSocket;
-      FD_SET(clientSocket, &_readfds);
-    }
-    fd_set fds;
-    struct timeval tv;
-    int retval;
-    fds = _readfds;
-    tv.tv_sec = 0;
-    // 100 msec
-    tv.tv_usec = 100000;
-    retval = select(_maxFds + 1, &fds, NULL, NULL, &tv);
-    if (retval < 0)
+  int port = Port.getDefaultValue();
+  if (!context->getProperty(Port.getName(), port)) {
+    logger_->log_error("Missing or invalid Port: defaulting to %s", port);

Review Comment:
   Good point. I checked, and since these are required properties, `context->getProperty` already throws if it cannot find the property, so these branches are unreachable. I removed them in https://github.com/apache/nifi-minifi-cpp/pull/1294/commits/251865e006f1e53e45c33fdaa184a9259f7bf13d#diff-a22ae2c37869a184bd936ac719f98ae8bbdb1b8d2ad5b3e6fc48ea9207fe530bR95-R99



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to