arpadboda commented on a change in pull request #1064:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1064#discussion_r630913410



##########
File path: extensions/standard-processors/processors/GetFile.cpp
##########
@@ -158,76 +147,89 @@ void GetFile::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFact
   request_.inputDirectory = value;
 }
 
-void GetFile::onTrigger(core::ProcessContext* /*context*/, 
core::ProcessSession *session) {
-  // Perform directory list
-
+void GetFile::onTrigger(core::ProcessContext* /*context*/, 
core::ProcessSession* session) {
   metrics_->iterations_++;
 
-  const bool isDirEmptyBeforePoll = isListingEmpty();
-  logger_->log_debug("Is listing empty before polling directory %i", 
isDirEmptyBeforePoll);
-  if (isDirEmptyBeforePoll) {
+  const bool is_dir_empty_before_poll = isListingEmpty();
+  logger_->log_debug("Listing is %s before polling directory", 
is_dir_empty_before_poll ? "empty" : "not empty");
+  if (is_dir_empty_before_poll) {
     if (request_.pollInterval == 0 || (utils::timeutils::getTimeMillis() - 
last_listing_time_) > request_.pollInterval) {
       performListing(request_);
       last_listing_time_.store(utils::timeutils::getTimeMillis());
     }
   }
 
-  const bool isDirEmptyAfterPoll = isListingEmpty();
-  logger_->log_debug("Is listing empty after polling directory %i", 
isDirEmptyAfterPoll);
-
-  if (!isDirEmptyAfterPoll) {
-    try {
-      std::queue<std::string> list;
-      pollListing(list, request_);
-      while (!list.empty()) {
-        std::string fileName = list.front();
-        list.pop();
-        logger_->log_info("GetFile process %s", fileName);
-        auto flowFile = session->create();
-        if (flowFile == nullptr)
-          return;
-        std::size_t found = fileName.find_last_of("/\\");
-        std::string path = fileName.substr(0, found);
-        std::string name = fileName.substr(found + 1);
-        flowFile->setAttribute(core::SpecialFlowAttribute::FILENAME, name);
-        flowFile->setAttribute(core::SpecialFlowAttribute::PATH, path);
-        flowFile->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, 
fileName);
-        session->import(fileName, flowFile, request_.keepSourceFile);
-        session->transfer(flowFile, Success);
+  const bool is_dir_empty_after_poll = isListingEmpty();
+  logger_->log_debug("Listing is %s after polling directory", 
is_dir_empty_after_poll ? "empty" : "not empty");
+  if (is_dir_empty_after_poll) {
+    yield();
+    return;
+  }
+
+  std::queue<std::string> list_of_file_names = pollListing(request_.batchSize);
+  while (!list_of_file_names.empty()) {
+    std::string file_name = list_of_file_names.front();
+    list_of_file_names.pop();
+    getSingleFile(*session, file_name);
+  }
+}
+
+void GetFile::getSingleFile(core::ProcessSession& session, const std::string& 
file_name) const {
+  logger_->log_info("GetFile process %s", file_name);
+  auto flow_file = session.create();
+  gsl_Expects(flow_file);
+  std::string path;
+  std::string name;
+  std::tie(path, name) = utils::file::split_path(file_name);
+  flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, name);
+  flow_file->setAttribute(core::SpecialFlowAttribute::PATH, path);
+  flow_file->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, 
file_name);
+
+  try {
+    utils::FileReaderCallback file_reader_callback{file_name};
+    session.write(flow_file, &file_reader_callback);
+    session.transfer(flow_file, Success);
+    if (!request_.keepSourceFile) {
+      auto remove_status = remove(file_name.c_str());
+      if (remove_status != 0) {
+        logger_->log_error("GetFile could not delete file '%s', error %d: %s", 
file_name, errno, strerror(errno));
       }
-    } catch (std::exception &exception) {
-      logger_->log_debug("GetFile Caught Exception %s", exception.what());
-      throw;
-    } catch (...) {
-      throw;
     }
+  } catch (const std::exception& exception) {
+    logger_->log_error("GetFile caught exception while processing file '%s': 
%s", file_name, exception.what());
+    flow_file->setDeleted(true);

Review comment:
       Okay, I need more coffee; I didn't realise that this is a different
scenario. :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to