bakaid commented on a change in pull request #741: MINIFICPP-1139 Implemented.
URL: https://github.com/apache/nifi-minifi-cpp/pull/741#discussion_r386412146
##########
File path: extensions/windows-event-log/ConsumeWindowsEventLog.cpp
##########
@@ -271,19 +280,88 @@ void ConsumeWindowsEventLog::onTrigger(const
std::shared_ptr<core::ProcessContex
return;
}
- const auto flowFileCount = processQueue(session);
+ struct TimeDiff {
+ auto operator()() const {
+ return
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
- time_).count();
+ }
+ const decltype(std::chrono::steady_clock::now()) time_ =
std::chrono::steady_clock::now();
+ };
+
+ const auto commitAndSaveBookmark = [&] (const std::wstring& bookmarkXml) {
+ const TimeDiff timeDiff;
+ session->commit();
+ logger_->log_debug("processQueue commit took %" PRIu64 " ms",
(uint64_t)timeDiff());
+
+ pBookmark_->saveBookmarkXml(bookmarkXml);
+
+ if (session->outgoingConnectionsFull("success")) {
+ logger_->log_debug("outgoingConnectionsFull");
+ return false;
+ }
+
+ return true;
+ };
+
+ size_t eventCount = 0;
+ const TimeDiff timeDiff;
+ utils::ScopeGuard timeGuard([&]() {
+ logger_->log_debug("processed %zu Events in %" PRIu64 " ms", eventCount,
(uint64_t)timeDiff());
+ });
- const auto now = GetTickCount64();
+ size_t commitAndSaveBookmarkCount = 0;
+ std::wstring bookmarkXml;
- if (flowFileCount > 0) {
- lastActivityTimestamp_ = now;
+ const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(),
wstrQuery_.c_str(), EvtQueryChannelPath);
+ if (!hEventResults) {
+ LOG_LAST_ERROR(EvtQuery);
+ return;
+ }
+ const utils::ScopeGuard guard_hEventResults([hEventResults]() {
EvtClose(hEventResults); });
+
+ auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
+ if (!hBookmark) {
+ // Unrecovarable error.
+ pBookmark_.reset();
+ return;
+ }
+
+ if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
+ LOG_LAST_ERROR(EvtSeek);
+ return;
}
- else if (inactiveDurationToReconnect_ > 0) {
- if ((now - lastActivityTimestamp_) > inactiveDurationToReconnect_) {
- logger_->log_info("Exceeds configured 'inactive duration to reconnect'
%lld ms. Unsubscribe to reconnect..", inactiveDurationToReconnect_);
- unsubscribe();
+
+ // Enumerate the events in the result set after the bookmarked event.
+ while (true) {
+ EVT_HANDLE hEvent{};
+ DWORD dwReturned{};
+ if (!EvtNext(hEventResults, 1, &hEvent, INFINITE, 0, &dwReturned)) {
+ if (ERROR_NO_MORE_ITEMS != GetLastError()) {
+ LOG_LAST_ERROR(EvtNext);
+ }
+ break;
+ }
+ const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });
+
+ EventRender eventRender;
+ std::wstring newBookmarkXml;
+ if (createEventRender(hEvent, eventRender) &&
pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
+ bookmarkXml = std::move(newBookmarkXml);
+ eventCount++;
+ putEventRenderFlowFileToSession(eventRender, *session);
+
+ if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
+ if (!commitAndSaveBookmark(bookmarkXml)) {
+ return;
+ }
+
+ commitAndSaveBookmarkCount = eventCount;
+ }
}
}
+
+ if (eventCount > commitAndSaveBookmarkCount) {
+ commitAndSaveBookmark(bookmarkXml);
Review comment:
@szaszm If `batch_commit_size_` is not 0, we must commit each time a full
batch of `batch_commit_size_` events has been processed; that's the point of
the feature, and it is an important performance consideration (the uncommitted
flow files would otherwise just take up more and more memory and the flow would
be blocked, as further processors can't work on uncommitted flow files).
This particular code is to make sure we safely commit the rest of the flow
files (or all of the flow files, if `batch_commit_size_ == 0U`) before saving
the bookmark. This is required, because committing can throw an exception,
causing us to roll back, but since we would already have saved the bookmark, we
would skip these events, causing data loss.
Another commit will automatically occur when we return from onTrigger,
but since there will be no flow files to commit, that will not have much
performance impact.
We can change the framework in the future to make it possible to disable
autocommit, but until then the tradeoff between having an empty commit and not
losing data necessitates this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services