martinzink commented on code in PR #1294:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1294#discussion_r844844652


##########
extensions/standard-processors/processors/ListenSyslog.cpp:
##########
@@ -17,318 +14,283 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+
 #include "ListenSyslog.h"
-#include <stdio.h>
-#include <memory>
-#include <string>
-#include <vector>
-#include <set>
-#include <queue>
-#include "utils/TimeUtil.h"
-#include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
-#include "core/TypedValues.h"
 #include "core/Resource.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-#ifndef WIN32
-core::Property ListenSyslog::RecvBufSize(
-    core::PropertyBuilder::createProperty("Receive Buffer 
Size")->withDescription("The size of each buffer used to receive Syslog 
messages.")->
-    withDefaultValue<core::DataSizeValue>("65507 B")->build());
-
-core::Property ListenSyslog::MaxSocketBufSize(
-    core::PropertyBuilder::createProperty("Max Size of Socket 
Buffer")->withDescription("The maximum size of the socket buffer that should be 
used.")->withDefaultValue<core::DataSizeValue>("1 MB")
-        ->build());
+namespace org::apache::nifi::minifi::processors {
 
-core::Property ListenSyslog::MaxConnections(
-    core::PropertyBuilder::createProperty("Max Number of TCP 
Connections")->withDescription("The maximum number of concurrent connections to 
accept Syslog messages in TCP mode.")
-        ->withDefaultValue<int>(2)->build());
+const core::Property ListenSyslog::Port(
+    core::PropertyBuilder::createProperty("Listening Port")
+        ->withDescription("The port for Syslog communication.")
+        ->isRequired(true)
+        ->withDefaultValue<int>(514, 
core::StandardValidators::get().LISTEN_PORT_VALIDATOR)->build());
 
-core::Property ListenSyslog::MaxBatchSize(
-    core::PropertyBuilder::createProperty("Max Batch 
Size")->withDescription("The maximum number of Syslog events to add to a single 
FlowFile.")->withDefaultValue<int>(1)->build());
+const core::Property ListenSyslog::ProtocolProperty(
+    core::PropertyBuilder::createProperty("Protocol")
+        ->withDescription("The protocol for Syslog communication.")
+        ->isRequired(true)
+        ->withAllowableValues(Protocol::values())
+        ->withDefaultValue(toString(Protocol::UDP))
+        ->build());
 
-core::Property ListenSyslog::MessageDelimiter(
-    core::PropertyBuilder::createProperty("Message 
Delimiter")->withDescription("Specifies the delimiter to place between Syslog 
messages when multiple "
-                                                                               
 "messages are bundled together (see <Max Batch Size> 
core::Property).")->withDefaultValue("\n")->build());
+const core::Property ListenSyslog::MaxBatchSize(
+    core::PropertyBuilder::createProperty("Max Batch Size")
+        ->withDescription("The maximum number of Syslog events to process at a 
time.")
+        ->withDefaultValue<uint64_t>(500)
+        ->build());
 
-core::Property ListenSyslog::ParseMessages(
-    core::PropertyBuilder::createProperty("Parse 
Messages")->withDescription("Indicates if the processor should parse the Syslog 
messages. If set to false, each outgoing FlowFile will only.")
+const core::Property ListenSyslog::ParseMessages(
+    core::PropertyBuilder::createProperty("Parse Messages")
+        ->withDescription("Indicates if the processor should parse the Syslog 
messages. "
+                          "If set to false, each outgoing FlowFile will only 
contain the sender, protocol, and port, and no additional attributes.")
         ->withDefaultValue<bool>(false)->build());
 
-core::Property ListenSyslog::Protocol(
-    core::PropertyBuilder::createProperty("Protocol")->withDescription("The 
protocol for Syslog 
communication.")->withAllowableValue<std::string>("UDP")->withAllowableValue("TCP")->withDefaultValue(
-        "UDP")->build());
+const core::Property ListenSyslog::MaxQueueSize(
+    core::PropertyBuilder::createProperty("Max Size of Message Queue")
+        ->withDescription("Maximum number of Syslog messages allowed to be 
buffered before processing them when the processor is triggered. "
+                          "If the buffer full, the message is ignored. If set 
to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(0)->build());
 
-core::Property ListenSyslog::Port(
-    core::PropertyBuilder::createProperty("Port")->withDescription("The port 
for Syslog communication")->withDefaultValue<int64_t>(514, 
core::StandardValidators::get().PORT_VALIDATOR)->build());
+const core::Relationship ListenSyslog::Success("success", "Incoming messages 
that match the expected format when parsing will be sent to this relationship. "
+                                                          "When Parse Messages 
is set to false, all incoming message will be sent to this relationship.");
+const core::Relationship ListenSyslog::Invalid("invalid", "Incoming messages 
that do not match the expected format when parsing will be sent to this 
relationship.");
 
-core::Relationship ListenSyslog::Success("success", "All files are routed to 
success");
-core::Relationship ListenSyslog::Invalid("invalid", "SysLog message format 
invalid");
-
-void ListenSyslog::initialize() {
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(RecvBufSize);
-  properties.insert(MaxSocketBufSize);
-  properties.insert(MaxConnections);
-  properties.insert(MaxBatchSize);
-  properties.insert(MessageDelimiter);
-  properties.insert(ParseMessages);
-  properties.insert(Protocol);
-  properties.insert(Port);
-  setSupportedProperties(properties);
-  // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  relationships.insert(Invalid);
-  setSupportedRelationships(relationships);
-}
 
-void ListenSyslog::startSocketThread() {
-  if (_thread != NULL)
-    return;
+const std::regex ListenSyslog::SyslogMessage::rfc5424_pattern_(
+    R"(^<(?:(\d|\d{2}|1[1-8]\d|19[01]))>)"                                     
                               // priority
+    R"((?:(\d{1,2}))\s)"                                                       
                               // version
+    
R"((?:(\d{4}[-]\d{2}[-]\d{2}[T]\d{2}[:]\d{2}[:]\d{2}(?:\.\d{1,6})?(?:[+-]\d{2}[:]\d{2}|Z)?)|-)\s)"
        // timestamp
+    R"((?:([\S]{1,255}))\s)"                                                   
                               // hostname
+    R"((?:([\S]{1,48}))\s)"                                                    
                               // app_name
+    R"((?:([\S]{1,128}))\s)"                                                   
                               // proc_id
+    R"((?:([\S]{1,32}))\s)"                                                    
                               // msg_id
+    R"((?:(-|(?:\[.+?\])+))\s?)"                                               
                               // structured_data
+    R"((?:((?:.+)))?$)", std::regex::ECMAScript);                              
                               // msg
 
-  logger_->log_trace("ListenSysLog Socket Thread Start");
-  _serverTheadRunning = true;
-  _thread = new std::thread(run, this);
-  _thread->detach();
-}
+const std::regex ListenSyslog::SyslogMessage::rfc3164_pattern_(
+    R"((?:\<(\d{1,3})\>))"                                                     
                               // priority
+    R"(([A-Z][a-z][a-z]\s{1,2}\d{1,2}\s\d{2}[:]\d{2}[:]\d{2})\s)"              
                               // timestamp
+    R"(([\w][\w\d(\.|\:)@-]*)\s)"                                              
                               // hostname
+    R"((.*)$)", std::regex::ECMAScript);                                       
                               // msg
 
-void ListenSyslog::run(ListenSyslog *process) {
-  process->runThread();
+void ListenSyslog::initialize() {
+  setSupportedProperties({Port, ProtocolProperty, MaxBatchSize, ParseMessages, 
MaxQueueSize});
+  setSupportedRelationships({Success, Invalid});
 }
 
-void ListenSyslog::runThread() {
-  while (_serverTheadRunning) {
-    if (_resetServerSocket) {
-      _resetServerSocket = false;
-      // need to reset the socket
-      std::vector<int>::iterator it;
-      for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
-        int clientSocket = *it;
-        close(clientSocket);
-      }
-      _clientSockets.clear();
-      if (_serverSocket > 0) {
-        close(_serverSocket);
-        _serverSocket = 0;
-      }
-    }
+void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context && !server_thread_.joinable() && !server_);
 
-    if (_serverSocket <= 0) {
-      uint16_t portno = _port;
-      struct sockaddr_in serv_addr;
-      int sockfd;
-      if (_protocol == "TCP")
-        sockfd = socket(AF_INET, SOCK_STREAM, 0);
-      else
-        sockfd = socket(AF_INET, SOCK_DGRAM, 0);
-      if (sockfd < 0) {
-        logger_->log_error("ListenSysLog Server socket creation failed");
-        break;
-      }
-      bzero(reinterpret_cast<char *>(&serv_addr), sizeof(serv_addr));
-      serv_addr.sin_family = AF_INET;
-      serv_addr.sin_addr.s_addr = INADDR_ANY;
-      serv_addr.sin_port = htons(portno);
-      if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) 
{
-        logger_->log_error("ListenSysLog Server socket bind failed");
-        break;
-      }
-      if (_protocol == "TCP")
-        listen(sockfd, 5);
-      _serverSocket = sockfd;
-      logger_->log_info("ListenSysLog Server socket %d bind OK to port %d", 
_serverSocket, portno);
-    }
-    FD_ZERO(&_readfds);
-    FD_SET(_serverSocket, &_readfds);
-    _maxFds = _serverSocket;
-    std::vector<int>::iterator it;
-    for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
-      int clientSocket = *it;
-      if (clientSocket >= _maxFds)
-        _maxFds = clientSocket;
-      FD_SET(clientSocket, &_readfds);
-    }
-    fd_set fds;
-    struct timeval tv;
-    int retval;
-    fds = _readfds;
-    tv.tv_sec = 0;
-    // 100 msec
-    tv.tv_usec = 100000;
-    retval = select(_maxFds + 1, &fds, NULL, NULL, &tv);
-    if (retval < 0)
-      break;
-    if (retval == 0)
-      continue;
-    if (FD_ISSET(_serverSocket, &fds)) {
-      // server socket, either we have UDP datagram or TCP connection request
-      if (_protocol == "TCP") {
-        socklen_t clilen;
-        struct sockaddr_in cli_addr;
-        clilen = sizeof(cli_addr);
-        int newsockfd = accept(_serverSocket, reinterpret_cast<struct sockaddr 
*>(&cli_addr), &clilen);
-        if (newsockfd > 0) {
-          if (_clientSockets.size() < (uint64_t) _maxConnections) {
-            _clientSockets.push_back(newsockfd);
-            logger_->log_info("ListenSysLog new client socket %d connection", 
newsockfd);
-            continue;
-          } else {
-            close(newsockfd);
-          }
-        }
-      } else {
-        socklen_t clilen;
-        struct sockaddr_in cli_addr;
-        clilen = sizeof(cli_addr);
-        int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0, 
(struct sockaddr *) &cli_addr, &clilen);
-        if (recvlen > 0 && (uint64_t) (recvlen + getEventQueueByteSize()) <= 
static_cast<uint64_t>(_recvBufSize)) {
-          uint8_t *payload = new uint8_t[recvlen];
-          memcpy(payload, _buffer, recvlen);
-          putEvent(payload, recvlen);
-        }
-      }
-    }
-    it = _clientSockets.begin();
-    while (it != _clientSockets.end()) {
-      int clientSocket = *it;
-      if (FD_ISSET(clientSocket, &fds)) {
-        int recvlen = readline(clientSocket, _buffer, sizeof(_buffer));
-        if (recvlen <= 0) {
-          close(clientSocket);
-          logger_->log_debug("ListenSysLog client socket %d close", 
clientSocket);
-          it = _clientSockets.erase(it);
-        } else {
-          if ((uint64_t) (recvlen + getEventQueueByteSize()) <= 
static_cast<uint64_t>(_recvBufSize)) {
-            uint8_t *payload = new uint8_t[recvlen];
-            memcpy(payload, _buffer, recvlen);
-            putEvent(payload, recvlen);
-          }
-          ++it;
-        }
-      }
-    }
+  if (!context->getProperty(MaxBatchSize.getName(), max_batch_size_) || 
max_batch_size_ <= 0) {
+    max_batch_size_ = MaxBatchSize.getDefaultValue();
+    logger_->log_debug("Missing or invalid attribute %s", 
MaxBatchSize.getName());
   }
-  return;
-}
 
-int ListenSyslog::readline(int fd, char *bufptr, size_t len) {
-  char *bufx = bufptr;
-  static char *bp;
-  static int cnt = 0;
-  static char b[2048];
-  char c;
-
-  while (--len > 0) {
-    if (--cnt <= 0) {
-      cnt = recv(fd, b, sizeof(b), 0);
-      if (cnt < 0) {
-        if (errno == EINTR) {
-          len++; /* the while will decrement */
-          continue;
-        }
-        return -1;
-      }
-      if (cnt == 0)
-        return 0;
-      bp = b;
-    }
-    c = *bp++;
-    *bufptr++ = c;
-    if (c == '\n') {
-      *bufptr = '\n';
-      return bufptr - bufx + 1;
-    }
+  if (!context->getProperty(ParseMessages.getName(), parse_messages_)) {
+    parse_messages_ = ParseMessages.getDefaultValue();

Review Comment:
   Didnt know that, but you are right.
   Changed it in 
https://github.com/apache/nifi-minifi-cpp/pull/1294/commits/027fea1d65bd423940edd81d598cef68946e2202#diff-a22ae2c37869a184bd936ac719f98ae8bbdb1b8d2ad5b3e6fc48ea9207fe530bR88-R93



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to