fgerlits commented on a change in pull request #1064:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1064#discussion_r630911063



##########
File path: extensions/standard-processors/processors/GetFile.cpp
##########
@@ -158,76 +147,89 @@ void GetFile::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFact
   request_.inputDirectory = value;
 }
 
-void GetFile::onTrigger(core::ProcessContext* /*context*/, 
core::ProcessSession *session) {
-  // Perform directory list
-
+void GetFile::onTrigger(core::ProcessContext* /*context*/, 
core::ProcessSession* session) {
   metrics_->iterations_++;
 
-  const bool isDirEmptyBeforePoll = isListingEmpty();
-  logger_->log_debug("Is listing empty before polling directory %i", 
isDirEmptyBeforePoll);
-  if (isDirEmptyBeforePoll) {
+  const bool is_dir_empty_before_poll = isListingEmpty();
+  logger_->log_debug("Listing is %s before polling directory", 
is_dir_empty_before_poll ? "empty" : "not empty");
+  if (is_dir_empty_before_poll) {
     if (request_.pollInterval == 0 || (utils::timeutils::getTimeMillis() - 
last_listing_time_) > request_.pollInterval) {
       performListing(request_);
       last_listing_time_.store(utils::timeutils::getTimeMillis());
     }
   }
 
-  const bool isDirEmptyAfterPoll = isListingEmpty();
-  logger_->log_debug("Is listing empty after polling directory %i", 
isDirEmptyAfterPoll);
-
-  if (!isDirEmptyAfterPoll) {
-    try {
-      std::queue<std::string> list;
-      pollListing(list, request_);
-      while (!list.empty()) {
-        std::string fileName = list.front();
-        list.pop();
-        logger_->log_info("GetFile process %s", fileName);
-        auto flowFile = session->create();
-        if (flowFile == nullptr)
-          return;
-        std::size_t found = fileName.find_last_of("/\\");
-        std::string path = fileName.substr(0, found);
-        std::string name = fileName.substr(found + 1);
-        flowFile->setAttribute(core::SpecialFlowAttribute::FILENAME, name);
-        flowFile->setAttribute(core::SpecialFlowAttribute::PATH, path);
-        flowFile->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, 
fileName);
-        session->import(fileName, flowFile, request_.keepSourceFile);
-        session->transfer(flowFile, Success);
+  const bool is_dir_empty_after_poll = isListingEmpty();
+  logger_->log_debug("Listing is %s after polling directory", 
is_dir_empty_after_poll ? "empty" : "not empty");
+  if (is_dir_empty_after_poll) {
+    yield();
+    return;
+  }
+
+  std::queue<std::string> list_of_file_names = pollListing(request_.batchSize);
+  while (!list_of_file_names.empty()) {
+    std::string file_name = list_of_file_names.front();
+    list_of_file_names.pop();
+    getSingleFile(*session, file_name);
+  }
+}
+
+void GetFile::getSingleFile(core::ProcessSession& session, const std::string& 
file_name) const {
+  logger_->log_info("GetFile process %s", file_name);
+  auto flow_file = session.create();
+  gsl_Expects(flow_file);
+  std::string path;
+  std::string name;
+  std::tie(path, name) = utils::file::split_path(file_name);
+  flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, name);
+  flow_file->setAttribute(core::SpecialFlowAttribute::PATH, path);
+  flow_file->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, 
file_name);
+
+  try {
+    utils::FileReaderCallback file_reader_callback{file_name};
+    session.write(flow_file, &file_reader_callback);
+    session.transfer(flow_file, Success);
+    if (!request_.keepSourceFile) {
+      auto remove_status = remove(file_name.c_str());
+      if (remove_status != 0) {
+        logger_->log_error("GetFile could not delete file '%s', error %d: %s", 
file_name, errno, strerror(errno));
       }
-    } catch (std::exception &exception) {
-      logger_->log_debug("GetFile Caught Exception %s", exception.what());
-      throw;
-    } catch (...) {
-      throw;
     }
+  } catch (const std::exception& exception) {
+    logger_->log_error("GetFile caught exception while processing file '%s': 
%s", file_name, exception.what());
+    flow_file->setDeleted(true);

Review comment:
       We don't want to roll back if reading one file failed but another file 
succeeded; in this case, we want to ignore the failed file but process the 
successful one.
   
   My understanding is that every flow file we create with `session->create()` 
has to be either transferred to an output relationship or marked as deleted; 
otherwise, `routeFlowFile(flow_file)`, and therefore `session->commit()`, will 
throw.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to