jenkins-bot has submitted this change and it was merged.

Change subject: Switch recent changes polling to rcstart only and implement 
backoff
......................................................................


Switch recent changes polling to rcstart only and implement backoff

This will ensure we do not miss out-of-sequence updates.
Bug: T112397

Change-Id: I477f0fcbed00dd10ed086cdb752385e2c1f94ca0
---
M tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
M 
tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
M 
tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
M 
tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
M 
tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
5 files changed, 112 insertions(+), 46 deletions(-)

Approvals:
  Smalyshev: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java 
b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
index 3c50e3a..7029314 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
@@ -200,11 +200,6 @@
                             + "You will have to reload from scratch or you 
might have missing data.");
                     return null;
                 }
-                /*
-                 * -2 seconds to because our precision is only 1 second and
-                 * because it should be cheap to recheck that we have the right
-                 * revision.
-                 */
                 startTime = leftOff.getTime();
                 log.info("Found start time in the RDF store: {}", 
inputDateFormat().format(leftOff));
             }
diff --git 
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
index 7b79e87..d8e9cd0 100644
--- 
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
+++ 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
@@ -103,7 +103,9 @@
     @SuppressWarnings("checkstyle:cyclomaticcomplexity")
     private Batch batch(Date lastNextStartTime, JSONObject lastNextContinue) 
throws RetryableException {
         try {
-            JSONObject recentChanges = 
wikibase.fetchRecentChanges(lastNextStartTime, lastNextContinue, batchSize);
+            @SuppressWarnings("unchecked")
+            final Change continueChange = 
wikibase.getChangeFromContinue((Map<String, Object>)lastNextContinue);
+            JSONObject recentChanges = 
wikibase.fetchRecentChangesBackoff(lastNextStartTime, batchSize, true);
             // Using LinkedHashMap here so that changes came out sorted by 
order of arrival
             Map<String, Change> changesByTitle = new LinkedHashMap<>();
             JSONObject nextContinue = (JSONObject) 
recentChanges.get("continue");
@@ -113,15 +115,20 @@
             for (Object rco : result) {
                 JSONObject rc = (JSONObject) rco;
                 long namespace = (long) rc.get("ns");
+                long rcid = (long)rc.get("rcid");
                 if (!wikibase.isEntityNamespace(namespace)) {
                     log.info("Skipping change in irrelevant namespace:  {}", 
rc);
+                    continue;
+                }
+                if (continueChange != null && rcid < continueChange.rcid()) {
+                    // We've already seen this change, since it has older rcid 
- so skip it
                     continue;
                 }
                 Date timestamp = df.parse(rc.get("timestamp").toString());
                 Change change;
                 if (rc.get("type").toString().equals("log") && 
(long)rc.get("revid") == 0) {
                     // Deletes should always be processed, so put negative 
revision
-                    change = new Change(rc.get("title").toString(), -1L, 
timestamp, (long)rc.get("rcid"));
+                    change = new Change(rc.get("title").toString(), -1L, 
timestamp, rcid);
                 } else {
                     change = new Change(rc.get("title").toString(), (long) 
rc.get("revid"), timestamp, (long)rc.get("rcid"));
                 }
@@ -144,6 +151,12 @@
                     // Create fake rccontinue so we continue from last known 
change
                     nextContinue = 
wikibase.getContinueObject(changes.get(changes.size() - 1));
                 } else {
+                    if (result.size() >= batchSize) {
+                        // We have a problem here - due to backoff, we did not 
fetch any new items
+                        // Try to advance one second, even though we risk 
losing a change
+                        log.info("Backoff overflow, advancing one second");
+                        nextStartTime += 1000;
+                    }
                     nextContinue = lastNextContinue;
                 }
             }
diff --git 
a/tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
 
b/tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
index 900ff91..0ce58ad 100644
--- 
a/tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
+++ 
b/tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
@@ -15,12 +15,14 @@
 import java.util.Date;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.TimeZone;
 
 import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLHandshakeException;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.time.DateUtils;
 import org.apache.http.Consts;
 import org.apache.http.HttpEntityEnclosingRequest;
 import org.apache.http.HttpRequest;
@@ -149,21 +151,68 @@
     }
 
     /**
+     * How much to back off for recent fetches, in seconds.
+     */
+    private static final int BACKOFF_TIME = 10;
+
+    /**
+     * Moves the start time back by a backoff interval.
+     * If the start time is less than 5 minutes old, subtract BACKOFF_TIME
+     * seconds to avoid race conditions; otherwise subtract only 1 second.
+     * @param startTime
+     * @return Time when to start looking
+     */
+    private Date backoffTime(Date startTime) {
+        final Date now = new Date();
+        if (startTime.before(DateUtils.addMinutes(now, -5))) {
+            /*
+             * if start time is before 5 minutes back, it's ok
+             * -1 second because our precision is only 1 second and
+             * because it should be cheap to recheck that we have the right
+             * revision.
+             */
+            return DateUtils.addSeconds(startTime, -1);
+        }
+        return DateUtils.addSeconds(startTime, -BACKOFF_TIME);
+    }
+
+    /**
      * Fetch recent changes starting from nextStartTime or continuing from
      * lastContinue depending on the contents of lastContinue way to use
      * MediaWiki. See RecentChangesPoller for how to poll these. Or just use 
it.
      *
      * @param nextStartTime if lastContinue is null then this is the start time
      *            of the query
-     * @param lastContinue continuation point if not null
+     * @param batchSize the number of recent changes to fetch
+     * @param useBackoff Should we use backoff time to handle race conditions?
+     * @return result of query
+     * @throws RetryableException thrown if there is an error communicating 
with
+     *             wikibase
+     */
+    public JSONObject fetchRecentChangesBackoff(Date nextStartTime, int 
batchSize, boolean useBackoff) throws RetryableException
+    {
+        if (useBackoff) {
+            return fetchRecentChanges(backoffTime(nextStartTime), batchSize);
+        } else {
+            return fetchRecentChanges(nextStartTime, batchSize);
+        }
+    }
+
+    /**
+     * Fetch recent changes starting from nextStartTime or continuing from
+     * lastContinue depending on the contents of lastContinue way to use
+     * MediaWiki. See RecentChangesPoller for how to poll these. Or just use 
it.
+     *
+     * @param nextStartTime if lastContinue is null then this is the start time
+     *            of the query
      * @param batchSize the number of recent changes to fetch
      * @return result of query
      * @throws RetryableException thrown if there is an error communicating 
with
      *             wikibase
      */
