nfsantos commented on code in PR #1042:
URL: https://github.com/apache/jackrabbit-oak/pull/1042#discussion_r1316874389


##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java:
##########
@@ -215,8 +190,90 @@ private void reportProgress(String id) {
         traversalLog.trace(id);
     }
 
-    private void downloadRange(DownloadRange range) throws 
InterruptedException, TimeoutException {
-        BsonDocument findQuery = range.getFindQuery();
+
+    private void downloadWithRetryOnConnectionErrors() throws 
InterruptedException, TimeoutException {
+        // If regex filtering is enabled, start by downloading the ancestors 
of the path used for filtering.
+        // That is, download "/", "/content", "/content/dam" for a base path 
of "/content/dam". These nodes will not be
+        // matched by the regex used in the Mongo query, which assumes a 
prefix of "???:/content/dam"
+        Bson childrenFilter = null;
+        String regexBasePath = getPathForRegexFiltering();
+        if (regexBasePath != null) {
+            // Regex path filtering is enabled
+            // Download the ancestors in a separate query. No retries are done 
on this query, as it will take only a few
+            // seconds and is done at the start of the job, so if it fails, 
the job can be retried without losing much work
+            LOG.info("Using regex filtering with path {}", regexBasePath);
+            Bson ancestorsQuery = ancestorsFilter(regexBasePath);
+            LOG.info("Downloading ancestors of {}", regexBasePath);
+            // Let Mongo decide which index to use for this query, it will 
return very few documents
+            FindIterable<BasicDBObject> mongoIterable = dbCollection
+                    .withReadPreference(readPreference)
+                    .find(ancestorsQuery);
+            download(mongoIterable);
+            // Filter to apply to the main query
+            childrenFilter = childrenFilter(regexBasePath);
+        }
+
+        Instant failuresStartTimestamp = null; // When the last series of 
failures started
+        long retryIntervalMs = retryInitialIntervalMillis;
+        int numberOfFailures = 0;
+        boolean downloadCompleted = false;
+        Map<String, Integer> exceptions = new HashMap<>();
+        this.nextLastModified = 0;
+        this.lastIdDownloaded = null;
+        while (!downloadCompleted) {
+            try {
+                if (lastIdDownloaded != null) {
+                    LOG.info("Recovering from broken connection, finishing 
downloading documents with _modified={}", nextLastModified);
+                    downloadRange(new DownloadRange(nextLastModified, 
nextLastModified + 1, lastIdDownloaded), childrenFilter);
+                    // We have managed to reconnect, reset the failure 
timestamp
+                    failuresStartTimestamp = null;
+                    numberOfFailures = 0;
+                    // Continue downloading everything starting from the next 
_lastmodified value
+                    downloadRange(new DownloadRange(nextLastModified + 1, 
Long.MAX_VALUE, null), childrenFilter);
+                } else {
+                    downloadRange(new DownloadRange(nextLastModified, 
Long.MAX_VALUE, null), childrenFilter);
+                }
+                downloadCompleted = true;
+            } catch (MongoException e) {
+                if (e instanceof MongoInterruptedException || e instanceof 
MongoIncompatibleDriverException) {
+                    // Non-recoverable exceptions
+                    throw e;
+                }
+                if (failuresStartTimestamp == null) {
+                    failuresStartTimestamp = 
Instant.now().truncatedTo(ChronoUnit.SECONDS);
+                }
+                LOG.warn("Connection error downloading from MongoDB.", e);
+                long secondsSinceStartOfFailures = 
Duration.between(failuresStartTimestamp, Instant.now()).toSeconds();
+                if (secondsSinceStartOfFailures > retryDuringSeconds) {
+                    // Give up. Get a string of all exceptions that were thrown
+                    StringBuilder summary = new StringBuilder();
+                    for (Map.Entry<String, Integer> entry : 
exceptions.entrySet()) {
+                        
summary.append("\n\t").append(entry.getValue()).append("x: 
").append(entry.getKey());
+                    }
+                    throw new RetryException(retryDuringSeconds, 
summary.toString(), e);
+                } else {
+                    numberOfFailures++;
+                    LOG.warn("Retrying download in {} ms; number of times 
failed: {}; current series of failures started at: {} ({} seconds ago)",
+                            retryIntervalMs, numberOfFailures, 
failuresStartTimestamp, secondsSinceStartOfFailures);
+                    exceptions.compute(e.getClass().getSimpleName() + " - " + 
e.getMessage(),
+                            (key, val) -> val == null ? 1 : val + 1
+                    );
+                    try {
+                        Thread.sleep(retryIntervalMs);
+                    } catch (InterruptedException ignore) {
+                    }

Review Comment:
   Yes, you are right. The method containing this code already declares that it 
throws `InterruptedException`, and the caller handles that exception 
explicitly, so the catch clause is not needed here. I have removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@jackrabbit.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to