jenkins-bot has submitted this change and it was merged.
Change subject: Switch recent changes polling to rcstart only and implement
backoff
......................................................................
Switch recent changes polling to rcstart only and implement backoff
This will ensure we do not miss out-of-sequence updates.
Bug: T112397
Change-Id: I477f0fcbed00dd10ed086cdb752385e2c1f94ca0
---
M tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
M
tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
M
tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
M
tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
M
tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
5 files changed, 112 insertions(+), 46 deletions(-)
Approvals:
Smalyshev: Looks good to me, approved
jenkins-bot: Verified
diff --git a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
index 3c50e3a..7029314 100644
--- a/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
+++ b/tools/src/main/java/org/wikidata/query/rdf/tool/Update.java
@@ -200,11 +200,6 @@
+ "You will have to reload from scratch or you
might have missing data.");
return null;
}
- /*
- * -2 seconds to because our precision is only 1 second and
- * because it should be cheap to recheck that we have the right
- * revision.
- */
startTime = leftOff.getTime();
log.info("Found start time in the RDF store: {}",
inputDateFormat().format(leftOff));
}
diff --git
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
index 7b79e87..d8e9cd0 100644
---
a/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
+++
b/tools/src/main/java/org/wikidata/query/rdf/tool/change/RecentChangesPoller.java
@@ -103,7 +103,9 @@
@SuppressWarnings("checkstyle:cyclomaticcomplexity")
private Batch batch(Date lastNextStartTime, JSONObject lastNextContinue)
throws RetryableException {
try {
- JSONObject recentChanges =
wikibase.fetchRecentChanges(lastNextStartTime, lastNextContinue, batchSize);
+ @SuppressWarnings("unchecked")
+ final Change continueChange =
wikibase.getChangeFromContinue((Map<String, Object>)lastNextContinue);
+ JSONObject recentChanges =
wikibase.fetchRecentChangesBackoff(lastNextStartTime, batchSize, true);
// Using LinkedHashMap here so that changes came out sorted by
order of arrival
Map<String, Change> changesByTitle = new LinkedHashMap<>();
JSONObject nextContinue = (JSONObject)
recentChanges.get("continue");
@@ -113,15 +115,20 @@
for (Object rco : result) {
JSONObject rc = (JSONObject) rco;
long namespace = (long) rc.get("ns");
+ long rcid = (long)rc.get("rcid");
if (!wikibase.isEntityNamespace(namespace)) {
log.info("Skipping change in irrelevant namespace: {}",
rc);
+ continue;
+ }
+ if (continueChange != null && rcid < continueChange.rcid()) {
+ // We've already seen this change, since it has older rcid
- so skip it
continue;
}
Date timestamp = df.parse(rc.get("timestamp").toString());
Change change;
if (rc.get("type").toString().equals("log") &&
(long)rc.get("revid") == 0) {
// Deletes should always be processed, so put negative
revision
- change = new Change(rc.get("title").toString(), -1L,
timestamp, (long)rc.get("rcid"));
+ change = new Change(rc.get("title").toString(), -1L,
timestamp, rcid);
} else {
change = new Change(rc.get("title").toString(), (long)
rc.get("revid"), timestamp, (long)rc.get("rcid"));
}
@@ -144,6 +151,12 @@
// Create fake rccontinue so we continue from last known
change
nextContinue =
wikibase.getContinueObject(changes.get(changes.size() - 1));
} else {
+ if (result.size() >= batchSize) {
+ // We have a problem here - due to backoff, we did not
fetch any new items
+ // Try to advance one second, even though we risk
losing a change
+ log.info("Backoff overflow, advancing one second");
+ nextStartTime += 1000;
+ }
nextContinue = lastNextContinue;
}
}
diff --git
a/tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
b/tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
index 900ff91..0ce58ad 100644
---
a/tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
+++
b/tools/src/main/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepository.java
@@ -15,12 +15,14 @@
import java.util.Date;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.TimeZone;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.time.DateUtils;
import org.apache.http.Consts;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
@@ -149,21 +151,68 @@
}
/**
+ * How much to back off for recent fetches, in seconds.
+ */
+ private static final int BACKOFF_TIME = 10;
+
+ /**
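+ * Applies backoff to start time by moving it earlier.
+ * If the start time is less than 5 minutes old, subtract the backoff
+ * time to avoid race conditions; otherwise subtract only 1 second.
+ * @param startTime time the caller wants to start looking from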
+ * @return Time when to start looking
+ */
+ private Date backoffTime(Date startTime) {
+ final Date now = new Date();
+ if (startTime.before(DateUtils.addMinutes(now, -5))) {
+ /*
+ * if start time is before 5 minutes back, it's ok
+ * -1 second because our precision is only 1 second and
+ * because it should be cheap to recheck that we have the right
+ * revision.
+ */
+ return DateUtils.addSeconds(startTime, -1);
+ }
+ return DateUtils.addSeconds(startTime, -BACKOFF_TIME);
+ }
+
+ /**
* Fetch recent changes starting from nextStartTime or continuing from
* lastContinue depending on the contents of lastContinue way to use
* MediaWiki. See RecentChangesPoller for how to poll these. Or just use
it.
*
* @param nextStartTime if lastContinue is null then this is the start time
* of the query
- * @param lastContinue continuation point if not null
+ * @param batchSize the number of recent changes to fetch
+ * @param useBackoff Should we use backoff time to handle race conditions?
+ * @return result of query
+ * @throws RetryableException thrown if there is an error communicating
with
+ * wikibase
+ */
+ public JSONObject fetchRecentChangesBackoff(Date nextStartTime, int
batchSize, boolean useBackoff) throws RetryableException
+ {
+ if (useBackoff) {
+ return fetchRecentChanges(backoffTime(nextStartTime), batchSize);
+ } else {
+ return fetchRecentChanges(nextStartTime, batchSize);
+ }
+ }
+
+ /**
+ * Fetch recent changes starting from nextStartTime. No continuation
+ * object is sent to MediaWiki; only the start time selects the window.
+ * See RecentChangesPoller for how to poll these. Or just use it.
+ *
+ * @param nextStartTime the start time of the query, always sent as
+ * rcstart
* @param batchSize the number of recent changes to fetch
* @return result of query
* @throws RetryableException thrown if there is an error communicating
with
* wikibase
*/
- public JSONObject fetchRecentChanges(Date nextStartTime, JSONObject
lastContinue, int batchSize)
+ public JSONObject fetchRecentChanges(Date nextStartTime, int batchSize)
throws RetryableException {
- URI uri = uris.recentChanges(nextStartTime, lastContinue, batchSize);
+ URI uri = uris.recentChanges(nextStartTime, batchSize);
log.debug("Polling for changes from {}", uri);
try {
return checkApi(getJson(new HttpGet(uri)));
@@ -405,11 +454,9 @@
*
* @param startTime the first date to poll from - usually if
* continueObject isn't null this is ignored by wikibase
- * @param continueObject continue object sent from wikibase with the
- * last batch
* @param batchSize maximum number of results we want back from
wikibase
*/
- public URI recentChanges(Date startTime, JSONObject continueObject,
int batchSize) {
+ public URI recentChanges(Date startTime, int batchSize) {
URIBuilder builder = apiBuilder();
builder.addParameter("action", "query");
builder.addParameter("list", "recentchanges");
@@ -417,13 +464,8 @@
builder.addParameter("rcprop", "title|ids|timestamp");
builder.addParameter("rcnamespace",
getEntityNamespacesString("|"));
builder.addParameter("rclimit", Integer.toString(batchSize));
- if (continueObject == null) {
- builder.addParameter("continue", "");
- builder.addParameter("rcstart",
outputDateFormat().format(startTime));
- } else {
- builder.addParameter("continue",
continueObject.get("continue").toString());
- builder.addParameter("rccontinue",
continueObject.get("rccontinue").toString());
- }
+ builder.addParameter("continue", "");
+ builder.addParameter("rcstart",
outputDateFormat().format(startTime));
return build(builder);
}
@@ -608,5 +650,19 @@
return nextContinue;
}
+ /**
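+ * Extract change (timestamp and rcid) from continue JSON object.
+ * @param nextContinue continue object with "rccontinue" key, may be null
+ * @return Dummy change with timestamp and rcid, null if nextContinue is null
+ * @throws java.text.ParseException When data is in the wrong format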
+ */
+ public Change getChangeFromContinue(Map<String, Object> nextContinue)
throws java.text.ParseException {
+ if (nextContinue == null) {
+ return null;
+ }
+ final String rccontinue = (String)nextContinue.get("rccontinue");
+ final String[] parts = rccontinue.split("\\|");
+ return new Change("DUMMY", -1, outputDateFormat().parse(parts[0]),
Long.parseLong(parts[1]));
+ }
}
diff --git
a/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
b/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
index 9ee8ecd..3caeaa7 100644
---
a/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
+++
b/tools/src/test/java/org/wikidata/query/rdf/tool/change/RecentChangesPollerUnitTest.java
@@ -6,10 +6,7 @@
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static
org.wikidata.query.rdf.tool.wikibase.WikibaseRepository.outputDateFormat;
import java.util.ArrayList;
@@ -18,6 +15,7 @@
import java.util.Date;
import java.util.List;
+import org.apache.commons.lang3.time.DateUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.junit.Before;
@@ -53,7 +51,7 @@
rc.put("type", "edit");
recentChanges.add(rc);
}
- when(repository.fetchRecentChanges(startTime, null,
batchSize)).thenReturn(result);
+ when(repository.fetchRecentChangesBackoff(startTime, batchSize,
true)).thenReturn(result);
when(repository.isEntityNamespace(0)).thenReturn(true);
RecentChangesPoller poller = new RecentChangesPoller(repository,
startTime, batchSize);
@@ -75,7 +73,8 @@
@Test
@SuppressWarnings("unchecked")
public void continuePoll() throws RetryableException {
- Date startTime = new Date();
+ // Use old date to remove backoff
+ Date startTime = DateUtils.addDays(new Date(), -10);
int batchSize = 10;
JSONObject result = new JSONObject();
@@ -108,7 +107,7 @@
contJson.put("rccontinue", outputDateFormat().format(revDate) + "|8");
contJson.put("continue", "-||");
- when(repository.fetchRecentChanges(startTime, null,
batchSize)).thenReturn(result);
+ when(repository.fetchRecentChangesBackoff(startTime, batchSize,
true)).thenReturn(result);
when(repository.getContinueObject((Change)any())).thenReturn(contJson);
when(repository.isEntityNamespace(0)).thenReturn(true);
@@ -117,14 +116,14 @@
assertEquals(2, batch.changes().size());
assertEquals(7, batch.changes().get(1).rcid());
- recentChanges.clear();
- ArgumentCaptor<JSONObject> argument =
ArgumentCaptor.forClass(JSONObject.class);
+ ArgumentCaptor<Date> argument = ArgumentCaptor.forClass(Date.class);
- when(repository.fetchRecentChanges((Date)any(), (JSONObject)any(),
anyInt())).thenReturn(result);
+ recentChanges.clear();
+ when(repository.fetchRecentChangesBackoff(argument.capture(),
eq(batchSize), eq(true))).thenReturn(result);
// check that poller passes the continue object to the next batch
- poller.nextBatch(batch);
- verify(repository, times(2)).fetchRecentChanges((Date)any(),
argument.capture(), eq(batchSize));
- assertEquals(contJson, argument.getValue());
+ batch = poller.nextBatch(batch);
+ assertEquals(0, batch.changes().size());
+ assertEquals(date,
WikibaseRepository.inputDateFormat().format(argument.getValue()));
}
@Test
@@ -157,7 +156,7 @@
rc.put("type", "edit");
recentChanges.add(rc);
- when(repository.fetchRecentChanges(startTime, null,
batchSize)).thenReturn(result);
+ when(repository.fetchRecentChangesBackoff(startTime, batchSize,
true)).thenReturn(result);
when(repository.isEntityNamespace(0)).thenReturn(true);
RecentChangesPoller poller = new RecentChangesPoller(repository,
startTime, batchSize);
diff --git
a/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
b/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
index e92ab74..02a78cc 100644
---
a/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
+++
b/tools/src/test/java/org/wikidata/query/rdf/tool/wikibase/WikibaseRepositoryIntegrationTest.java
@@ -18,6 +18,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.time.DateUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.junit.Test;
@@ -40,14 +41,14 @@
@Test
@SuppressWarnings("unchecked")
- public void recentChangesWithLotsOfChangesHasContinue() throws
RetryableException {
+ public void recentChangesWithLotsOfChangesHasContinue() throws
RetryableException, ParseException {
/*
* This relies on there being lots of changes in the past 30 days.
Which
* is probably ok.
*/
int batchSize = randomIntBetween(3, 30);
JSONObject changes = repo.fetchRecentChanges(new
Date(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(30)),
- null, batchSize);
+ batchSize);
Map<String, Object> c = changes;
assertThat(c, hasKey("continue"));
assertThat((Map<String, Object>) changes.get("continue"),
hasKey("rccontinue"));
@@ -63,7 +64,8 @@
assertThat(rc, hasEntry(equalTo("timestamp"),
instanceOf(String.class)));
assertThat(rc, hasEntry(equalTo("revid"), instanceOf(Long.class)));
}
- changes = repo.fetchRecentChanges(null /* ignored */, (JSONObject)
changes.get("continue"), batchSize);
+ final Date nextDate = repo.getChangeFromContinue((Map<String,
Object>)changes.get("continue")).timestamp();
+ changes = repo.fetchRecentChanges(nextDate, batchSize);
assertThat(c, hasKey("query"));
assertThat((Map<String, Object>) c.get("query"),
hasKey("recentchanges"));
}
@@ -73,9 +75,9 @@
public void recentChangesWithFewChangesHasNoContinue() throws
RetryableException {
/*
* This relies on there being very few changes in the current
- * millisecond.
+ * second.
*/
- JSONObject changes = repo.fetchRecentChanges(new
Date(System.currentTimeMillis()), null, 500);
+ JSONObject changes = repo.fetchRecentChanges(new
Date(System.currentTimeMillis()), 500);
Map<String, Object> c = changes;
assertThat(c, not(hasKey("continue")));
assertThat(c, hasKey("query"));
@@ -92,7 +94,7 @@
editShowsUpInRecentChangesTestCase("QueryTestProperty", "property");
}
- private JSONArray getRecentChanges(Date date, JSONObject contObj, int
batchSize) throws RetryableException,
+ private JSONArray getRecentChanges(Date date, int batchSize) throws
RetryableException,
ContainedException {
// Add a bit of a wait to try and improve Jenkins test stability.
try {
@@ -100,7 +102,7 @@
} catch (InterruptedException e) {
// nothing to do here, sorry. I know it looks bad.
}
- JSONObject result = repo.fetchRecentChanges(date, contObj, batchSize);
+ JSONObject result = repo.fetchRecentChanges(date, batchSize);
return (JSONArray) ((JSONObject)
result.get("query")).get("recentchanges");
}
@@ -110,7 +112,7 @@
long now = System.currentTimeMillis();
String entityId = repo.firstEntityIdForLabelStartingWith(label, "en",
type);
repo.setLabel(entityId, type, label + now, "en");
- JSONArray changes = getRecentChanges(new Date(now - 10000), null, 10);
+ JSONArray changes = getRecentChanges(new Date(now - 10000), 10);
boolean found = false;
String title = entityId;
if (type.equals("property")) {
@@ -164,11 +166,11 @@
}
@Test
- public void continueWorks() throws RetryableException, ContainedException,
ParseException {
+ public void continueWorks() throws RetryableException, ContainedException,
ParseException, InterruptedException {
long now = System.currentTimeMillis();
String entityId =
repo.firstEntityIdForLabelStartingWith("QueryTestItem", "en", "item");
repo.setLabel(entityId, "item", "QueryTestItem" + now, "en");
- JSONArray changes = getRecentChanges(new Date(now - 10000), null, 10);
+ JSONArray changes = getRecentChanges(new Date(now - 10000), 10);
Change change = null;
long oldRevid = 0;
long oldRcid = 0;
@@ -185,10 +187,11 @@
}
}
assertNotNull("Did not find the first edit", change);
- JSONObject continueObject = repo.getContinueObject(change);
+ // Ensure this change is in a different second
+ Thread.sleep(1000);
// make new edit now
repo.setLabel(entityId, "item", "QueryTestItem" + now + "updated",
"en");
- changes = getRecentChanges(new Date(now - 10000), continueObject, 10);
+ changes = getRecentChanges(DateUtils.addSeconds(change.timestamp(),
1), 10);
// check that new result does not contain old edit but contains new
edit
boolean found = false;
for (Object changeObject : changes) {
@@ -206,7 +209,7 @@
@SuppressWarnings({ "unchecked", "rawtypes" })
public void recentChangesWithErrors() throws RetryableException,
ContainedException {
WikibaseRepository proxyRepo = new WikibaseRepository("http",
"localhost", 8812);
- JSONObject changes = proxyRepo.fetchRecentChanges(new
Date(System.currentTimeMillis()), null, 500);
+ JSONObject changes = proxyRepo.fetchRecentChanges(new
Date(System.currentTimeMillis()), 500);
Map<String, Object> c = changes;
assertThat(c, not(hasKey("continue")));
assertThat(c, hasKey("query"));
--
To view, visit https://gerrit.wikimedia.org/r/301907
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I477f0fcbed00dd10ed086cdb752385e2c1f94ca0
Gerrit-PatchSet: 1
Gerrit-Project: wikidata/query/rdf
Gerrit-Branch: master
Gerrit-Owner: Smalyshev <[email protected]>
Gerrit-Reviewer: Smalyshev <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits