Smalyshev has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/346676 )

Change subject: Set backoff to only work for recent changes
......................................................................

Set backoff to only work for recent changes

But keep using continuations for older ones. This does not resolve T162230
completely since you can still get stuck with short batch size but makes it
a bit better.

Bug: T162230
Change-Id: Ic548cbedb76b6fbe750fc1d50c79038c41844ae5
---
M tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
M 
tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
M 
tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
M 
tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
M 
tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
5 files changed, 183 insertions(+), 119 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikidata/query/rdf 
refs/changes/76/346676/1

diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java 
b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
index b93a952..5d13798 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
@@ -1,6 +1,5 @@
 package org.wikidata.query.rdf.tool;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.wikidata.query.rdf.tool.OptionsUtils.handleOptions;
 import static org.wikidata.query.rdf.tool.OptionsUtils.mungerFromOptions;
 import static 
org.wikidata.query.rdf.tool.wikibase.WikibaseRepository.inputDateFormat;
@@ -25,6 +24,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.time.DateUtils;
 import org.openrdf.model.Statement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -326,11 +326,11 @@
                      * super good and because its not big deal to recheck if we
                      * have some updates.
                      */
-                    leftOffDate = new Date(batch.leftOffDate().getTime() - 
SECONDS.toMillis(1));
+                    leftOffDate = DateUtils.addSeconds(leftOffDate, -1);
                     if (oldDate == null || !oldDate.equals(leftOffDate)) {
                         // Do not update repo with the same date
-                        oldDate = leftOffDate;
                         rdfRepository.updateLeftOffTime(leftOffDate);
+                        oldDate = leftOffDate;
                     }
                 }
                 // TODO wrap all retry-able exceptions in a special exception
diff --git 
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
index 09ca176..12700e2 100644
--- 
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
+++ 
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
@@ -9,6 +9,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.lang3.time.DateUtils;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
@@ -39,6 +40,15 @@
      * Size of the batches to poll against wikibase.
      */
     private final int batchSize;