-    public JSONObject fetchRecentChanges(Date nextStartTime, JSONObject 
lastContinue, int batchSize)
+    public JSONObject fetchRecentChanges(Date nextStartTime, int batchSize)
             throws RetryableException {
-        URI uri = uris.recentChanges(nextStartTime, lastContinue, batchSize);
+        URI uri = uris.recentChanges(nextStartTime, batchSize);
         log.debug("Polling for changes from {}", uri);
         try {
             return checkApi(getJson(new HttpGet(uri)));
@@ -405,11 +454,9 @@
          *
          * @param startTime the first date to poll from - usually if
          *            continueObject isn't null this is ignored by wikibase
-         * @param continueObject continue object sent from wikibase with the
-         *            last batch
          * @param batchSize maximum number of results we want back from 
wikibase
          */
-        public URI recentChanges(Date startTime, JSONObject continueObject, 
int batchSize) {
+        public URI recentChanges(Date startTime, int batchSize) {
             URIBuilder builder = apiBuilder();
             builder.addParameter("action", "query");
             builder.addParameter("list", "recentchanges");
@@ -417,13 +464,8 @@
             builder.addParameter("rcprop", "title|ids|timestamp");
             builder.addParameter("rcnamespace", 
getEntityNamespacesString("|"));
             builder.addParameter("rclimit", Integer.toString(batchSize));
-            if (continueObject == null) {
-                builder.addParameter("continue", "");
-                builder.addParameter("rcstart", 
outputDateFormat().format(startTime));
-            } else {
-                builder.addParameter("continue", 
continueObject.get("continue").toString());
-                builder.addParameter("rccontinue", 
continueObject.get("rccontinue").toString());
-            }
+            builder.addParameter("continue", "");
+            builder.addParameter("rcstart", 
outputDateFormat().format(startTime));
             return build(builder);
         }
 
@@ -608,5 +650,19 @@
         return nextContinue;
     }
 
+    /**
+     * Extract the change (timestamp and rcid) from a continue JSON object.
+     * @param nextContinue
+     * @return Dummy change with the parsed timestamp and rcid, or null
+     * @throws java.text.ParseException When data is in the wrong format
+     */
+    public Change getChangeFromContinue(Map<String, Object> nextContinue) 
throws java.text.ParseException {
+        if (nextContinue == null) {
+            return null;
+        }
+        final String rccontinue = (String)nextContinue.get("rccontinue");
+        final String[] parts = rccontinue.split("\\|");
+        return new Change("DUMMY", -1, outputDateFormat().parse(parts[0]), 
Long.parseLong(parts[1]));
+    }
 
 }
diff --git 
a/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
 
b/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
index 9ee8ecd..3caeaa7 100644
--- 
a/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
+++ 
b/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
@@ -6,10 +6,7 @@
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static 
org.wikidata.query.rdf.tool.wikibase.WikibaseRepository.outputDateFormat;
 
 import java.util.ArrayList;
@@ -18,6 +15,7 @@
 import java.util.Date;
 import java.util.List;
 
+import org.apache.commons.lang3.time.DateUtils;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.junit.Before;
@@ -53,7 +51,7 @@
             rc.put("type", "edit");
             recentChanges.add(rc);
         }
