Smalyshev has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/366752 )
Change subject: Fix TailPoller & sleep behavior ...................................................................... Fix TailPoller & sleep behavior - RC poller should not sleep if it's not at the end of stream of changes, even if current batch came out empty - TailPoller should avoid jumping in front of RC poller. Change-Id: I94becda5dfe4a14998f509a708cdeb30f6686ed9 --- M tools/src/main/java/org/wikidata/query/rdf/tool/Updater.java M tools/src/main/java/org/wikidata/query/rdf/tool/change/Change.java M tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java M tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java 4 files changed, 82 insertions(+), 11 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/wikidata/query/rdf refs/changes/52/366752/1 diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/Updater.java b/tools/src/main/java/org/wikidata/query/rdf/tool/Updater.java index fb6bba0..ee40982 100644 --- a/tools/src/main/java/org/wikidata/query/rdf/tool/Updater.java +++ b/tools/src/main/java/org/wikidata/query/rdf/tool/Updater.java @@ -265,11 +265,14 @@ log.warn("Retryable error fetching next batch. Retrying.", e); continue; } - if (batch.changes().isEmpty()) { - log.debug("Sleeping for {} secs", pollDelay); + if (batch.noChanges()) { + log.info("Sleeping for {} secs", pollDelay); Thread.sleep(pollDelay * 1000); continue; } + if (batch.changes().isEmpty()) { + continue; + } log.debug("{} changes in batch", batch.changes().size()); return batch; } diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/change/Change.java b/tools/src/main/java/org/wikidata/query/rdf/tool/change/Change.java index f8ca2f1..f79d11a 100644 --- a/tools/src/main/java/org/wikidata/query/rdf/tool/change/Change.java +++ b/tools/src/main/java/org/wikidata/query/rdf/tool/change/Change.java @@ -144,6 +144,12 @@ List<Change> changes(); /** + * Whether this batch had no changes. + * @return had changes? 
+ */ + boolean noChanges(); + + /** * The unit of advanced() in English. Used for logging. */ String advancedUnits(); @@ -201,6 +207,11 @@ } @Override + public boolean noChanges() { + return changes.isEmpty(); + } + + @Override public long advanced() { return advanced; } diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java index 140f993..d2fc42b 100644 --- a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java +++ b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java @@ -151,12 +151,16 @@ * @return */ private Batch checkTailPoller(Batch lastBatch) { - if (tailSeconds <= 0 || - lastBatch.leftOffDate().before(DateUtils.addSeconds(new Date(), -tailSeconds))) { - // still not caught up, do nothing + if (tailSeconds <= 0) { + // not enabled, we're done here return lastBatch; } + if (tailPoller == null) { + if (lastBatch.leftOffDate().before(DateUtils.addSeconds(new Date(), -tailSeconds))) { + // still not caught up, do nothing + return lastBatch; + } // We don't have poller yet - start it log.info("Started trailing poller with gap of {} seconds", tailSeconds); // Create new poller starting back tailSeconds and same IDs map. @@ -165,6 +169,7 @@ tailPoller = new TailingChangesPoller(poller, queue, tailSeconds); tailPoller.start(); } else { + tailPoller.setPollerTs(lastBatch.leftOffDate().getTime()); final Batch queuedBatch = queue.poll(); if (queuedBatch != null) { log.info("Merging {} changes from trailing queue", queuedBatch.changes().size()); @@ -189,6 +194,11 @@ private final JSONObject lastContinue; /** + * Flag that states we have had changes, even though we didn't return them. + */ + private boolean hasChanges; + + /** * A batch that will next continue using the continue parameter. 
*/ private Batch(ImmutableList<Change> changes, long advanced, @@ -197,6 +207,23 @@ leftOffDate = nextStartTime; this.lastContinue = lastContinue; } + + /** + * Set changes status. + * @param changes Whether we really had changes or not. + */ + public void hasChanges(boolean changes) { + hasChanges = changes; + } + + @Override + public boolean noChanges() { + if (hasChanges) { + return false; + } + return super.noChanges(); + } + @Override public String advancedUnits() { @@ -277,7 +304,7 @@ * * @throws RetryableException on parse failure */ - @SuppressWarnings("checkstyle:cyclomaticcomplexity") + @SuppressWarnings({"checkstyle:npathcomplexity", "checkstyle:cyclomaticcomplexity"}) private Batch batch(Date lastNextStartTime, Batch lastBatch) throws RetryableException { try { JSONObject recentChanges = fetchRecentChanges(lastNextStartTime, lastBatch); @@ -335,12 +362,14 @@ changesByTitle.put(change.entityId(), dupe); } } - ImmutableList<Change> changes = ImmutableList.copyOf(changesByTitle.values()); - if (useBackoff && changes.size() == 0 && result.size() >= batchSize) { + final ImmutableList<Change> changes = ImmutableList.copyOf(changesByTitle.values()); + final boolean backoffOverflow = useBackoff && changes.size() == 0 && result.size() >= batchSize; + if (backoffOverflow) { // We have a problem here - due to backoff, we did not fetch any new items - // Try to advance one second, even though we risk to lose a change - log.info("Backoff overflow, advancing one second"); + // Try to advance one second, even though we risk to lose a change - in hope + // that trailing poller will pick them up. 
nextStartTime += 1000; + log.info("Backoff overflow, advancing to {}", inputDateFormat().format(new Date(nextStartTime))); } if (changes.size() != 0) { @@ -355,7 +384,12 @@ // be sure we got the whole second String upTo = inputDateFormat().format(new Date(nextStartTime - 1000)); long advanced = nextStartTime - lastNextStartTime.getTime(); - return new Batch(changes, advanced, upTo, new Date(nextStartTime), nextContinue); + Batch batch = new Batch(changes, advanced, upTo, new Date(nextStartTime), nextContinue); + if (backoffOverflow && nextContinue != null) { + // We will not sleep only if continue is provided. + batch.hasChanges(true); + } + return batch; } catch (java.text.ParseException e) { throw new RetryableException("Parse error from api", e); } diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java b/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java index e4d9b21..3eb17a8 100644 --- a/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java +++ b/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java @@ -2,6 +2,7 @@ import java.util.Date; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.time.DateUtils; import org.slf4j.Logger; @@ -43,10 +44,23 @@ */ private final BlockingQueue<Batch> queue; + /** + * Main poller timestamp. + */ + private final AtomicLong mainPollerTs = new AtomicLong(0); + public TailingChangesPoller(RecentChangesPoller poller, BlockingQueue<Batch> queue, int tailSeconds) { this.poller = poller; this.tailSeconds = tailSeconds; this.queue = queue; + } + + /** + * Set main poller timestamp. + * @param ts Main poller timestamp. 
+ */ + public void setPollerTs(long ts) { + mainPollerTs.set(ts); } /** @@ -81,6 +95,15 @@ queue.put(lastBatch); } log.info("Tail poll up to {}", lastBatch.leftOffDate()); + long rcTs = mainPollerTs.get(); + if (rcTs > 0 && rcTs < lastBatch.leftOffDate().getTime()) { + // We are ahead of main poller, this is not good, normally should not happen + long sleepTime = lastBatch.leftOffDate().getTime() - rcTs + tailSeconds * 1000; + // Waiting for sleepTime does not guarantee RC poller would catch up + // - we don't know how long that would take - but it gives it a chance. + log.info("Got ahead of main poller ({} > {}), sleeping for {}...", lastBatch.leftOffDate(), new Date(rcTs), sleepTime); + Thread.sleep(sleepTime); + } if (!isOldEnough(lastBatch.leftOffDate())) { // we're too far forward, let's sleep for a bit so we are couple // of seconds behind -- To view, visit https://gerrit.wikimedia.org/r/366752 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I94becda5dfe4a14998f509a708cdeb30f6686ed9 Gerrit-PatchSet: 1 Gerrit-Project: wikidata/query/rdf Gerrit-Branch: master Gerrit-Owner: Smalyshev <smalys...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits