szaszm commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386386213
 
 

 ##########
 File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
 ##########
 @@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const 
std::shared_ptr<core::ProcessContex
     return;
   }
 
-  const auto flowFileCount = processQueue(session);
+  struct TimeDiff {
+    auto operator()() const {
+      return 
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - time_).count();
+    }
+    const decltype(std::chrono::steady_clock::now()) time_ = 
std::chrono::steady_clock::now();
+  };
+
+  const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
+    const TimeDiff timeDiff;
+    session->commit();
+    logger_->log_debug("processQueue commit took %" PRIu64 " ms", 
(uint64_t)timeDiff());
+
+    pBookmark_->saveBookmarkXml(bookmarkXml);
+
+    if (session->outgoingConnectionsFull("success")) {
+      logger_->log_debug("outgoingConnectionsFull");
+      return false;
+    }
+
+    return true;
+  };
+
+  size_t eventCount = 0;
+  const TimeDiff timeDiff;
+  utils::ScopeGuard timeGuard([&]() {
+    logger_->log_debug("processed %zu Events in %"  PRIu64 " ms", eventCount, 
(uint64_t)timeDiff());
+  });
 
-  const auto now = GetTickCount64();
+  size_t commitAndSaveBookmarkCount = 0;
+  std::wstring bookmarkXml;
 
-  if (flowFileCount > 0) {
-    lastActivityTimestamp_ = now;
+  const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), 
wstrQuery_.c_str(), EvtQueryChannelPath);
+  if (!hEventResults) {
+    LOG_LAST_ERROR(EvtQuery);
+    return;
+  }
+  const utils::ScopeGuard guard_hEventResults([hEventResults]() { 
EvtClose(hEventResults); });
+
+  auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
+  if (!hBookmark) {
+    // Unrecovarable error.
+    pBookmark_.reset();
+    return;
+  }
+
+  if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+    LOG_LAST_ERROR(EvtSeek);
+    return;
   }
-  else if (inactiveDurationToReconnect_ > 0) {
-    if ((now - lastActivityTimestamp_) > inactiveDurationToReconnect_) {
-      logger_->log_info("Exceeds configured 'inactive duration to reconnect' 
%lld ms. Unsubscribe to reconnect..", inactiveDurationToReconnect_);
-      unsubscribe();
+
+  // Enumerate the events in the result set after the bookmarked event.
+  while (true) {
+    EVT_HANDLE hEvent{};
+    DWORD dwReturned{};
+    if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+      if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+        LOG_LAST_ERROR(EvtNext);
+      }
+      break;
+    }
+    const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });
+
+    EventRender eventRender;
+    std::wstring newBookmarkXml;
+    if (createEventRender(hEvent, eventRender) && 
pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
+      bookmarkXml = std::move(newBookmarkXml);
+      eventCount++;
+      putEventRenderFlowFileToSession(eventRender, *session);
+
+      if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
+        if (!commitAndSaveBookmark(bookmarkXml)) {
+          return;
+        }
+
+        commitAndSaveBookmarkCount = eventCount;
+      }
     }
   }
+
+  if (eventCount > commitAndSaveBookmarkCount) {
+    commitAndSaveBookmark(bookmarkXml);
 
 Review comment:
   Would it be possible to commit only once every `onTrigger()`, for 
performance reasons? I had a thread open about this but it was closed with a 
seemingly unrelated note.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to