-        when(repository.fetchRecentChanges(startTime, null, 
batchSize)).thenReturn(result);
+        when(repository.fetchRecentChangesBackoff(startTime, batchSize, 
true)).thenReturn(result);
         when(repository.isEntityNamespace(0)).thenReturn(true);
 
         RecentChangesPoller poller = new RecentChangesPoller(repository, 
startTime, batchSize);
@@ -75,7 +73,8 @@
     @Test
     @SuppressWarnings("unchecked")
     public void continuePoll() throws RetryableException {
-        Date startTime = new Date();
+        // Use old date to remove backoff
+        Date startTime = DateUtils.addDays(new Date(), -10);
         int batchSize = 10;
 
         JSONObject result = new JSONObject();
@@ -108,7 +107,7 @@
         contJson.put("rccontinue", outputDateFormat().format(revDate) + "|8");
         contJson.put("continue", "-||");
 
-        when(repository.fetchRecentChanges(startTime, null, 
batchSize)).thenReturn(result);
+        when(repository.fetchRecentChangesBackoff(startTime, batchSize, 
true)).thenReturn(result);
         when(repository.getContinueObject((Change)any())).thenReturn(contJson);
         when(repository.isEntityNamespace(0)).thenReturn(true);
 
@@ -117,14 +116,14 @@
         assertEquals(2, batch.changes().size());
         assertEquals(7, batch.changes().get(1).rcid());
 
-        recentChanges.clear();
-        ArgumentCaptor<JSONObject> argument = 
ArgumentCaptor.forClass(JSONObject.class);
+        ArgumentCaptor<Date> argument = ArgumentCaptor.forClass(Date.class);
 
-        when(repository.fetchRecentChanges((Date)any(), (JSONObject)any(), 
anyInt())).thenReturn(result);
+        recentChanges.clear();
+        when(repository.fetchRecentChangesBackoff(argument.capture(), 
eq(batchSize), eq(true))).thenReturn(result);
         // check that poller passes the continue object to the next batch
-        poller.nextBatch(batch);
-        verify(repository, times(2)).fetchRecentChanges((Date)any(), 
argument.capture(), eq(batchSize));
-        assertEquals(contJson, argument.getValue());
+        batch = poller.nextBatch(batch);
+        assertEquals(0, batch.changes().size());
+        assertEquals(date, 
WikibaseRepository.inputDateFormat().format(argument.getValue()));
     }
 
     @Test
@@ -157,7 +156,7 @@
         rc.put("type", "edit");
         recentChanges.add(rc);
 
-        when(repository.fetchRecentChanges(startTime, null, 
batchSize)).thenReturn(result);
+        when(repository.fetchRecentChangesBackoff(startTime, batchSize, 
true)).thenReturn(result);
         when(repository.isEntityNamespace(0)).thenReturn(true);
 
         RecentChangesPoller poller = new RecentChangesPoller(repository, 
startTime, batchSize);
diff --git 
a/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
 
b/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
index e92ab74..02a78cc 100644
--- 
a/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
+++ 
b/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
@@ -18,6 +18,7 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.time.DateUtils;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.junit.Test;
@@ -40,14 +41,14 @@
 
     @Test
     @SuppressWarnings("unchecked")
-    public void recentChangesWithLotsOfChangesHasContinue() throws 
RetryableException {
+    public void recentChangesWithLotsOfChangesHasContinue() throws 
RetryableException, ParseException {
         /*
          * This relies on there being lots of changes in the past 30 days. 
Which
          * is probably ok.
          */
         int batchSize = randomIntBetween(3, 30);
         JSONObject changes = repo.fetchRecentChanges(new 
Date(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30)),
-                null, batchSize);
+                batchSize);
         Map<String, Object> c = changes;
         assertThat(c, hasKey("continue"));
         assertThat((Map<String, Object>) changes.get("continue"), 
hasKey("rccontinue"));
@@ -63,7 +64,8 @@
             assertThat(rc, hasEntry(equalTo("timestamp"), 
instanceOf(String.class)));
             assertThat(rc, hasEntry(equalTo("revid"), instanceOf(Long.class)));
         }
