[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #921: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP processor

2020-10-14 Thread GitBox


lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r504475684



##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -191,7 +206,11 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, 
core::ProcessSessionF
   }
 
   server_.reset(new CivetServer(options, &callbacks_, &logger_));
-  handler_.reset(new Handler(basePath, context, sessionFactory, 
std::move(authDNPattern), std::move(headersAsAttributesPattern)));
+
+  context->getProperty(BatchSize.getName(), batch_size_);
+  logger_->log_debug("ListenHTTP using %s: %d", BatchSize.getName(), 
batch_size_);

Review comment:
   Fixed in 
[7813ee3](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/7813ee3aa9fc16db0b132d49f27d88bf4671df3f)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #921: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP processor

2020-10-14 Thread GitBox


lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r504475596



##
File path: extensions/civetweb/processors/ListenHTTP.h
##
@@ -205,13 +193,17 @@ class ListenHTTP : public core::Processor {
   void notifyStop() override;
 
  private:
-  // Logger
-  std::shared_ptr logger_;
+  static const std::size_t DEFAULT_BUFFER_SIZE;
 
+  void processIncomingFlowFile(core::ProcessSession *session);
+  void processRequestBuffer(core::ProcessSession *session);
+
+  std::shared_ptr logger_;
   CivetCallbacks callbacks_;
   std::unique_ptr server_;
   std::unique_ptr handler_;
   std::string listeningPort;
+  std::size_t batch_size_;

Review comment:
   Fixed in 
[7813ee3](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/7813ee3aa9fc16db0b132d49f27d88bf4671df3f)









[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #921: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP processor

2020-10-13 Thread GitBox


lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r504056045



##
File path: extensions/civetweb/processors/ListenHTTP.h
##
@@ -155,18 +160,15 @@ class ListenHTTP : public core::Processor {
   // Write callback for transferring data from HTTP request to content repo
   class WriteCallback : public OutputStreamCallback {
public:
-WriteCallback(struct mg_connection *conn, const struct mg_request_info 
*reqInfo);
+WriteCallback(std::unique_ptr);
 int64_t process(std::shared_ptr stream);
 
private:
-// Logger
 std::shared_ptr logger_;
-
-struct mg_connection *conn_;
-const struct mg_request_info *req_info_;
+std::shared_ptr request_content_;

Review comment:
   Replaced in 
[385be9e](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/385be9e942f776f41fe87ab2526fbe016b8d1d51)









[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #921: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP processor

2020-10-13 Thread GitBox


lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r504052537



##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -273,6 +325,34 @@ void ListenHTTP::Handler::set_header_attributes(const 
mg_request_info *req_info,
   }
 }
 
+bool ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const 
mg_request_info *req_info, std::unique_ptr content_buffer) {
+  auto flow_file = std::make_shared();
+  auto flow_version = 
process_context_->getProcessorNode()->getFlowIdentifier();
+  if (flow_version != nullptr) {
+flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID, 
flow_version->getFlowId());
+  }
+
+  if (!flow_file) {

Review comment:
   Previously `session->create()` was used to create the FlowFileRecord; now we
   allocate the record in this scope, so there is no need to check `flow_file`
   at all. I removed the check in
[0427909](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/0427909155e707881d8640ca20e1c33b0c9d84f5)
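
   A minimal standalone sketch of why the check is redundant. `Record` and
   `makeRecord` are hypothetical stand-ins, not the MiNiFi types.
   `std::make_shared` either returns a non-null pointer or throws (normally
   `std::bad_alloc`), so a null test placed after it can never fire. In the
   hunk above the pointer is also dereferenced before the test is reached.

```cpp
#include <memory>
#include <string>

// Hypothetical stand-in for the flow file record; not the MiNiFi type.
struct Record {
  std::string flow_id;
};

// std::make_shared reports allocation failure by throwing (std::bad_alloc),
// never by returning an empty pointer, so no null check is needed here.
std::shared_ptr<Record> makeRecord(const std::string& flow_id) {
  auto record = std::make_shared<Record>();
  record->flow_id = flow_id;  // safe: record is guaranteed to be non-null
  return record;
}
```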

##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -273,6 +325,34 @@ void ListenHTTP::Handler::set_header_attributes(const 
mg_request_info *req_info,
   }
 }
 
+bool ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const 
mg_request_info *req_info, std::unique_ptr content_buffer) {
+  auto flow_file = std::make_shared();
+  auto flow_version = 
process_context_->getProcessorNode()->getFlowIdentifier();
+  if (flow_version != nullptr) {
+flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID, 
flow_version->getFlowId());
+  }
+
+  if (!flow_file) {
+sendHttp500(conn);

Review comment:
   Removed in 
[0427909](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/0427909155e707881d8640ca20e1c33b0c9d84f5)
 due to the comment above.

##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -273,6 +325,34 @@ void ListenHTTP::Handler::set_header_attributes(const 
mg_request_info *req_info,
   }
 }
 
+bool ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const 
mg_request_info *req_info, std::unique_ptr content_buffer) {
+  auto flow_file = std::make_shared();
+  auto flow_version = 
process_context_->getProcessorNode()->getFlowIdentifier();
+  if (flow_version != nullptr) {
+flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID, 
flow_version->getFlowId());
+  }
+
+  if (!flow_file) {
+sendHttp500(conn);
+return true;

Review comment:
   Fixed in 
[0427909](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/0427909155e707881d8640ca20e1c33b0c9d84f5)









[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #921: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP processor

2020-10-13 Thread GitBox


lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r504052806



##
File path: extensions/civetweb/processors/ListenHTTP.h
##
@@ -112,20 +115,22 @@ class ListenHTTP : public core::Processor {
   }
 }
 
+std::size_t buffer_size_;
+utils::ConcurrentQueue request_buffer;

Review comment:
   I suppose it shouldn't be public, as the consumer should only be allowed
   to dequeue from the buffer. Fixed in
[0427909](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/0427909155e707881d8640ca20e1c33b0c9d84f5)
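
   A sketch of that encapsulation, under stated assumptions: this `Handler`,
   `dequeueRequest` and `handleRequest` are hypothetical names, and a
   mutex-guarded `std::deque` stands in for `utils::ConcurrentQueue`. The
   queue member is private, and the consumer's only access to it is a dequeue
   method.

```cpp
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical handler: the queue itself is private, and the consumer's
// only way to reach it is dequeueRequest().
class Handler {
 public:
  // Called by the consumer; returns false when nothing is buffered.
  bool dequeueRequest(std::string& out) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (request_buffer_.empty()) {
      return false;
    }
    out = std::move(request_buffer_.front());
    request_buffer_.pop_front();
    return true;
  }

  // Stands in for the HTTP callback path that produces buffered requests.
  void handleRequest(std::string body) {
    std::lock_guard<std::mutex> lock(mutex_);
    request_buffer_.push_back(std::move(body));
  }

 private:
  std::mutex mutex_;
  std::deque<std::string> request_buffer_;
};
```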

##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -273,6 +325,34 @@ void ListenHTTP::Handler::set_header_attributes(const 
mg_request_info *req_info,
   }
 }
 
+bool ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const 
mg_request_info *req_info, std::unique_ptr content_buffer) {
+  auto flow_file = std::make_shared();
+  auto flow_version = 
process_context_->getProcessorNode()->getFlowIdentifier();
+  if (flow_version != nullptr) {
+flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID, 
flow_version->getFlowId());
+  }
+
+  if (!flow_file) {
+sendHttp500(conn);
+return true;
+  }
+
+  setHeaderAttributes(req_info, flow_file);
+
+  if (buffer_size_ == 0 || request_buffer.size() < buffer_size_) {
+request_buffer.enqueue(std::make_pair(std::move(flow_file), 
std::move(content_buffer)));
+  } else {
+logger_->log_warn("ListenHTTP buffer is full");

Review comment:
   Added the request method and URI to the log; I could not find anything
   more specific to the message in the request info. It is still useful to
   record that the message was dropped. Fixed in
[0427909](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/0427909155e707881d8640ca20e1c33b0c9d84f5)
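
   A simplified, single-threaded sketch of the bounded enqueue with a drop
   message that names the request. `RequestInfo`, `BoundedBuffer` and the
   message wording are assumptions for illustration; the real code reads the
   method and URI from `mg_request_info` and writes through the logger. As in
   the hunk above, a limit of zero means the buffer is unlimited.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical request summary; not the CivetWeb mg_request_info struct.
struct RequestInfo {
  std::string method;
  std::string uri;
};

class BoundedBuffer {
 public:
  explicit BoundedBuffer(std::size_t limit) : limit_(limit) {}

  // Returns true if the request was stored. A limit of 0 means "unlimited".
  // On overflow the request is dropped and a message naming it is recorded.
  bool enqueue(RequestInfo request) {
    if (limit_ == 0 || items_.size() < limit_) {
      items_.push_back(std::move(request));
      return true;
    }
    last_warning_ = "buffer is full, dropping " + request.method +
                    " request for " + request.uri;
    return false;
  }

  std::size_t size() const { return items_.size(); }
  const std::string& lastWarning() const { return last_warning_; }

 private:
  std::size_t limit_;
  std::deque<RequestInfo> items_;
  std::string last_warning_;  // stands in for a logger call
};
```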

##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -456,19 +473,28 @@ int64_t 
ListenHTTP::WriteCallback::process(std::shared_ptr strea
 }
 
 // Read a buffer of data from client
-rlen = mg_read(conn_, &buf[0], (size_t) rlen);
+rlen = mg_read(conn, &buf[0], (size_t) rlen);
 
 if (rlen <= 0) {
   break;
 }
 
 // Transfer buffer data to the output stream
-stream->write(&buf[0], gsl::narrow(rlen));
+content_buffer->write(&buf[0], gsl::narrow(rlen));
 
 nlen += rlen;
   }
 
-  return nlen;
+  return content_buffer;
+}
+
+ListenHTTP::WriteCallback::WriteCallback(std::unique_ptr 
request_content)
+: logger_(logging::LoggerFactory::getLogger())

Review comment:
   Fixed in 
[0427909](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/0427909155e707881d8640ca20e1c33b0c9d84f5)









[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #921: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP processor

2020-10-13 Thread GitBox


lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r504053097



##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -456,19 +473,28 @@ int64_t 
ListenHTTP::WriteCallback::process(std::shared_ptr strea
 }
 
 // Read a buffer of data from client
-rlen = mg_read(conn_, &buf[0], (size_t) rlen);
+rlen = mg_read(conn, &buf[0], (size_t) rlen);
 
 if (rlen <= 0) {
   break;
 }
 
 // Transfer buffer data to the output stream
-stream->write(&buf[0], gsl::narrow(rlen));
+content_buffer->write(&buf[0], gsl::narrow(rlen));
 
 nlen += rlen;
   }
 
-  return nlen;
+  return content_buffer;
+}
+
+ListenHTTP::WriteCallback::WriteCallback(std::unique_ptr 
request_content)
+: logger_(logging::LoggerFactory::getLogger())
+, request_content_(std::move(request_content)) {
+}
+
+int64_t ListenHTTP::WriteCallback::process(std::shared_ptr 
stream) {

Review comment:
   I checked: it is an overridden virtual method, so the parameter type is
   fixed, but I added the `override` keyword to make that explicit in
[0427909](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/0427909155e707881d8640ca20e1c33b0c9d84f5)
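
   A small sketch of what `override` buys here. `OutputCallback`, this
   `WriteCallback` and `run` are hypothetical, simplified types rather than
   the MiNiFi interfaces. The base class fixes the signature, and `override`
   turns any accidental mismatch into a compile error instead of a silently
   unrelated function.

```cpp
#include <cstdint>
#include <string>

// Hypothetical callback interface; the signature is fixed by the base class.
struct OutputCallback {
  virtual ~OutputCallback() = default;
  virtual std::int64_t process(const std::string& stream) = 0;
};

struct WriteCallback : OutputCallback {
  // `override` makes the compiler reject this declaration if it stops
  // matching a virtual function in the base (e.g. a changed parameter type).
  std::int64_t process(const std::string& stream) override {
    return static_cast<std::int64_t>(stream.size());
  }
};

// Calls through the base interface, as a framework would.
std::int64_t run(OutputCallback& callback, const std::string& data) {
  return callback.process(data);
}
```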









[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #921: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP processor

2020-10-13 Thread GitBox


lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r504052241



##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -212,51 +235,80 @@ void ListenHTTP::onSchedule(core::ProcessContext 
*context, core::ProcessSessionF
 ListenHTTP::~ListenHTTP() = default;
 
 void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession 
*session) {
-  std::shared_ptr flow_file = session->get();
+  logger_->log_debug("OnTrigger ListenHTTP");
+  processIncomingFlowFile(session);
+  processRequestBuffer(session);
+}
 
-  // Do nothing if there are no incoming files
+void ListenHTTP::processIncomingFlowFile(core::ProcessSession *session) {
+  std::shared_ptr flow_file = session->get();
   if (!flow_file) {
 return;
   }
 
   std::string type;
   flow_file->getAttribute("http.type", type);
 
-  if (type == "response_body") {
-
-if (handler_) {
-  struct response_body response { "", "", "" };
-  ResponseBodyReadCallback cb(&response.body);
-  flow_file->getAttribute("filename", response.uri);
-  flow_file->getAttribute("mime.type", response.mime_type);
-  if (response.mime_type.empty()) {
-logger_->log_warn("Using default mime type of application/octet-stream 
for response body file: %s", response.uri);
-response.mime_type = "application/octet-stream";
-  }
-  session->read(flow_file, &cb);
-  handler_->set_response_body(std::move(response));
+  if (type == "response_body" && handler_) {
+response_body response;
+ResponseBodyReadCallback cb(&response.body);
+flow_file->getAttribute("filename", response.uri);
+flow_file->getAttribute("mime.type", response.mime_type);
+if (response.mime_type.empty()) {
+  logger_->log_warn("Using default mime type of application/octet-stream 
for response body file: %s", response.uri);
+  response.mime_type = "application/octet-stream";
 }
+session->read(flow_file, &cb);
+handler_->setResponseBody(std::move(response));
   }
 
   session->remove(flow_file);
 }
 
-ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext 
*context, core::ProcessSessionFactory *session_factory, std::string 
&&auth_dn_regex, std::string &&header_as_attrs_regex)
+void ListenHTTP::processRequestBuffer(core::ProcessSession *session) {
+  std::size_t flow_file_count = 0;
+  for (; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count) 
{
+FlowFileBufferPair flow_file_buffer_pair;
+if (!handler_->request_buffer.tryDequeue(flow_file_buffer_pair)) {
+  break;
+}
+
+auto flow_file = flow_file_buffer_pair.first;
+session->add(flow_file);
+
+if (flow_file_buffer_pair.second) {
+  WriteCallback callback(std::move(flow_file_buffer_pair.second));
+  session->write(flow_file, &callback);
+}
+
+session->transfer(flow_file, Success);
+  }
+
+  logger_->log_debug("ListenHTTP transferred %d flow files from HTTP request 
buffer", flow_file_count);

Review comment:
   Fixed in 
[0427909](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/0427909155e707881d8640ca20e1c33b0c9d84f5)

##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -212,51 +235,80 @@ void ListenHTTP::onSchedule(core::ProcessContext 
*context, core::ProcessSessionF
 ListenHTTP::~ListenHTTP() = default;
 
 void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession 
*session) {
-  std::shared_ptr flow_file = session->get();
+  logger_->log_debug("OnTrigger ListenHTTP");
+  processIncomingFlowFile(session);
+  processRequestBuffer(session);
+}
 
-  // Do nothing if there are no incoming files
+void ListenHTTP::processIncomingFlowFile(core::ProcessSession *session) {
+  std::shared_ptr flow_file = session->get();
   if (!flow_file) {
 return;
   }
 
   std::string type;
   flow_file->getAttribute("http.type", type);
 
-  if (type == "response_body") {
-
-if (handler_) {
-  struct response_body response { "", "", "" };
-  ResponseBodyReadCallback cb(&response.body);
-  flow_file->getAttribute("filename", response.uri);
-  flow_file->getAttribute("mime.type", response.mime_type);
-  if (response.mime_type.empty()) {
-logger_->log_warn("Using default mime type of application/octet-stream 
for response body file: %s", response.uri);
-response.mime_type = "application/octet-stream";
-  }
-  session->read(flow_file, &cb);
-  handler_->set_response_body(std::move(response));
+  if (type == "response_body" && handler_) {
+response_body response;
+ResponseBodyReadCallback cb(&response.body);
+flow_file->getAttribute("filename", response.uri);
+flow_file->getAttribute("mime.type", response.mime_type);
+if (response.mime_type.empty()) {
+  logger_->log_warn("Using default mime type of application/octet-stream 
for response body file: %s", response.uri);
+  respo

[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #921: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP processor

2020-10-13 Thread GitBox


lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r504052023



##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -191,7 +206,15 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, 
core::ProcessSessionF
   }
 
   server_.reset(new CivetServer(options, &callbacks_, &logger_));
-  handler_.reset(new Handler(basePath, context, sessionFactory, 
std::move(authDNPattern), std::move(headersAsAttributesPattern)));
+
+  context->getProperty(BatchSize.getName(), batch_size_);
+  logger_->log_debug("ListenHTTP using %s: %d", BatchSize.getName(), 
batch_size_);
+
+  std::size_t buffer_size;
+  context->getProperty(BufferSize.getName(), buffer_size);
+  logger_->log_debug("ListenHTTP using %s: %d", BufferSize.getName(), 
buffer_size);
+
+  handler_.reset(new Handler(basePath, context, std::move(authDNPattern), 
std::move(headersAsAttributesPattern), buffer_size));

Review comment:
   Good point, fixed in 
[0427909](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/0427909155e707881d8640ca20e1c33b0c9d84f5)

##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -212,51 +235,80 @@ void ListenHTTP::onSchedule(core::ProcessContext 
*context, core::ProcessSessionF
 ListenHTTP::~ListenHTTP() = default;
 
 void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession 
*session) {
-  std::shared_ptr flow_file = session->get();
+  logger_->log_debug("OnTrigger ListenHTTP");
+  processIncomingFlowFile(session);
+  processRequestBuffer(session);
+}
 
-  // Do nothing if there are no incoming files
+void ListenHTTP::processIncomingFlowFile(core::ProcessSession *session) {
+  std::shared_ptr flow_file = session->get();
   if (!flow_file) {
 return;
   }
 
   std::string type;
   flow_file->getAttribute("http.type", type);
 
-  if (type == "response_body") {
-
-if (handler_) {
-  struct response_body response { "", "", "" };
-  ResponseBodyReadCallback cb(&response.body);
-  flow_file->getAttribute("filename", response.uri);
-  flow_file->getAttribute("mime.type", response.mime_type);
-  if (response.mime_type.empty()) {
-logger_->log_warn("Using default mime type of application/octet-stream 
for response body file: %s", response.uri);
-response.mime_type = "application/octet-stream";
-  }
-  session->read(flow_file, &cb);
-  handler_->set_response_body(std::move(response));
+  if (type == "response_body" && handler_) {
+response_body response;
+ResponseBodyReadCallback cb(&response.body);
+flow_file->getAttribute("filename", response.uri);
+flow_file->getAttribute("mime.type", response.mime_type);
+if (response.mime_type.empty()) {
+  logger_->log_warn("Using default mime type of application/octet-stream 
for response body file: %s", response.uri);
+  response.mime_type = "application/octet-stream";
 }
+session->read(flow_file, &cb);
+handler_->setResponseBody(std::move(response));
   }
 
   session->remove(flow_file);
 }
 
-ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext 
*context, core::ProcessSessionFactory *session_factory, std::string 
&&auth_dn_regex, std::string &&header_as_attrs_regex)
+void ListenHTTP::processRequestBuffer(core::ProcessSession *session) {
+  std::size_t flow_file_count = 0;
+  for (; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count) 
{
+FlowFileBufferPair flow_file_buffer_pair;
+if (!handler_->request_buffer.tryDequeue(flow_file_buffer_pair)) {
+  break;

Review comment:
   This can happen only if the queue becomes empty before we reach the batch
   size. That is a normal case, and we log the number of dequeued flow files
   after the loop, so I don't think an additional log is needed here.
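
   The loop shape discussed above, reduced to a standalone sketch.
   `drainBatch` is a hypothetical helper and a plain `std::deque` replaces
   the concurrent queue. A batch size of zero means "no limit", an empty
   queue ends the loop early, and the counter lives outside the loop so the
   caller can report it afterwards.

```cpp
#include <cstddef>
#include <deque>

// Drains up to batch_size items from the queue (0 means no limit) and
// returns how many were taken. An empty queue simply ends the loop early.
template <typename T, typename Fn>
std::size_t drainBatch(std::deque<T>& queue, std::size_t batch_size, Fn&& handle) {
  std::size_t count = 0;  // outside the loop so it can be reported afterwards
  for (; batch_size == 0 || count < batch_size; ++count) {
    if (queue.empty()) {
      break;  // normal case: fewer buffered items than the batch size
    }
    handle(queue.front());
    queue.pop_front();
  }
  return count;
}
```

   Because `break` skips the increment, the returned count equals the number
   of items actually handled.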









[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #921: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP processor

2020-10-13 Thread GitBox


lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r503859521



##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -368,7 +368,7 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server, 
struct mg_connection *
   // Always send 100 Continue, as allowed per standard to minimize client 
delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html)
   mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n");
 
-  return enqueueRequest(conn, req_info, createContentBuffer(conn, req_info));
+  return enqueueRequest(conn, req_info, std::move(createContentBuffer(conn, 
req_info)));

Review comment:
   Fixed in 
[750d5d1](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/750d5d14c627818cacfc6292856bbf9e10f6ce30)
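
   As background for the hunk above, a minimal sketch with hypothetical
   `createBuffer` and `consume` functions (not the MiNiFi ones). The result
   of a function that returns by value is already an rvalue, so it can be
   passed to a by-value sink without `std::move`. Only a named variable
   needs the explicit cast.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for the content buffer type.
using Buffer = std::string;

std::unique_ptr<Buffer> createBuffer(const std::string& body) {
  return std::unique_ptr<Buffer>(new Buffer(body));
}

// Sink parameter: takes ownership by value.
std::size_t consume(std::unique_ptr<Buffer> buffer) {
  return buffer ? buffer->size() : 0;
}

std::size_t direct(const std::string& body) {
  // The call expression is already an rvalue; no std::move is required.
  return consume(createBuffer(body));
}

std::size_t viaNamed(const std::string& body) {
  auto buffer = createBuffer(body);
  // A named variable is an lvalue, so std::move is required here.
  return consume(std::move(buffer));
}
```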









[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #921: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP processor

2020-10-13 Thread GitBox


lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r503799392



##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -434,16 +456,11 @@ void ListenHTTP::Handler::write_body(mg_connection *conn, 
const mg_request_info
   }
 }
 
-ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const 
struct mg_request_info *reqInfo)
-: logger_(logging::LoggerFactory::getLogger()) {
-  conn_ = conn;
-  req_info_ = reqInfo;
-}
-
-int64_t ListenHTTP::WriteCallback::process(std::shared_ptr 
stream) {
+std::shared_ptr 
ListenHTTP::Handler::createContentBuffer(struct mg_connection *conn, const 
struct mg_request_info *req_info) {
+  auto content_buffer = std::make_shared();

Review comment:
   Fixed in 
[edf5b90](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/edf5b9095ca7eeea4a1b2db4ba0ba58bec6632d0)









[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #921: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP processor

2020-10-13 Thread GitBox


lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r503799217



##
File path: extensions/civetweb/processors/ListenHTTP.h
##
@@ -43,15 +43,17 @@ namespace processors {
 class ListenHTTP : public core::Processor {
  public:
 
+  using FlowFileBufferPair=std::pair, 
std::shared_ptr>;

Review comment:
   Fixed in 
[edf5b90](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/edf5b9095ca7eeea4a1b2db4ba0ba58bec6632d0)









[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #921: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP processor

2020-10-13 Thread GitBox


lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r503799050



##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -273,6 +325,34 @@ void ListenHTTP::Handler::set_header_attributes(const 
mg_request_info *req_info,
   }
 }
 
+bool ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const 
mg_request_info *req_info, std::shared_ptr content_buffer) {
+  auto flow_file = std::make_shared();
+  auto flow_version = 
process_context_->getProcessorNode()->getFlowIdentifier();
+  if (flow_version != nullptr) {
+flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID, 
flow_version->getFlowId());
+  }
+
+  if (!flow_file) {
+sendHttp500(conn);
+return true;
+  }
+
+  setHeaderAttributes(req_info, flow_file);
+
+  if (buffer_size_ == 0 || request_buffer.size() < buffer_size_) {
+request_buffer.enqueue(std::make_pair(std::move(flow_file), 
std::move(content_buffer)));
+  } else {
+logger_->log_error("ListenHTTP buffer is full");

Review comment:
   Fixed in 
[edf5b90](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/edf5b9095ca7eeea4a1b2db4ba0ba58bec6632d0)









[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #921: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP processor

2020-10-13 Thread GitBox


lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r503798865



##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -212,51 +233,82 @@ void ListenHTTP::onSchedule(core::ProcessContext 
*context, core::ProcessSessionF
 ListenHTTP::~ListenHTTP() = default;
 
 void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession 
*session) {
-  std::shared_ptr flow_file = session->get();
+  logger_->log_debug("OnTrigger ListenHTTP");
+  processIncomingFlowFile(session);
+  processRequestBuffer(session);
+}
 
-  // Do nothing if there are no incoming files
+void ListenHTTP::processIncomingFlowFile(core::ProcessSession *session) {
+  std::shared_ptr flow_file = session->get();
   if (!flow_file) {
 return;
   }
 
   std::string type;
   flow_file->getAttribute("http.type", type);
 
-  if (type == "response_body") {
-
-if (handler_) {
-  struct response_body response { "", "", "" };
-  ResponseBodyReadCallback cb(&response.body);
-  flow_file->getAttribute("filename", response.uri);
-  flow_file->getAttribute("mime.type", response.mime_type);
-  if (response.mime_type.empty()) {
-logger_->log_warn("Using default mime type of application/octet-stream 
for response body file: %s", response.uri);
-response.mime_type = "application/octet-stream";
-  }
-  session->read(flow_file, &cb);
-  handler_->set_response_body(std::move(response));
+  if (type == "response_body" && handler_) {
+response_body response;
+ResponseBodyReadCallback cb(&response.body);
+flow_file->getAttribute("filename", response.uri);
+flow_file->getAttribute("mime.type", response.mime_type);
+if (response.mime_type.empty()) {
+  logger_->log_warn("Using default mime type of application/octet-stream 
for response body file: %s", response.uri);
+  response.mime_type = "application/octet-stream";
 }
+session->read(flow_file, &cb);
+handler_->setResponseBody(std::move(response));
   }
 
   session->remove(flow_file);
 }
 
-ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext 
*context, core::ProcessSessionFactory *session_factory, std::string 
&&auth_dn_regex, std::string &&header_as_attrs_regex)
+void ListenHTTP::processRequestBuffer(core::ProcessSession *session) {
+  std::size_t flow_file_count = 0;
+  while (batch_size_ == 0 || batch_size_ > flow_file_count) {

Review comment:
   I kept the initialization in its original scope because the counter is
   logged outside the loop, but I moved the increment into the `for` loop in
[edf5b90](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/edf5b9095ca7eeea4a1b2db4ba0ba58bec6632d0)









[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #921: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP processor

2020-10-13 Thread GitBox


lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r503798155



##
File path: extensions/civetweb/processors/ListenHTTP.cpp
##
@@ -62,6 +62,17 @@ core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP 
Headers to receive as
 " should be passed along 
as FlowFile attributes",
 "");
 
+core::Property ListenHTTP::BatchSize(
+core::PropertyBuilder::createProperty("Batch Size")
+->withDescription("Maximum number of buffered requests to be processed 
in a single batch. If set to zero all buffered requests are processed.")
+->withDefaultValue(0)->build());
+
+core::Property ListenHTTP::BufferSize(
+core::PropertyBuilder::createProperty("Buffer Size")
+->withDescription("Maximum number of HTTP Requests allowed to be 
buffered before processing them when the processor is triggered. "
+  "If the buffer full, the request is refused. If set 
to zero the buffer is unlimited.")
+->withDefaultValue(0)->build());
+

Review comment:
   That seems reasonable, thanks for the analysis. I changed the default to 
20k in 
[edf5b90](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/edf5b9095ca7eeea4a1b2db4ba0ba58bec6632d0)




