bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386412146
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const 
std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  struct TimeDiff {
+    auto operator()() const {
+      return 
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - time_).count();
+    }
+    const decltype(std::chrono::steady_clock::now()) time_ = 
std::chrono::steady_clock::now();
+  };
+
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
+    const TimeDiff timeDiff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRIu64 " ms", 
(uint64_t)timeDiff());
+
+    pBookmark_->saveBookmarkXml(bookmarkXml);
+
+    if (session->outgoingConnectionsFull("success")) {
+      logger_->log_debug("outgoingConnectionsFull");
+      return false;
+    }
+
+    return true;
+  };
+
+  size_t eventCount = 0;
+  const TimeDiff timeDiff;
+  utils::ScopeGuard timeGuard([&]() {
+    logger_->log_debug("processed %zu Events in %"  PRIu64 " ms", eventCount, 
(uint64_t)timeDiff());
+  });
 
-  const auto now = GetTickCount64();
+  size_t commitAndSaveBookmarkCount = 0;
+  std::wstring bookmarkXml;
 
-  if (flowFileCount > 0) {
-    lastActivityTimestamp_ = now;
+  const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), 
wstrQuery_.c_str(), EvtQueryChannelPath);
+  if (!hEventResults) {
+    LOG_LAST_ERROR(EvtQuery);
+    return;
+  }
+  const utils::ScopeGuard guard_hEventResults([hEventResults]() { 
EvtClose(hEventResults); });
+
+  auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
+  if (!hBookmark) {
+    // Unrecovarable error.
+    pBookmark_.reset();
+    return;
+  }
+
+  if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+    LOG_LAST_ERROR(EvtSeek);
+    return;
   }
-  else if (inactiveDurationToReconnect_ > 0) {
-    if ((now - lastActivityTimestamp_) > inactiveDurationToReconnect_) {
-      logger_->log_info("Exceeds configured 'inactive duration to reconnect' 
%lld ms. Unsubscribe to reconnect..", inactiveDurationToReconnect_);
-      unsubscribe();
+
+  // Enumerate the events in the result set after the bookmarked event.
+  while (true) {
+    EVT_HANDLE hEvent{};
+    DWORD dwReturned{};
+    if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+        LOG_LAST_ERROR(EvtNext);
+      }
+      break;
+    }
+    const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });
+
+    EventRender eventRender;
+    std::wstring newBookmarkXml;
+    if (createEventRender(hEvent, eventRender) && 
pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
+      bookmarkXml = std::move(newBookmarkXml);
+      eventCount++;
+      putEventRenderFlowFileToSession(eventRender, *session);
+
+      if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
+        if (!commitAndSaveBookmark(bookmarkXml)) {
+          return;
+        }
+
+        commitAndSaveBookmarkCount = eventCount;
+      }
     }
   }
+
+  if (eventCount > commitAndSaveBookmarkCount) {
+    commitAndSaveBookmark(bookmarkXml);
 
 Review comment:
   @szaszm If `batch_commit_size_` is not 0, we must commit once every batch 
size, that's the point of the feature, and it is an important performance 
consideration (the uncommitted flow files would otherwise just take up more and 
more memory and the flow would be blocked, as further processors can't work on 
uncommitted flow files).
   This particular code is to make sure we safely commit the rest of the flow 
files (or all of the flow files, if `batch_commit_size_ == 0U`) before saving 
the bookmark. This ordering is required because committing can throw an 
exception, causing us to roll back; if we had already saved the bookmark by 
then, we would skip these events, causing data loss.
   Another commit will automatically occur when we return from onTrigger, 
but since there will be no flow files to commit, that will not have much 
performance impact.
   
   We can change the framework in the future to make it possible to disable 
autocommit, but until then the tradeoff between having an empty commit and not 
losing data necessitates this.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to