nfsantos commented on code in PR #1435:
URL: https://github.com/apache/jackrabbit-oak/pull/1435#discussion_r1596848481
##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java:
##########
@@ -581,94 +498,295 @@ private MongoFilterPaths getPathsForRegexFiltering() {
}
}
- private void download(FindIterable<NodeDocument> mongoIterable) throws
InterruptedException, TimeoutException {
- try (MongoCursor<NodeDocument> cursor = mongoIterable.iterator()) {
- NodeDocument[] batch = new NodeDocument[maxBatchNumberOfDocuments];
- int nextIndex = 0;
- int batchSize = 0;
- try {
- while (cursor.hasNext()) {
- NodeDocument next = cursor.next();
- String id = next.getId();
- // If we are retrying on connection errors, we need to
keep track of the last _modified value
- if (retryOnConnectionErrors) {
- this.nextLastModified = next.getModified();
+ private void logWithRateLimit(Runnable f) {
+ Instant now = Instant.now();
+ if (Duration.between(lastDelayedEnqueueWarningMessageLoggedTimestamp,
now).toSeconds() > MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES) {
+ f.run();
+ lastDelayedEnqueueWarningMessageLoggedTimestamp = now;
+ }
+ }
+
+ enum DownloadOrder {
+ ASCENDING,
+ DESCENDING,
+ UNDEFINED;
+
+ public boolean downloadInAscendingOrder() {
+ return this == ASCENDING || this == UNDEFINED;
+ }
+ }
+
+ /**
+ * Downloads a given range from Mongo. Instances of this class should be
used for downloading a single range.
+ * To download multiple ranges, create multiple instances of this class.
+ */
+ private class DownloadTask {
+ private final DownloadOrder downloadOrder;
+ private final DownloadStageStatistics downloadStatics;
+ private long documentsDownloadedTotalBytes;
+ private long documentsDownloadedTotal;
+ private long totalEnqueueWaitTimeMillis;
+ private long nextLastModified;
+ private String lastIdDownloaded;
+
+ DownloadTask(DownloadOrder downloadOrder, DownloadStageStatistics
downloadStatics) {
+ this.downloadOrder = downloadOrder;
+ this.downloadStatics = downloadStatics;
+ this.documentsDownloadedTotalBytes = 0;
+ this.documentsDownloadedTotal = 0;
+ this.totalEnqueueWaitTimeMillis = 0;
+ this.nextLastModified = downloadOrder.downloadInAscendingOrder() ?
0 : Long.MAX_VALUE;
+ this.lastIdDownloaded = null;
+ }
+
+ private Instant failuresStartTimestamp = null; // When the last series
of failures started
+ private int numberOfFailures = 0;
+
+ public long getDocumentsDownloadedTotalBytes() {
+ return documentsDownloadedTotalBytes;
+ }
+
+ public long getDocumentsDownloadedTotal() {
+ return documentsDownloadedTotal;
+ }
+
+ private void download(Bson mongoQueryFilter) throws
InterruptedException, TimeoutException {
+ failuresStartTimestamp = null; // When the last series of failures
started
+ numberOfFailures = 0;
+ long retryIntervalMs = retryInitialIntervalMillis;
+ boolean downloadCompleted = false;
+ Map<String, Integer> exceptions = new HashMap<>();
+ while (!downloadCompleted) {
+ try {
+ if (lastIdDownloaded == null) {
+ // lastIdDownloaded is null only when starting the
download or if there is a connection error
+ // before anything is downloaded
+ DownloadRange firstRange = new DownloadRange(0,
Long.MAX_VALUE, null, downloadOrder.downloadInAscendingOrder());
+ downloadRange(firstRange, mongoQueryFilter,
downloadOrder);
+ } else {
+ LOG.info("Recovering from broken connection, finishing
downloading documents with _modified={}", nextLastModified);
+ DownloadRange partialLastModifiedRange = new
DownloadRange(nextLastModified, nextLastModified, lastIdDownloaded,
downloadOrder.downloadInAscendingOrder());
+ downloadRange(partialLastModifiedRange,
mongoQueryFilter, downloadOrder);
+ // Downloaded everything from _nextLastModified.
Continue with the next timestamp for _modified
+ DownloadRange nextRange =
downloadOrder.downloadInAscendingOrder() ?
+ new DownloadRange(nextLastModified + 1,
Long.MAX_VALUE, null, true) :
+ new DownloadRange(0, nextLastModified - 1,
null, false);
+ downloadRange(nextRange, mongoQueryFilter,
downloadOrder);
}
- this.lastIdDownloaded = id;
- this.documentsDownloadedTotal++;
- reportProgress(id);
-
- batch[nextIndex] = next;
- nextIndex++;
- int docSize = (int)
next.remove(NodeDocumentCodec.SIZE_FIELD);
- batchSize += docSize;
- documentsDownloadedTotalBytes += docSize;
- if (batchSize >= maxBatchSizeBytes || nextIndex ==
batch.length) {
- LOG.trace("Enqueuing block with {} elements, estimated
size: {} bytes", nextIndex, batchSize);
- tryEnqueueCopy(batch, nextIndex);
- nextIndex = 0;
- batchSize = 0;
+ downloadCompleted = true;
+ } catch (MongoException e) {
+ if (e instanceof MongoInterruptedException || e instanceof
MongoIncompatibleDriverException) {
+ // Non-recoverable exceptions
+ throw e;
+ }
+ if (failuresStartTimestamp == null) {
+ failuresStartTimestamp =
Instant.now().truncatedTo(ChronoUnit.SECONDS);
+ }
+ LOG.warn("Connection error downloading from MongoDB.", e);
+ long secondsSinceStartOfFailures =
Duration.between(failuresStartTimestamp, Instant.now()).toSeconds();
+ if (parallelDump &&
mongoServerSelector.atLeastOneConnectionActive()) {
+ // Special case, the cluster is up because one of the
connections is active. This happens when
+ // there is a single secondary, maybe because of a
scale-up/down. In this case, do not abort the
+ // download, keep retrying to connect forever
+ int retryTime = 1000;
+ LOG.info("At least one connection is active. Retrying
download in {} ms", retryTime);
+ Thread.sleep(retryTime);
+ } else if (secondsSinceStartOfFailures >
retryDuringSeconds) {
+ // Give up. Get a string of all exceptions that were
thrown
+ StringBuilder summary = new StringBuilder();
+ for (Map.Entry<String, Integer> entry :
exceptions.entrySet()) {
+
summary.append("\n\t").append(entry.getValue()).append("x:
").append(entry.getKey());
+ }
+ throw new RetryException(retryDuringSeconds,
summary.toString(), e);
+ } else {
+ numberOfFailures++;
+ LOG.warn("Retrying download in {} ms; number of times
failed: {}; current series of failures started at: {} ({} seconds ago)",
+ retryIntervalMs, numberOfFailures,
failuresStartTimestamp, secondsSinceStartOfFailures);
+ exceptions.compute(e.getClass().getSimpleName() + " -
" + e.getMessage(),
+ (key, val) -> val == null ? 1 : val + 1
+ );
+ Thread.sleep(retryIntervalMs);
+ // simple exponential backoff mechanism
+ retryIntervalMs = Math.min(retryMaxIntervalMillis,
retryIntervalMs * 2);
}
}
- if (nextIndex > 0) {
- LOG.info("Enqueueing last block with {} elements,
estimated size: {}",
- nextIndex,
IOUtils.humanReadableByteCountBin(batchSize));
- tryEnqueueCopy(batch, nextIndex);
+ }
+ }
+
+ private void downloadRange(DownloadRange range, Bson filter,
DownloadOrder downloadOrder) throws InterruptedException, TimeoutException {
+ Bson findQuery = range.getFindQuery();
+ if (filter != null) {
+ findQuery = Filters.and(findQuery, filter);
+ }
+ Bson sortOrder = downloadOrder.downloadInAscendingOrder() ?
+ Sorts.ascending(NodeDocument.MODIFIED_IN_SECS,
NodeDocument.ID) :
+ Sorts.descending(NodeDocument.MODIFIED_IN_SECS,
NodeDocument.ID);
+
+ FindIterable<NodeDocument> mongoIterable = dbCollection
+ .find(findQuery)
+ .sort(sortOrder);
+
+ LOG.info("Traversing: {}. Query: {}, Traverse order: {}", range,
findQuery, sortOrder);
+ download(mongoIterable);
+ }
+
+ void download(FindIterable<NodeDocument> mongoIterable) throws
InterruptedException, TimeoutException {
+ try (MongoCursor<NodeDocument> cursor = mongoIterable.iterator()) {
+ NodeDocument[] batch = new
NodeDocument[maxBatchNumberOfDocuments];
+ int nextIndex = 0;
+ int batchSize = 0;
+ if (cursor.hasNext()) {
+ // We have managed to reconnect, reset the failure
timestamp
+ failuresStartTimestamp = null;
+ numberOfFailures = 0;
}
- } catch (MongoException e) {
- if (e instanceof MongoInterruptedException || e instanceof
MongoIncompatibleDriverException) {
- // Non-recoverable exceptions
+ try {
+ while (cursor.hasNext()) {
+ NodeDocument next = cursor.next();
+ String id = next.getId();
+ this.nextLastModified = next.getModified();
Review Comment:
Good catch. This would indeed be a problem when doing a column traversal,
because the query used for it downloaded all documents, even those
without the `_modified` field. We do not need these documents to build the FFS,
so it is safe to filter them out in the Mongo query. That way, there is no need
for a null check here. This code is on the critical path, so it should be
kept as lean as possible.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]