Smalyshev has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/366752 )

Change subject: Fix TailPoller & sleep behavior
......................................................................

Fix TailPoller & sleep behavior

- The RC poller should not sleep if it has not reached the end of the stream of changes,
even if the current batch came out empty.

- The TailPoller should avoid getting ahead of the RC poller.

Change-Id: I94becda5dfe4a14998f509a708cdeb30f6686ed9
---
M tools/src/main/java/org/wikidata/query/rdf/tool/Updater.java
M tools/src/main/java/org/wikidata/query/rdf/tool/change/Change.java
M 
tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
M 
tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java
4 files changed, 82 insertions(+), 11 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikidata/query/rdf 
refs/changes/52/366752/1

diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/Updater.java 
b/tools/src/main/java/org/wikidata/query/rdf/tool/Updater.java
index fb6bba0..ee40982 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/Updater.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/Updater.java
@@ -265,11 +265,14 @@
                 log.warn("Retryable error fetching next batch.  Retrying.", e);
                 continue;
             }
-            if (batch.changes().isEmpty()) {
-                log.debug("Sleeping for {} secs", pollDelay);
+            if (batch.noChanges()) {
+                log.info("Sleeping for {} secs", pollDelay);
                 Thread.sleep(pollDelay * 1000);
                 continue;
             }
+            if (batch.changes().isEmpty()) {
+                continue;
+            }
             log.debug("{} changes in batch", batch.changes().size());
             return batch;
         }
diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/change/Change.java 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/Change.java
index f8ca2f1..f79d11a 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/change/Change.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/change/Change.java
@@ -144,6 +144,12 @@
         List<Change> changes();
 
         /**
+         * Whether this batch had no changes.
+         * @return true if the batch had no changes, false otherwise
+         */
+        boolean noChanges();
+
+        /**
          * The unit of advanced() in English. Used for logging.
          */
         String advancedUnits();
@@ -201,6 +207,11 @@
             }
 
             @Override
+            public boolean noChanges() {
+                return changes.isEmpty();
+            }
+
+            @Override
             public long advanced() {
                 return advanced;
             }