-        changes = repo.fetchRecentChanges(null /* ignored */, (JSONObject) 
changes.get("continue"), batchSize);
+        final Date nextDate = repo.getChangeFromContinue((Map<String, 
Object>)changes.get("continue")).timestamp();
+        changes = repo.fetchRecentChanges(nextDate, batchSize);
         assertThat(c, hasKey("query"));
         assertThat((Map<String, Object>) c.get("query"), 
hasKey("recentchanges"));
     }
@@ -73,9 +75,9 @@
     public void recentChangesWithFewChangesHasNoContinue() throws 
RetryableException {
         /*
          * This relies on there being very few changes in the current
-         * millisecond.
+         * second.
          */
-        JSONObject changes = repo.fetchRecentChanges(new 
Date(System.currentTimeMillis()), null, 500);
+        JSONObject changes = repo.fetchRecentChanges(new 
Date(System.currentTimeMillis()), 500);
         Map<String, Object> c = changes;
         assertThat(c, not(hasKey("continue")));
         assertThat(c, hasKey("query"));
@@ -92,7 +94,7 @@
         editShowsUpInRecentChangesTestCase("QueryTestProperty", "property");
     }
 
-    private JSONArray getRecentChanges(Date date, JSONObject contObj, int 
batchSize) throws RetryableException,
+    private JSONArray getRecentChanges(Date date, int batchSize) throws 
RetryableException,
         ContainedException {
         // Add a bit of a wait to try and improve Jenkins test stability.
         try {
@@ -100,7 +102,7 @@
         } catch (InterruptedException e) {
             // nothing to do here, sorry. I know it looks bad.
         }
-        JSONObject result = repo.fetchRecentChanges(date, contObj, batchSize);
+        JSONObject result = repo.fetchRecentChanges(date, batchSize);
         return (JSONArray) ((JSONObject) 
result.get("query")).get("recentchanges");
     }
 
@@ -110,7 +112,7 @@
         long now = System.currentTimeMillis();
         String entityId = repo.firstEntityIdForLabelStartingWith(label, "en", 
type);
         repo.setLabel(entityId, type, label + now, "en");
-        JSONArray changes = getRecentChanges(new Date(now - 10000), null, 10);
+        JSONArray changes = getRecentChanges(new Date(now - 10000), 10);
         boolean found = false;
         String title = entityId;
         if (type.equals("property")) {
@@ -164,11 +166,11 @@
     }
 
     @Test
-    public void continueWorks() throws RetryableException, ContainedException, 
ParseException {
+    public void continueWorks() throws RetryableException, ContainedException, 
ParseException, InterruptedException {
         long now = System.currentTimeMillis();
         String entityId = 
repo.firstEntityIdForLabelStartingWith("QueryTestItem", "en", "item");
         repo.setLabel(entityId, "item", "QueryTestItem" + now, "en");
-        JSONArray changes = getRecentChanges(new Date(now - 10000), null, 10);
+        JSONArray changes = getRecentChanges(new Date(now - 10000), 10);
         Change change = null;
         long oldRevid = 0;
         long oldRcid = 0;
@@ -185,10 +187,11 @@
             }
         }
         assertNotNull("Did not find the first edit", change);
-        JSONObject continueObject = repo.getContinueObject(change);
+        // Ensure this change is in different second
+        Thread.sleep(1000);
         // make new edit now
         repo.setLabel(entityId, "item", "QueryTestItem" + now + "updated", 
"en");
-        changes = getRecentChanges(new Date(now - 10000), continueObject, 10);
+        changes = getRecentChanges(DateUtils.addSeconds(change.timestamp(), 
1), 10);
         // check that new result does not contain old edit but contains new 
edit
         boolean found = false;
         for (Object changeObject : changes) {
@@ -206,7 +209,7 @@
     @SuppressWarnings({ "unchecked", "rawtypes" })
     public void recentChangesWithErrors() throws RetryableException, 
ContainedException {
         WikibaseRepository proxyRepo = new WikibaseRepository("http", 
"localhost", 8812);
-        JSONObject changes = proxyRepo.fetchRecentChanges(new 
Date(System.currentTimeMillis()), null, 500);
+        JSONObject changes = proxyRepo.fetchRecentChanges(new 
Date(System.currentTimeMillis()), 500);
         Map<String, Object> c = changes;
         assertThat(c, not(hasKey("continue")));
         assertThat(c, hasKey("query"));

-- 
To view, visit https://gerrit.wikimedia.org/r/301907
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I477f0fcbed00dd10ed086cdb752385e2c1f94ca0
Gerrit-PatchSet: 1
Gerrit-Project: wikidata/query/rdf
Gerrit-Branch: master
Gerrit-Owner: Smalyshev <[email protected]>
Gerrit-Reviewer: Smalyshev <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to