+    /**
+     * How much to back off for recent fetches, in seconds.
+     */
+    private static final int BACKOFF_TIME = 10;
+    /**
+     * How old should the change be to not apply backoff.
+     * The number is in minutes.
+     */
+    private static final int BACKOFF_THRESHOLD = 5;
 
     public RecentChangesPoller(WikibaseRepository wikibase, Date 
firstStartTime, int batchSize) {
         this.wikibase = wikibase;
@@ -73,12 +83,20 @@
         private final Set<Long> seenIDs;
 
         /**
+         * Continue from last request. Can be null.
+         */
+        private final JSONObject lastContinue;
+
+        /**
          * A batch that will next continue using the continue parameter.
          */
-        private Batch(ImmutableList<Change> changes, long advanced, String 
leftOff, Date nextStartTime, Set<Long> seenIDs) {
+        private Batch(ImmutableList<Change> changes, long advanced,
+                String leftOff, Date nextStartTime, JSONObject lastContinue,
+                Set<Long> seenIDs) {
             super(changes, advanced, leftOff);
             leftOffDate = nextStartTime;
             this.seenIDs = seenIDs;
+            this.lastContinue = lastContinue;
         }
 
         @Override
@@ -93,8 +111,8 @@
 
         @Override
         public String leftOffHuman() {
-            return WikibaseRepository.inputDateFormat().format(leftOffDate);
-            // + " (next: " + nextContinue.get("rccontinue").toString() + ")";
+            return WikibaseRepository.inputDateFormat().format(leftOffDate)
+                + " (next: " + lastContinue.get("rccontinue").toString() + ")";
         }
 
         /**
@@ -103,6 +121,45 @@
          */
         public Set<Long> getSeenIDs() {
             return seenIDs;
+        }
+
+        /**
+         * Get continue object.
+         * @return
+         */
+        public JSONObject getLastContinue() {
+            return lastContinue;
+        }
+    }
+
+    /**
+     * Check whether change is very recent.
+     * If it is we need to backoff to allow capturing unsettled DB changes.
+     * @param nextStartTime
+     * @return
+     */
+    private boolean changeIsRecent(Date nextStartTime) {
+        return nextStartTime.after(DateUtils.addMinutes(new Date(), 
-BACKOFF_THRESHOLD));
+    }
+
+    /**
+     * Fetch recent changes from Wikibase.
+     * If we're close to current time, we back off a bit from last timestamp,
+     * and fetch by timestamp. If it's back in the past, we fetch by 
continuation.
+     * @param lastNextStartTime
+     * @param lastBatch
+     * @return
+     * @throws RetryableException on fetch failure
+     */
+    private JSONObject fetchRecentChanges(Date lastNextStartTime, Batch 
lastBatch) throws RetryableException {
+        if (changeIsRecent(lastNextStartTime)) {
+            return wikibase.fetchRecentChangesByTime(
+                    DateUtils.addSeconds(lastNextStartTime, -BACKOFF_TIME),
+                    batchSize);
+        } else {
+            return wikibase.fetchRecentChanges(lastNextStartTime,
+                    lastBatch != null ? lastBatch.getLastContinue() : null,
+                    batchSize);
         }
     }
 
@@ -114,8 +171,7 @@
     @SuppressWarnings("checkstyle:cyclomaticcomplexity")
     private Batch batch(Date lastNextStartTime, Batch lastBatch) throws 
RetryableException {
         try {
-            @SuppressWarnings("unchecked")
-            JSONObject recentChanges = 
wikibase.fetchRecentChangesBackoff(lastNextStartTime, batchSize, true);
+            JSONObject recentChanges = fetchRecentChanges(lastNextStartTime, 
lastBatch);
             // Using LinkedHashMap here so that changes came out sorted by 
order of arrival
             Map<String, Change> changesByTitle = new LinkedHashMap<>();
             JSONObject nextContinue = (JSONObject) 
recentChanges.get("continue");
@@ -170,7 +226,7 @@
                 nextStartTime = Math.max(nextStartTime, timestamp.getTime());
             }
             ImmutableList<Change> changes = 
ImmutableList.copyOf(changesByTitle.values());
-            if (nextContinue == null && changes.size() == 0 && result.size() 
>= batchSize) {
+            if (changes.size() == 0 && result.size() >= batchSize) {
                 // We have a problem here - due to backoff, we did not fetch 
any new items
                 // Try to advance one second, even though we risk to lose a 
change
                 log.info("Backoff overflow, advancing one second");
@@ -189,7 +245,7 @@
             // be sure we got the whole second
             String upTo = inputDateFormat().format(new Date(nextStartTime - 
1000));
             long advanced = nextStartTime - lastNextStartTime.getTime();
-            return new Batch(changes, advanced, upTo, new Date(nextStartTime), 
seenIds);
+            return new Batch(changes, advanced, upTo, new Date(nextStartTime), 
nextContinue, seenIds);
         } catch (java.text.ParseException e) {
             throw new RetryableException("Parse error from api", e);
         }
diff --git 
a/tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
 
b/tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
index 12481a6..db7a97a 100644
--- 
a/tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
+++ 
b/tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
@@ -22,7 +22,6 @@
 import javax.net.ssl.SSLHandshakeException;
 
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.time.DateUtils;
 import org.apache.http.Consts;
 import org.apache.http.HttpEntityEnclosingRequest;
 import org.apache.http.HttpRequest;
@@ -151,53 +150,6 @@
     }
 
     /**
-     * How much to back off for recent fetches, in seconds.
-     */
-    private static final int BACKOFF_TIME = 10;
-
-    /**
-     * Adds backoff time to start time.
-     * If the start time is too close to now, add backoff time,
-     * to avoid race conditions.
-     * @param startTime
-     * @return Time when to start looking
-     */
-    private Date backoffTime(Date startTime) {
-        if (startTime.before(DateUtils.addMinutes(new Date(), -5))) {
-            /*
-             * if start time is before 5 minutes back, it's ok
-             * -1 second because our precision is only 1 second and
-             * because it should be cheap to recheck that we have the right
-             * revision.
-             */
-            return DateUtils.addSeconds(startTime, -1);
-        }
-        return DateUtils.addSeconds(startTime, -BACKOFF_TIME);
-    }
-
-    /**
-     * Fetch recent changes starting from nextStartTime or continuing from
-     * lastContinue depending on the contents of lastContinue way to use
-     * MediaWiki. See RecentChangesPoller for how to poll these. Or just use 
it.
-     *
-     * @param nextStartTime if lastContinue is null then this is the start time
-     *            of the query
-     * @param batchSize the number of recent changes to fetch
-     * @param useBackoff Should we use backoff time to handle race conditions?
-     * @return result of query
-     * @throws RetryableException thrown if there is an error communicating 
with
-     *             wikibase
-     */
-    public JSONObject fetchRecentChangesBackoff(Date nextStartTime, int 
batchSize, boolean useBackoff) throws RetryableException
-    {
-        if (useBackoff) {
-            return fetchRecentChanges(backoffTime(nextStartTime), batchSize);
-        } else {
-            return fetchRecentChanges(nextStartTime, batchSize);
-        }
-    }
-
-    /**
      * Fetch recent changes starting from nextStartTime or continuing from
      * lastContinue depending on the contents of lastContinue way to use
      * MediaWiki. See RecentChangesPoller for how to poll these. Or just use 
it.
@@ -209,9 +161,26 @@
      * @throws RetryableException thrown if there is an error communicating 
with
      *             wikibase
      */
-    public JSONObject fetchRecentChanges(Date nextStartTime, int batchSize)
+    public JSONObject fetchRecentChangesByTime(Date nextStartTime, int 
batchSize) throws RetryableException {
+        return fetchRecentChanges(nextStartTime, null, batchSize);
+    }
+
+    /**
+     * Fetch recent changes starting from nextStartTime or continuing from
+     * lastContinue depending on the contents of lastContinue way to use
+     * MediaWiki. See RecentChangesPoller for how to poll these. Or just use 
it.
+     *
+     * @param nextStartTime if lastContinue is null then this is the start time
+     *            of the query
+     * @param batchSize the number of recent changes to fetch
+     * @param lastContinue Continuation object from last batch, or null.
+     * @return result of query
+     * @throws RetryableException thrown if there is an error communicating 
with
+     *             wikibase
+     */
+    public JSONObject fetchRecentChanges(Date nextStartTime, JSONObject 
lastContinue, int batchSize)
             throws RetryableException {
-        URI uri = uris.recentChanges(nextStartTime, batchSize);
+        URI uri = uris.recentChanges(nextStartTime, lastContinue, batchSize);
         log.debug("Polling for changes from {}", uri);
         try {
             return checkApi(getJson(new HttpGet(uri)));
@@ -464,7 +433,7 @@
          *            continueObject isn't null this is ignored by wikibase
          * @param batchSize maximum number of results we want back from 
wikibase
          */
-        public URI recentChanges(Date startTime, int batchSize) {
+        public URI recentChanges(Date startTime, JSONObject continueObject, 
int batchSize) {
             URIBuilder builder = apiBuilder();
             builder.addParameter("action", "query");
             builder.addParameter("list", "recentchanges");
@@ -472,8 +441,13 @@
             builder.addParameter("rcprop", "title|ids|timestamp");
             builder.addParameter("rcnamespace", 
getEntityNamespacesString("|"));
             builder.addParameter("rclimit", Integer.toString(batchSize));
-            builder.addParameter("continue", "");
-            builder.addParameter("rcstart", 
outputDateFormat().format(startTime));
+            if (continueObject == null) {
+                builder.addParameter("continue", "");
+                builder.addParameter("rcstart", 
outputDateFormat().format(startTime));
+            } else {
+                builder.addParameter("continue", 
continueObject.get("continue").toString());
+                builder.addParameter("rccontinue", 
continueObject.get("rccontinue").toString());
+            }
             return build(builder);
         }
 
diff --git 
a/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
 
b/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
index 8d19629..b17c929 100644
--- 
a/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
+++ 
b/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
@@ -4,14 +4,12 @@
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.verify;
 import static 
org.wikidata.query.rdf.tool.wikibase.WikibaseRepository.outputDateFormat;
 
 import java.util.ArrayList;
@@ -33,11 +31,32 @@
 public class RecentChangesPollerUnitTest {
     private WikibaseRepository repository;
 
+    private int batchSize = 10;
+
+    /**
+     * Mock return of the first batch call to repository.
+     * @param startTime
+     * @param result
+     * @throws RetryableException
+     */
+    private void firstBatchReturns(Date startTime, JSONObject result) throws 
RetryableException {
+        when(repository.fetchRecentChangesByTime(any(Date.class), 
eq(batchSize))).thenCallRealMethod();
+        when(repository.fetchRecentChanges(any(Date.class), 
(JSONObject)eq(null), eq(batchSize))).thenReturn(result);
+        when(repository.isEntityNamespace(0)).thenReturn(true);
+        when(repository.isValidEntity(any(String.class))).thenReturn(true);
+
+    }
+
+    /**
+     * Check deduplication.
+     * Create 20 changes, of which each two are dupes,
+     * check that dupes are eliminated.
+     * @throws RetryableException
+     */
     @Test
     @SuppressWarnings("unchecked")
     public void dedups() throws RetryableException {
         Date startTime = new Date();
-        int batchSize = 10;
         // Build a result from wikibase with duplicate recent changes
         JSONObject result = new JSONObject();
         JSONObject query = new JSONObject();
@@ -56,12 +75,11 @@
             rc.put("type", "edit");
             recentChanges.add(rc);
         }
-        when(repository.fetchRecentChangesBackoff(startTime, batchSize, 
true)).thenReturn(result);
-        when(repository.isEntityNamespace(0)).thenReturn(true);
-        when(repository.isValidEntity(any(String.class))).thenReturn(true);
 
+        firstBatchReturns(startTime, result);
         RecentChangesPoller poller = new RecentChangesPoller(repository, 
startTime, batchSize);
         Batch batch = poller.firstBatch();
+
         assertThat(batch.changes(), hasSize(10));
         List<Change> changes = new ArrayList<>(batch.changes());
         Collections.sort(changes, new Comparator<Change>() {
@@ -76,6 +94,11 @@
         }
     }
 
+    /**
+     * Check that continuing works.
+     * Check that poller passes continuation to the next batch.
+     * @throws RetryableException
+     */
     @Test
     @SuppressWarnings("unchecked")
     public void continuePoll() throws RetryableException {
@@ -88,7 +111,7 @@
         JSONArray recentChanges = new JSONArray();
         JSONObject query = new JSONObject();
 
-        Date revDate = new Date();
+        Date revDate = DateUtils.addSeconds(startTime, 20);
         String date = WikibaseRepository.inputDateFormat().format(revDate);
         rc.put("ns", Long.valueOf(0));
         rc.put("title", "Q666");
@@ -112,33 +135,38 @@
         JSONObject contJson = new JSONObject();
         contJson.put("rccontinue", outputDateFormat().format(revDate) + "|8");
         contJson.put("continue", "-||");
+        result.put("continue", contJson);
 
-        when(repository.fetchRecentChangesBackoff(startTime, batchSize, 
true)).thenReturn(result);
-        when(repository.getContinueObject((Change)any())).thenReturn(contJson);
-        when(repository.isEntityNamespace(0)).thenReturn(true);
-        when(repository.isValidEntity(any(String.class))).thenReturn(true);
+        firstBatchReturns(startTime, result);
 
         RecentChangesPoller poller = new RecentChangesPoller(repository, 
startTime, batchSize);
         Batch batch = poller.firstBatch();
         assertThat(batch.changes(), hasSize(2));
         assertEquals(7, batch.changes().get(1).rcid());
-        assertEquals(date, batch.leftOffHuman());
+        assertEquals(date, 
WikibaseRepository.inputDateFormat().format(batch.leftOffDate()));
+        assertEquals(contJson, batch.getLastContinue());
 
-        ArgumentCaptor<Date> argument = ArgumentCaptor.forClass(Date.class);
+        ArgumentCaptor<Date> argumentDate = 
ArgumentCaptor.forClass(Date.class);
+        ArgumentCaptor<JSONObject> argumentJson = 
ArgumentCaptor.forClass(JSONObject.class);
 
         recentChanges.clear();
-        when(repository.fetchRecentChangesBackoff(argument.capture(), 
eq(batchSize), eq(true))).thenReturn(result);
+        when(repository.fetchRecentChanges(argumentDate.capture(), 
argumentJson.capture(), eq(batchSize))).thenReturn(result);
         // check that poller passes the continue object to the next batch
         batch = poller.nextBatch(batch);
         assertThat(batch.changes(), hasSize(0));
-        assertEquals(date, 
WikibaseRepository.inputDateFormat().format(argument.getValue()));
+        assertEquals(date, 
WikibaseRepository.inputDateFormat().format(argumentDate.getValue()));
+        assertEquals(contJson, argumentJson.getValue());
     }
 
+    /**
+     * Check that delete is processed.
+     * @throws RetryableException
+     */
     @Test
     @SuppressWarnings("unchecked")
     public void delete() throws RetryableException {
-        Date startTime = new Date();
-        int batchSize = 10;
+        // Use old date to remove backoff
+        Date startTime = DateUtils.addDays(new Date(), -10);
         // Build a result from wikibase with duplicate recent changes
         JSONObject result = new JSONObject();
         JSONObject query = new JSONObject();
@@ -150,7 +178,7 @@
         rc.put("ns", Long.valueOf(0));
         rc.put("title", "Q424242");
         rc.put("timestamp", date);
-        rc.put("revid", Long.valueOf(0));
+        rc.put("revid", Long.valueOf(0)); // 0 means delete
         rc.put("rcid", 42L);
         rc.put("type", "log");
         recentChanges.add(rc);
@@ -164,9 +192,7 @@
         rc.put("type", "edit");
         recentChanges.add(rc);
 
-        when(repository.fetchRecentChangesBackoff(startTime, batchSize, 
true)).thenReturn(result);
-        when(repository.isEntityNamespace(0)).thenReturn(true);
-        when(repository.isValidEntity(any(String.class))).thenReturn(true);
+        firstBatchReturns(startTime, result);
 
         RecentChangesPoller poller = new RecentChangesPoller(repository, 
startTime, batchSize);
         Batch batch = poller.firstBatch();
@@ -177,11 +203,15 @@
         assertEquals(changes.get(0).revision(), -1L);
     }
 
+    /**
+     * Check that recent requests use backoff.
+     * @throws RetryableException
+     */
     @Test
     @SuppressWarnings("unchecked")
     public void backoffTime() throws RetryableException {
         Date startTime = new Date();
-        RecentChangesPoller poller = new RecentChangesPoller(repository, 
startTime, 10);
+        RecentChangesPoller poller = new RecentChangesPoller(repository, 
startTime, batchSize);
 
         JSONObject result = new JSONObject();
         JSONObject query = new JSONObject();
@@ -189,19 +219,42 @@
         JSONArray recentChanges = new JSONArray();
         query.put("recentchanges", recentChanges);
 
+        Date nextStartTime = DateUtils.addSeconds(startTime, 20);
+        String date = 
WikibaseRepository.inputDateFormat().format(nextStartTime);
+        JSONObject rc = new JSONObject();
+        rc.put("ns", Long.valueOf(0));
+        rc.put("title", "Q424242");
+        rc.put("timestamp", date);
+        rc.put("revid", 42L);
+        rc.put("rcid", 42L);
+        rc.put("type", "edit");
+        recentChanges.add(rc);
+
         ArgumentCaptor<Date> argument = ArgumentCaptor.forClass(Date.class);
-        when(repository.fetchRecentChanges((Date)any(), 
anyInt())).thenReturn(result);
-        when(repository.fetchRecentChangesBackoff((Date)any(), anyInt(), 
anyBoolean())).thenCallRealMethod();
+        when(repository.fetchRecentChangesByTime(argument.capture(), 
eq(batchSize))).thenReturn(result);
+        when(repository.isEntityNamespace(0)).thenReturn(true);
+        when(repository.isValidEntity(any(String.class))).thenReturn(true);
+
         Batch batch = poller.firstBatch();
 
-        verify(repository).fetchRecentChanges(argument.capture(), eq(10));
-        // Ensure we backed off at least 10 seconds but no more than 30
+        // Ensure we backed off at least 7 seconds but no more than 20
         assertThat(argument.getValue(), 
lessThan(DateUtils.addSeconds(startTime, -7)));
-        assertThat(argument.getValue(), 
greaterThan(DateUtils.addSeconds(startTime, -30)));
+        assertThat(argument.getValue(), 
greaterThan(DateUtils.addSeconds(startTime, -20)));
+
+        // Verify that backoff still works on the second call
+        batch = poller.nextBatch(batch);
+        assertNotNull(batch); // verify we're still using 
fetchRecentChangesByTime
+        assertThat(argument.getValue(), 
lessThan(DateUtils.addSeconds(nextStartTime, -7)));
+        assertThat(argument.getValue(), 
greaterThan(DateUtils.addSeconds(nextStartTime, -20)));
+
     }
 
+    /**
+     * Verify that no backoff happens for old changes.
+     * @throws RetryableException
+     */
     @SuppressWarnings("unchecked")
-    public void backoffTimeOld() throws RetryableException {
+    public void noBackoffForOld() throws RetryableException {
         Date startTime = DateUtils.addDays(new Date(), -1);
         RecentChangesPoller poller = new RecentChangesPoller(repository, 
startTime, 10);
 
@@ -212,31 +265,12 @@
         query.put("recentchanges", recentChanges);
 
         ArgumentCaptor<Date> argument = ArgumentCaptor.forClass(Date.class);
-        when(repository.fetchRecentChanges((Date)any(), 
anyInt())).thenReturn(result);
-        when(repository.fetchRecentChangesBackoff((Date)any(), anyInt(), 
anyBoolean())).thenCallRealMethod();
+        when(repository.fetchRecentChanges(argument.capture(), 
any(JSONObject.class), eq(batchSize))).thenReturn(result);
+        when(repository.isEntityNamespace(0)).thenReturn(true);
         when(repository.isValidEntity(any(String.class))).thenReturn(true);
         Batch batch = poller.firstBatch();
 
-        verify(repository).fetchRecentChanges(argument.capture(), eq(10));
-        // Ensure we backed off no more than one second
-        assertThat(argument.getValue(), lessThan(startTime));
-        assertThat(argument.getValue(), 
greaterThan(DateUtils.addSeconds(startTime, -2)));
-    }
-
-
-    @Test
-    public void backoffArg() throws RetryableException {
-        Date startTime = new Date();
-        JSONObject result = new JSONObject();
-
-        when(repository.fetchRecentChanges((Date)any(), 
eq(10))).thenReturn(result);
-        when(repository.fetchRecentChangesBackoff((Date)any(), anyInt(), 
anyBoolean())).thenCallRealMethod();
-        when(repository.isValidEntity(any(String.class))).thenReturn(true);
-        ArgumentCaptor<Date> argument = ArgumentCaptor.forClass(Date.class);
-
-        repository.fetchRecentChangesBackoff(startTime, 10, false);
-        verify(repository).fetchRecentChanges(argument.capture(), eq(10));
-        assertEquals(startTime, argument.getValue());
+        assertEquals(argument.getValue(), startTime);
     }
 
     @Before
diff --git 
a/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
 
b/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
index 02a78cc..d02e855 100644
--- 
a/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
+++ 
b/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
@@ -48,7 +48,7 @@
          */
         int batchSize = randomIntBetween(3, 30);
         JSONObject changes = repo.fetchRecentChanges(new 
Date(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30)),
-                batchSize);
+                null, batchSize);
         Map<String, Object> c = changes;
         assertThat(c, hasKey("continue"));
         assertThat((Map<String, Object>) changes.get("continue"), 
hasKey("rccontinue"));
@@ -65,7 +65,7 @@
             assertThat(rc, hasEntry(equalTo("revid"), instanceOf(Long.class)));
         }
         final Date nextDate = repo.getChangeFromContinue((Map<String, 
Object>)changes.get("continue")).timestamp();
-        changes = repo.fetchRecentChanges(nextDate, batchSize);
+        changes = repo.fetchRecentChanges(nextDate, null, batchSize);
         assertThat(c, hasKey("query"));
         assertThat((Map<String, Object>) c.get("query"), 
hasKey("recentchanges"));
     }
@@ -77,7 +77,7 @@
          * This relies on there being very few changes in the current
          * second.
          */
-        JSONObject changes = repo.fetchRecentChanges(new 
Date(System.currentTimeMillis()), 500);
+        JSONObject changes = repo.fetchRecentChanges(new 
Date(System.currentTimeMillis()), null, 500);
         Map<String, Object> c = changes;
         assertThat(c, not(hasKey("continue")));
         assertThat(c, hasKey("query"));
@@ -102,7 +102,7 @@
         } catch (InterruptedException e) {
             // nothing to do here, sorry. I know it looks bad.
         }
-        JSONObject result = repo.fetchRecentChanges(date, batchSize);
+        JSONObject result = repo.fetchRecentChanges(date, null, batchSize);
         return (JSONArray) ((JSONObject) 
result.get("query")).get("recentchanges");
     }
 
@@ -209,7 +209,7 @@
     @SuppressWarnings({ "unchecked", "rawtypes" })
     public void recentChangesWithErrors() throws RetryableException, 
ContainedException {
         WikibaseRepository proxyRepo = new WikibaseRepository("http", 
"localhost", 8812);
-        JSONObject changes = proxyRepo.fetchRecentChanges(new 
Date(System.currentTimeMillis()), 500);
+        JSONObject changes = proxyRepo.fetchRecentChanges(new 
Date(System.currentTimeMillis()), null, 500);
         Map<String, Object> c = changes;
         assertThat(c, not(hasKey("continue")));
         assertThat(c, hasKey("query"));

-- 
To view, visit https://gerrit.wikimedia.org/r/346676
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic548cbedb76b6fbe750fc1d50c79038c41844ae5
Gerrit-PatchSet: 1
Gerrit-Project: wikidata/query/rdf
Gerrit-Branch: master
Gerrit-Owner: Smalyshev <smalys...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to