diff --git 
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
index 140f993..d2fc42b 100644
--- 
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
+++ 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
@@ -151,12 +151,16 @@
      * @return
      */
     private Batch checkTailPoller(Batch lastBatch) {
-        if (tailSeconds <= 0 ||
-            lastBatch.leftOffDate().before(DateUtils.addSeconds(new Date(), 
-tailSeconds))) {
-            // still not caught up, do nothing
+        if (tailSeconds <= 0) {
+            // not enabled, we're done here
             return lastBatch;
         }
+
         if (tailPoller == null) {
+            if (lastBatch.leftOffDate().before(DateUtils.addSeconds(new 
Date(), -tailSeconds))) {
+                // still not caught up, do nothing
+                return lastBatch;
+            }
             // We don't have poller yet - start it
             log.info("Started trailing poller with gap of {} seconds", 
tailSeconds);
             // Create new poller starting back tailSeconds and same IDs map.
@@ -165,6 +169,7 @@
             tailPoller = new TailingChangesPoller(poller, queue, tailSeconds);
             tailPoller.start();
         } else {
+            tailPoller.setPollerTs(lastBatch.leftOffDate().getTime());
             final Batch queuedBatch = queue.poll();
             if (queuedBatch != null) {
                 log.info("Merging {} changes from trailing queue", 
queuedBatch.changes().size());
@@ -189,6 +194,11 @@
         private final JSONObject lastContinue;
 
         /**
+         * Flag stating that there were changes, even though none were returned.
+         */
+        private boolean hasChanges;
+
+        /**
          * A batch that will next continue using the continue parameter.
          */
         private Batch(ImmutableList<Change> changes, long advanced,
@@ -197,6 +207,23 @@
             leftOffDate = nextStartTime;
             this.lastContinue = lastContinue;
         }
+
+        /**
+         * Set changes status.
+         * @param changes Whether we really had changes or not.
+         */
+        public void hasChanges(boolean changes) {
+            hasChanges = changes;
+        }
+
+        @Override
+        public boolean noChanges() {
+            if (hasChanges) {
+                return false;
+            }
+            return super.noChanges();
+        }
+
 
         @Override
         public String advancedUnits() {
@@ -277,7 +304,7 @@
      *
      * @throws RetryableException on parse failure
      */
-    @SuppressWarnings("checkstyle:cyclomaticcomplexity")
+    @SuppressWarnings({"checkstyle:npathcomplexity", 
"checkstyle:cyclomaticcomplexity"})
     private Batch batch(Date lastNextStartTime, Batch lastBatch) throws 
RetryableException {
         try {
             JSONObject recentChanges = fetchRecentChanges(lastNextStartTime, 
lastBatch);
@@ -335,12 +362,14 @@
                     changesByTitle.put(change.entityId(), dupe);
                 }
             }
-            ImmutableList<Change> changes = 
ImmutableList.copyOf(changesByTitle.values());
-            if (useBackoff && changes.size() == 0 && result.size() >= 
batchSize) {
+            final ImmutableList<Change> changes = 
ImmutableList.copyOf(changesByTitle.values());
+            final boolean backoffOverflow = useBackoff && changes.size() == 0 
&& result.size() >= batchSize;
+            if (backoffOverflow) {
                 // We have a problem here - due to backoff, we did not fetch 
any new items
-                // Try to advance one second, even though we risk to lose a 
change
-                log.info("Backoff overflow, advancing one second");
+                // Try to advance one second, even though we risk losing a change, in the hope
+                // that the trailing poller will pick it up.
                 nextStartTime += 1000;
+                log.info("Backoff overflow, advancing to {}", 
inputDateFormat().format(new Date(nextStartTime)));
             }
 
             if (changes.size() != 0) {
@@ -355,7 +384,12 @@
             // be sure we got the whole second
             String upTo = inputDateFormat().format(new Date(nextStartTime - 
1000));
             long advanced = nextStartTime - lastNextStartTime.getTime();
-            return new Batch(changes, advanced, upTo, new Date(nextStartTime), 
nextContinue);
+            Batch batch = new Batch(changes, advanced, upTo, new 
Date(nextStartTime), nextContinue);
+            if (backoffOverflow && nextContinue != null) {
+                // Skip sleeping only when a continue parameter is provided.
+                batch.hasChanges(true);
+            }
+            return batch;
         } catch (java.text.ParseException e) {
             throw new RetryableException("Parse error from api", e);
         }
diff --git 
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java
 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java
index e4d9b21..3eb17a8 100644
--- 
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java
+++ 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/TailingChangesPoller.java
@@ -2,6 +2,7 @@
 
 import java.util.Date;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.time.DateUtils;
 import org.slf4j.Logger;
@@ -43,10 +44,23 @@
      */
     private final BlockingQueue<Batch> queue;
 
+    /**
+     * Main poller timestamp.
+     */
+    private final AtomicLong mainPollerTs = new AtomicLong(0);
+
     public TailingChangesPoller(RecentChangesPoller poller, 
BlockingQueue<Batch> queue, int tailSeconds) {
         this.poller = poller;
         this.tailSeconds = tailSeconds;
         this.queue = queue;
+    }
+
+    /**
+     * Set main poller timestamp.
+     * @param ts Main poller timestamp.
+     */
+    public void setPollerTs(long ts) {
+        mainPollerTs.set(ts);
     }
 
     /**
@@ -81,6 +95,15 @@
                     queue.put(lastBatch);
                 }
                 log.info("Tail poll up to {}", lastBatch.leftOffDate());
+                long rcTs = mainPollerTs.get();
+                if (rcTs > 0 && rcTs < lastBatch.leftOffDate().getTime()) {
+                    // We are ahead of the main poller; this should not normally happen.
+                    long sleepTime = lastBatch.leftOffDate().getTime() - rcTs 
+ tailSeconds * 1000;
+                    // Waiting for sleepTime does not guarantee the RC poller will catch up
+                    // - we don't know how long that would take - but it gives it a chance.
+                    log.info("Got ahead of main poller ({} > {}), sleeping for 
{}...", lastBatch.leftOffDate(), new Date(rcTs), sleepTime);
+                    Thread.sleep(sleepTime);
+                }
                 if (!isOldEnough(lastBatch.leftOffDate())) {
                     // we're too far forward, let's sleep for a bit so we are 
couple
                     // of seconds behind

-- 
To view, visit https://gerrit.wikimedia.org/r/366752
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I94becda5dfe4a14998f509a708cdeb30f6686ed9
Gerrit-PatchSet: 1
Gerrit-Project: wikidata/query/rdf
Gerrit-Branch: master
Gerrit-Owner: Smalyshev <smalys...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to