fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439385828



##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -335,170 +563,210 @@ bool TailFile::storeState(const 
std::shared_ptr<core::ProcessContext>& context)
   return true;
 }
 
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem 
j) {
-  return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver(const std::shared_ptr<core::ProcessContext>& 
context, TailState &file, const std::string &base_file_name) {
-  struct stat statbuf;
-  std::vector<TailMatchedFileItem> matchedFiles;
-  std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() 
+ file.current_file_name_;
-
-  if (stat(fullPath.c_str(), &statbuf) == 0) {
-    logger_->log_trace("Searching for files rolled over");
-    std::string pattern = file.current_file_name_;
-    std::size_t found = file.current_file_name_.find_last_of(".");
-    if (found != std::string::npos)
-      pattern = file.current_file_name_.substr(0, found);
-
-    // Callback, called for each file entry in the listed directory
-    // Return value is used to break (false) or continue (true) listing
-    auto lambda = [&](const std::string& path, const std::string& filename) -> 
bool {
-      struct stat sb;
-      std::string fileFullName = path + 
utils::file::FileUtils::get_separator() + filename;
-      if ((fileFullName.find(pattern) != std::string::npos) && 
stat(fileFullName.c_str(), &sb) == 0) {
-        uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
-        if (candidateModTime >= file.currentTailFileModificationTime_) {
-          logging::LOG_TRACE(logger_) << "File " << filename << " (short name 
" << file.current_file_name_ <<
-          ") disk mod time " << candidateModTime << ", struct mod time " << 
file.currentTailFileModificationTime_ << ", size on disk " << sb.st_size << ", 
position " << file.currentTailFilePosition_;
-          if (filename == file.current_file_name_ && candidateModTime == 
file.currentTailFileModificationTime_ &&
-              sb.st_size == file.currentTailFilePosition_) {
-            return true;  // Skip the current file as a candidate in case it 
wasn't updated
+std::vector<TailState> TailFile::findRotatedFiles(const TailState &state) 
const {
+  logger_->log_debug("Searching for files rolled over; last read time is %llu",
+      
std::chrono::time_point_cast<std::chrono::seconds>(state.last_read_time_).time_since_epoch().count());
+
+  std::size_t last_dot_position = state.file_name_.find_last_of('.');
+  std::string base_name = state.file_name_.substr(0, last_dot_position);
+  std::string pattern = 
utils::StringUtils::replaceOne(rolling_filename_pattern_, "${filename}", 
base_name);
+
+  std::vector<TailStateWithMtime> matched_files_with_mtime;
+  auto collect_matching_files = [&](const std::string &path, const std::string 
&file_name) -> bool {
+    if (file_name != state.file_name_ && 
utils::Regex::matchesFullInput(pattern, file_name)) {
+      std::string full_file_name = path + 
utils::file::FileUtils::get_separator() + file_name;
+      TailStateWithMtime::TimePoint 
mtime{utils::file::FileUtils::last_write_time_point(full_file_name)};
+      logger_->log_debug("File %s with mtime %llu matches rolling filename 
pattern %s", file_name, mtime.time_since_epoch().count(), pattern);
+      if (mtime >= 
std::chrono::time_point_cast<std::chrono::seconds>(state.last_read_time_)) {
+        logger_->log_debug("File %s has mtime >= last read time, so we are 
going to read it", file_name);
+        matched_files_with_mtime.emplace_back(TailState{path, file_name}, 
mtime);
+      }
+    }
+    return true;
+  };
+
+  utils::file::FileUtils::list_dir(state.path_, collect_matching_files, 
logger_, false);
+
+  std::sort(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), 
[](const TailStateWithMtime &left, const TailStateWithMtime &right) {
+    return std::tie(left.mtime_, left.tail_state_.file_name_) <
+           std::tie(right.mtime_, right.tail_state_.file_name_);
+  });
+
+  if (!matched_files_with_mtime.empty() && state.position_ > 0) {
+    TailState &first_rotated_file = matched_files_with_mtime[0].tail_state_;
+    std::string full_file_name = first_rotated_file.fileNameWithPath();
+    if (utils::file::FileUtils::file_size(full_file_name) >= state.position_) {
+      uint64_t checksum = 
utils::file::FileUtils::computeChecksum(full_file_name, state.position_);
+      if (checksum == state.checksum_) {
+        first_rotated_file.position_ = state.position_;
+        first_rotated_file.checksum_ = state.checksum_;
       }
-      TailMatchedFileItem item;
-      item.fileName = filename;
-      item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
-      matchedFiles.push_back(item);
     }
   }
-  return true;};
-
-    utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false);
 
-    if (matchedFiles.size() < 1) {
-      logger_->log_debug("No newer files found in directory!");
-      return;
-    }
+  std::vector<TailState> matched_files;
+  std::transform(matched_files_with_mtime.begin(), 
matched_files_with_mtime.end(), std::back_inserter(matched_files),
+                 [](TailStateWithMtime &tail_state_with_mtime) { return 
std::move(tail_state_with_mtime.tail_state_); });
+  return matched_files;
+}
 
-    // Sort the list based on modified time
-    std::sort(matchedFiles.begin(), matchedFiles.end(), 
sortTailMatchedFileItem);
-    TailMatchedFileItem item = matchedFiles[0];
-    logger_->log_info("TailFile File Roll Over from %s to %s", 
file.current_file_name_, item.fileName);
+void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session) {
+  std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
 
-    // Going ahead in the file rolled over
-    if (file.current_file_name_ != base_file_name) {
-      logger_->log_debug("Resetting posotion since %s != %s", base_file_name, 
file.current_file_name_);
-      file.currentTailFilePosition_ = 0;
+  if (tail_mode_ == Mode::MULTIPLE) {
+    if (last_multifile_lookup_ + lookup_frequency_ < 
std::chrono::steady_clock::now()) {
+      logger_->log_debug("Lookup frequency %d ms have elapsed, doing new 
multifile lookup", lookup_frequency_.count());
+      checkForRemovedFiles();
+      checkForNewFiles();
+      last_multifile_lookup_ = std::chrono::steady_clock::now();
+    } else {
+      logger_->log_trace("Skipping multifile lookup");
     }
+  }
 
-    file.current_file_name_ = item.fileName;
+  // iterate over file states. may modify them
+  for (auto &state : tail_states_) {
+    processFile(context, session, state.first, state.second);
+  }
 
-    storeState(context);
+  if (!session->existsFlowFileInRelationship(Success)) {
+    yield();
   }
 }
 
-void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session) {
-  std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
-  std::string st_file;
-  if (context->getProperty(StateFile.getName(), st_file)) {
-    state_file_ = st_file + "." + getUUIDStr();
+void TailFile::processFile(const std::shared_ptr<core::ProcessContext> 
&context,
+                           const std::shared_ptr<core::ProcessSession> 
&session,
+                           const std::string &full_file_name,
+                           TailState &state) {
+  if (utils::file::FileUtils::file_size(full_file_name) < state.position_) {
+    processRotatedFiles(context, session, state);
   }
-  if (!this->state_recovered_) {
-    state_recovered_ = true;
-    // recover the state if we have not done so
-    this->recoverState(context);
+
+  processSingleFile(context, session, full_file_name, state);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr<core::ProcessContext> 
&context,
+                                   const std::shared_ptr<core::ProcessSession> 
&session,
+                                   TailState &state) {
+    std::vector<TailState> rotated_file_states = findRotatedFiles(state);
+    for (TailState &file_state : rotated_file_states) {
+      processSingleFile(context, session, file_state.fileNameWithPath(), 
file_state);
+    }
+    state.position_ = 0;
+    state.checksum_ = 0;
+}
+
+void TailFile::processSingleFile(const std::shared_ptr<core::ProcessContext> 
&context,
+                                 const std::shared_ptr<core::ProcessSession> 
&session,
+                                 const std::string &full_file_name,
+                                 TailState &state) {
+  std::string fileName = state.file_name_;
+
+  if (utils::file::FileUtils::file_size(full_file_name) == 0u) {
+    logger_->log_warn("Unable to read file %s as it does not exist or has size 
zero", full_file_name);
+    return;
   }
+  logger_->log_debug("Tailing file %s from %llu", full_file_name, 
state.position_);
 
-  /**
-   * iterate over file states. may modify them
-   */
-  for (auto &state : tail_states_) {
-    auto fileLocation = state.second.path_;
-
-    checkRollOver(context, state.second, state.first);
-    std::string fullPath = fileLocation + 
utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-    struct stat statbuf;
-
-    logger_->log_debug("Tailing file %s from %llu", fullPath, 
state.second.currentTailFilePosition_);
-    if (stat(fullPath.c_str(), &statbuf) == 0) {
-      if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) 
{
-        logger_->log_trace("Current pos: %llu", 
state.second.currentTailFilePosition_);
-        logger_->log_trace("%s", "there are no new input for the current tail 
file");
-        context->yield();
-        return;
-      }
-      std::size_t found = state.first.find_last_of(".");
-      std::string baseName = state.first.substr(0, found);
-      std::string extension = state.first.substr(found + 1);
-
-      if (!delimiter_.empty()) {
-        char delim = delimiter_.c_str()[0];
-        if (delim == '\\') {
-          if (delimiter_.size() > 1) {
-            switch (delimiter_.c_str()[1]) {
-              case 'r':
-                delim = '\r';
-                break;
-              case 't':
-                delim = '\t';
-                break;
-              case 'n':
-                delim = '\n';
-                break;
-              case '\\':
-                delim = '\\';
-                break;
-              default:
-                // previous behavior
-                break;
-            }
-          }
-        }
-        logger_->log_debug("Looking for delimiter 0x%X", delim);
-        std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
-        session->import(fullPath, flowFiles, 
state.second.currentTailFilePosition_, delim);
-        logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
-
-        for (auto ffr : flowFiles) {
-          logger_->log_info("TailFile %s for %u bytes", state.first, 
ffr->getSize());
-          std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + 
extension;
-          ffr->updateKeyedAttribute(PATH, fileLocation);
-          ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-          ffr->updateKeyedAttribute(FILENAME, logName);
-          session->transfer(ffr, Success);
-          state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-          storeState(context);
-        }
+  std::size_t last_dot_position = fileName.find_last_of('.');
+  std::string baseName = fileName.substr(0, last_dot_position);
+  std::string extension = fileName.substr(last_dot_position + 1);
+
+  if (!delimiter_.empty()) {
+    char delim = delimiter_[0];
+    logger_->log_trace("Looking for delimiter 0x%X", delim);
+
+    std::size_t num_flow_files = 0;
+    FileReaderCallback file_reader{full_file_name, state.position_, delim, 
state.checksum_};
+    TailState state_copy{state};
+
+    while (file_reader.hasMoreToRead()) {
+      auto flow_file = 
std::static_pointer_cast<FlowFileRecord>(session->create());
+      session->write(flow_file, &file_reader);
+
+      if (file_reader.useLatestFlowFile()) {
+        updateFlowFileAttributes(full_file_name, state_copy, fileName, 
baseName, extension, flow_file);
+        session->transfer(flow_file, Success);
+        updateStateAttributes(state_copy, flow_file->getSize(), 
file_reader.checksum());
+
+        ++num_flow_files;
 
       } else {
-        std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create());
-        if (flowFile) {
-          flowFile->updateKeyedAttribute(PATH, fileLocation);
-          flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-          session->import(fullPath, flowFile, true, 
state.second.currentTailFilePosition_);
-          session->transfer(flowFile, Success);
-          logger_->log_info("TailFile %s for %llu bytes", state.first, 
flowFile->getSize());
-          std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + 
"."
-              + extension;
-          flowFile->updateKeyedAttribute(FILENAME, logName);
-          state.second.currentTailFilePosition_ += flowFile->getSize();
-          storeState(context);
-        }
+        session->remove(flow_file);
       }
-      state.second.currentTailFileModificationTime_ = ((uint64_t) 
(statbuf.st_mtime) * 1000);
-    } else {
-      logger_->log_warn("Unable to stat file %s", fullPath);
     }
+
+    state = state_copy;
+    storeState(context);
+
+    logger_->log_info("%u flowfiles were received from TailFile input", 
num_flow_files);

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to