This is an automated email from the ASF dual-hosted git repository.

nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 1a250e06d8 OAK-10825 - Indexing job: log [TASK:<PHASE>:FAIL] log message for when an indexing phase fails. (#1481)
1a250e06d8 is described below

commit 1a250e06d890c16f45590a85da56cb0e14623260
Author: Nuno Santos <nsan...@adobe.com>
AuthorDate: Tue May 28 10:01:38 2024 +0200

    OAK-10825 - Indexing job: log [TASK:<PHASE>:FAIL] log message for when an indexing phase fails. (#1481)
---
 .../oak/plugins/index/MetricsFormatter.java        |  22 +++-
 .../oak/plugins/index/importer/IndexImporter.java  |  59 +++++----
 .../plugins/index/importer/IndexImporterTest.java  |   4 +-
 .../indexer/document/DocumentStoreIndexerBase.java | 138 +++++++++++----------
 .../flatfile/pipelined/PipelinedMergeSortTask.java |  19 +--
 .../pipelined/PipelinedMongoDownloadTask.java      |  33 +++--
 .../flatfile/pipelined/PipelinedSortBatchTask.java |   9 +-
 .../flatfile/pipelined/PipelinedStrategy.java      |   3 +
 .../flatfile/pipelined/PipelinedTransformTask.java |   7 +-
 9 files changed, 168 insertions(+), 126 deletions(-)
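Every hunk below applies the same convention: each indexing phase already logs a [TASK:<PHASE>:START] and [TASK:<PHASE>:END] line, and this commit adds a [TASK:<PHASE>:FAIL] line carrying the elapsed time and the error whenever the phase throws. A condensed sketch of that shape, for orientation only (phaseName and runPhase() are placeholders, not identifiers from the patch):

    LOG.info("[TASK:{}:START] Starting phase", phaseName);
    Stopwatch watch = Stopwatch.createStarted();
    try {
        runPhase();
        LOG.info("[TASK:{}:END] Metrics: {}", phaseName,
                MetricsFormatter.createMetricsWithDurationOnly(watch));
    } catch (Throwable t) {
        LOG.info("[TASK:{}:FAIL] Metrics: {}, Error: {}", phaseName,
                MetricsFormatter.createMetricsWithDurationOnly(watch), t.toString());
        throw t; // rethrow so the failure still propagates to the caller
    }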
"null" : indexImportState.toString(); int count = 1; Stopwatch start = Stopwatch.createStarted(); - while (count <= maxRetries) { - LOG.info("IndexImporterStepExecutor:{}, count:{}", indexImportPhaseName, count); - LOG.info("[TASK:{}:START]", indexImportPhaseName); - try { - step.execute(); - long durationSeconds = start.elapsed(TimeUnit.SECONDS); - LOG.info("[TASK:{}:END] Metrics: {}", indexImportPhaseName, - MetricsFormatter.newBuilder() - .add("duration", FormattingUtils.formatToSeconds(durationSeconds)) - .add("durationSeconds", durationSeconds) - .build() - ); - - MetricsUtils.setCounterOnce(statisticsProvider, - "oak_indexer_import_" + indexImportPhaseName.toLowerCase() + "_duration_seconds", - durationSeconds); - indexingReporter.addTiming("oak_indexer_import_" + indexImportPhaseName.toLowerCase(), - FormattingUtils.formatToSeconds(durationSeconds)); - indexingReporter.addMetric("oak_indexer_import_" + indexImportPhaseName.toLowerCase() + "_duration_seconds", - durationSeconds); - - break; - } catch (CommitFailedException | IOException e) { - LOG.warn("IndexImporterStepExecutor:{} fail count: {}, retries left: {}", indexImportState, count, maxRetries - count, e); - if (count++ >= maxRetries) { - LOG.warn("IndexImporterStepExecutor:{} failed after {} retries", indexImportState, maxRetries, e); - throw e; + LOG.info("[TASK:{}:START]", indexImportPhaseName); + try { + while (count <= maxRetries) { + LOG.info("IndexImporterStepExecutor:{}, count:{}", indexImportPhaseName, count); + try { + step.execute(); + long durationSeconds = start.elapsed(TimeUnit.SECONDS); + LOG.info("[TASK:{}:END] Metrics: {}", + indexImportPhaseName, + MetricsFormatter.createMetricsWithDurationOnly(durationSeconds) + ); + MetricsUtils.setCounterOnce(statisticsProvider, + "oak_indexer_import_" + indexImportPhaseName.toLowerCase() + "_duration_seconds", + durationSeconds); + indexingReporter.addTiming("oak_indexer_import_" + indexImportPhaseName.toLowerCase(), + FormattingUtils.formatToSeconds(durationSeconds)); + indexingReporter.addMetric("oak_indexer_import_" + indexImportPhaseName.toLowerCase() + "_duration_seconds", + durationSeconds); + + break; + } catch (CommitFailedException | IOException e) { + LOG.warn("IndexImporterStepExecutor: {} fail count: {}, retries left: {}", indexImportState, count, maxRetries - count, e); + if (count++ >= maxRetries) { + LOG.warn("IndexImporterStepExecutor: {} failed after {} retries", indexImportState, maxRetries, e); + throw e; + } } } + } catch (Throwable t) { + LOG.info("[TASK:{}:FAIL] Metrics: {}, Error: {}", + indexImportPhaseName, + MetricsFormatter.createMetricsWithDurationOnly(start), + t.toString()); + throw t; } } diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java index 2f5aaf564c..c3abb429a1 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java @@ -548,7 +548,7 @@ public class IndexImporterTest { PropUtils.writeTo(p, indexMeta, "index info"); } - private void dumpIndexDefinitions(String... indexPaths) throws IOException, CommitFailedException { + private void dumpIndexDefinitions(String... 
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java
index 2f5aaf564c..c3abb429a1 100644
--- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java
@@ -548,7 +548,7 @@ public class IndexImporterTest {
         PropUtils.writeTo(p, indexMeta, "index info");
     }
 
-    private void dumpIndexDefinitions(String... indexPaths) throws IOException, CommitFailedException {
+    private void dumpIndexDefinitions(String... indexPaths) throws IOException {
         IndexDefinitionPrinter printer = new IndexDefinitionPrinter(store, () -> asList(indexPaths));
         printer.setFilter("{\"properties\":[\"*\", \"-:childOrder\"],\"nodes\":[\"*\", \"-:index-definition\"]}");
         File file = new File(temporaryFolder.getRoot(), INDEX_DEFINITIONS_JSON);
@@ -639,7 +639,7 @@ public class IndexImporterTest {
                 store.getRoot().getChildNode("oak:index").getChildNode("fooIndex").getProperty(IndexImporter.INDEX_IMPORT_STATE_KEY));
 
         // Test retry logic
-        String failureLog = MessageFormat.format("IndexImporterStepExecutor:{0} failed after {1} retries",
+        String failureLog = MessageFormat.format("IndexImporterStepExecutor: {0} failed after {1} retries",
                 IndexImporter.IndexImportState.SWITCH_LANE, IndexImporter.RETRIES);
         boolean failureLogPresent = false;
         for (String log : customizer.getLogs()) {
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index a123200047..e65b76ace0 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -315,81 +315,89 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
     public void reindex() throws CommitFailedException, IOException {
         log.info("[TASK:FULL_INDEX_CREATION:START] Starting indexing job");
         Stopwatch indexJobWatch = Stopwatch.createStarted();
-        IndexingProgressReporter progressReporter =
-                new IndexingProgressReporter(IndexUpdateCallback.NOOP, NodeTraversalCallback.NOOP);
-        configureEstimators(progressReporter);
+        try {
+            IndexingProgressReporter progressReporter =
+                    new IndexingProgressReporter(IndexUpdateCallback.NOOP, NodeTraversalCallback.NOOP);
+            configureEstimators(progressReporter);
+
+            NodeState checkpointedState = indexerSupport.retrieveNodeStateForCheckpoint();
+            NodeStore copyOnWriteStore = new MemoryNodeStore(checkpointedState);
+            indexerSupport.switchIndexLanesAndReindexFlag(copyOnWriteStore);
+            NodeBuilder builder = copyOnWriteStore.getRoot().builder();
+            CompositeIndexer indexer = prepareIndexers(copyOnWriteStore, builder, progressReporter);
+            if (indexer.isEmpty()) {
+                return;
+            }
 
-        NodeState checkpointedState = indexerSupport.retrieveNodeStateForCheckpoint();
-        NodeStore copyOnWriteStore = new MemoryNodeStore(checkpointedState);
-        indexerSupport.switchIndexLanesAndReindexFlag(copyOnWriteStore);
-        NodeBuilder builder = copyOnWriteStore.getRoot().builder();
-        CompositeIndexer indexer = prepareIndexers(copyOnWriteStore, builder, progressReporter);
-        if (indexer.isEmpty()) {
-            return;
-        }
+            closer.register(indexer);
+
+            List<FlatFileStore> flatFileStores = buildFlatFileStoreList(
+                    checkpointedState,
+                    indexer,
+                    indexer::shouldInclude,
+                    null,
+                    IndexerConfiguration.parallelIndexEnabled(),
+                    indexerSupport.getIndexDefinitions(),
+                    indexingReporter);
 
-        closer.register(indexer);
+            progressReporter.reset();
 
-        List<FlatFileStore> flatFileStores = buildFlatFileStoreList(
-                checkpointedState,
-                indexer,
-                indexer::shouldInclude,
-                null,
-                IndexerConfiguration.parallelIndexEnabled(),
-                indexerSupport.getIndexDefinitions(),
-                indexingReporter);
+            progressReporter.reindexingTraversalStart("/");
 
-        progressReporter.reset();
+            preIndexOperations(indexer.getIndexers());
 
-        progressReporter.reindexingTraversalStart("/");
+            log.info("[TASK:INDEXING:START] Starting indexing");
+            Stopwatch indexerWatch = Stopwatch.createStarted();
+            try {
 
-        preIndexOperations(indexer.getIndexers());
+                if (flatFileStores.size() > 1) {
+                    indexParallel(flatFileStores, indexer, progressReporter);
+                } else if (flatFileStores.size() == 1) {
+                    FlatFileStore flatFileStore = flatFileStores.get(0);
+                    for (NodeStateEntry entry : flatFileStore) {
+                        reportDocumentRead(entry.getPath(), progressReporter);
+                        indexer.index(entry);
+                    }
+                }
 
-        log.info("[TASK:INDEXING:START] Starting indexing");
-        Stopwatch indexerWatch = Stopwatch.createStarted();
+                progressReporter.reindexingTraversalEnd();
+                progressReporter.logReport();
+                long indexingDurationSeconds = indexerWatch.elapsed(TimeUnit.SECONDS);
+                log.info("Completed indexing in {}", FormattingUtils.formatToSeconds(indexingDurationSeconds));
+                log.info("[TASK:INDEXING:END] Metrics: {}", MetricsFormatter.createMetricsWithDurationOnly(indexingDurationSeconds));
+                MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_DURATION_SECONDS, indexingDurationSeconds);
+                indexingReporter.addTiming("Build Lucene Index", FormattingUtils.formatToSeconds(indexingDurationSeconds));
+            } catch (Throwable t) {
+                log.info("[TASK:INDEXING:FAIL] Metrics: {}, Error: {}",
+                        MetricsFormatter.createMetricsWithDurationOnly(indexerWatch), t.toString());
+                throw t;
+            }
 
-        if (flatFileStores.size() > 1) {
-            indexParallel(flatFileStores, indexer, progressReporter);
-        } else if (flatFileStores.size() == 1) {
-            FlatFileStore flatFileStore = flatFileStores.get(0);
-            for (NodeStateEntry entry : flatFileStore) {
-                reportDocumentRead(entry.getPath(), progressReporter);
-                indexer.index(entry);
+            log.info("[TASK:MERGE_NODE_STORE:START] Starting merge node store");
+            Stopwatch mergeNodeStoreWatch = Stopwatch.createStarted();
+            try {
+                copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+                long mergeNodeStoreDurationSeconds = mergeNodeStoreWatch.elapsed(TimeUnit.SECONDS);
+                log.info("[TASK:MERGE_NODE_STORE:END] Metrics: {}", MetricsFormatter.createMetricsWithDurationOnly(mergeNodeStoreDurationSeconds));
+                MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_MERGE_NODE_STORE_DURATION_SECONDS, mergeNodeStoreDurationSeconds);
+                indexingReporter.addTiming("Merge node store", FormattingUtils.formatToSeconds(mergeNodeStoreDurationSeconds));
+            } catch (Throwable t) {
+                log.info("[TASK:MERGE_NODE_STORE:FAIL] Metrics: {}, Error: {}",
+                        MetricsFormatter.createMetricsWithDurationOnly(mergeNodeStoreWatch), t.toString());
+                throw t;
            }
-        }
 
-        progressReporter.reindexingTraversalEnd();
-        progressReporter.logReport();
-        long indexingDurationSeconds = indexerWatch.elapsed(TimeUnit.SECONDS);
-        log.info("Completed the indexing in {}", FormattingUtils.formatToSeconds(indexingDurationSeconds));
-        log.info("[TASK:INDEXING:END] Metrics: {}", MetricsFormatter.newBuilder()
-                .add("duration", FormattingUtils.formatToSeconds(indexingDurationSeconds))
-                .add("durationSeconds", indexingDurationSeconds)
-                .build());
-        MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_DURATION_SECONDS, indexingDurationSeconds);
-        indexingReporter.addTiming("Build Lucene Index", FormattingUtils.formatToSeconds(indexingDurationSeconds));
-
-        log.info("[TASK:MERGE_NODE_STORE:START] Starting merge node store");
-        Stopwatch mergeNodeStoreWatch = Stopwatch.createStarted();
-        copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-        long mergeNodeStoreDurationSeconds = mergeNodeStoreWatch.elapsed(TimeUnit.SECONDS);
-        log.info("[TASK:MERGE_NODE_STORE:END] Metrics: {}", MetricsFormatter.newBuilder()
-                .add("duration", FormattingUtils.formatToSeconds(mergeNodeStoreDurationSeconds))
-                .add("durationSeconds", mergeNodeStoreDurationSeconds)
-                .build());
-        MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_MERGE_NODE_STORE_DURATION_SECONDS, mergeNodeStoreDurationSeconds);
-        indexingReporter.addTiming("Merge node store", FormattingUtils.formatToSeconds(mergeNodeStoreDurationSeconds));
-
-        indexerSupport.postIndexWork(copyOnWriteStore);
-
-        long fullIndexCreationDurationSeconds = indexJobWatch.elapsed(TimeUnit.SECONDS);
-        log.info("[TASK:FULL_INDEX_CREATION:END] Metrics {}", MetricsFormatter.newBuilder()
-                .add("duration", FormattingUtils.formatToSeconds(fullIndexCreationDurationSeconds))
-                .add("durationSeconds", fullIndexCreationDurationSeconds)
-                .build());
-
-        MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_FULL_INDEX_CREATION_DURATION_SECONDS, fullIndexCreationDurationSeconds);
-        indexingReporter.addTiming("Total time", FormattingUtils.formatToSeconds(fullIndexCreationDurationSeconds));
+            indexerSupport.postIndexWork(copyOnWriteStore);
+
+            long fullIndexCreationDurationSeconds = indexJobWatch.elapsed(TimeUnit.SECONDS);
+            log.info("[TASK:FULL_INDEX_CREATION:END] Metrics {}", MetricsFormatter.createMetricsWithDurationOnly(fullIndexCreationDurationSeconds));
+            MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_FULL_INDEX_CREATION_DURATION_SECONDS, fullIndexCreationDurationSeconds);
+            indexingReporter.addTiming("Total time", FormattingUtils.formatToSeconds(fullIndexCreationDurationSeconds));
+        } catch (Throwable t) {
+            log.info("[TASK:FULL_INDEX_CREATION:FAIL] Metrics: {}, Error: {}",
+                    MetricsFormatter.createMetricsWithDurationOnly(indexJobWatch), t.toString());
+            throw t;
+        }
     }
 
     private void indexParallel(List<FlatFileStore> storeList, CompositeIndexer indexer, IndexingProgressReporter progressReporter)
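DocumentStoreIndexerBase.reindex() now wraps three nested phases (FULL_INDEX_CREATION, INDEXING and MERGE_NODE_STORE) in their own try/catch blocks, repeating the stopwatch-plus-FAIL-log boilerplate once per phase. Purely as an illustration of that recurring shape, and not something this commit introduces, the pattern could be factored into a helper along these lines:

    // Hypothetical helper, NOT part of this commit.
    private <T> T runPhase(String phase, Callable<T> body) throws Exception {
        log.info("[TASK:{}:START] Starting {}", phase, phase);
        Stopwatch watch = Stopwatch.createStarted();
        try {
            T result = body.call();
            log.info("[TASK:{}:END] Metrics: {}", phase,
                    MetricsFormatter.createMetricsWithDurationOnly(watch));
            return result;
        } catch (Throwable t) {
            log.info("[TASK:{}:FAIL] Metrics: {}, Error: {}", phase,
                    MetricsFormatter.createMetricsWithDurationOnly(watch), t.toString());
            throw t;
        }
    }

The commit keeps the inline form instead, which avoids changing method signatures and keeps each phase's extra metrics calls (MetricsUtils, IndexingReporter) next to its END log line.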
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
index 6681399f63..bc3b352e27 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
@@ -210,23 +210,24 @@ public class PipelinedMergeSortTask implements Callable<PipelinedMergeSortTask.R
         String originalName = Thread.currentThread().getName();
         Thread.currentThread().setName(THREAD_NAME);
         int intermediateFilesCount = 0;
+        LOG.info("[TASK:{}:START] Starting merge sort task", THREAD_NAME.toUpperCase(Locale.ROOT));
+        Stopwatch finalMergeWatch = Stopwatch.createUnstarted();
         try {
-            LOG.info("[TASK:{}:START] Starting merge sort task", THREAD_NAME.toUpperCase(Locale.ROOT));
             while (true) {
                 LOG.debug("Waiting for next intermediate sorted file");
                 Path sortedIntermediateFile = sortedFilesQueue.take();
                 if (sortedIntermediateFile == SENTINEL_SORTED_FILES_QUEUE) {
                     long sortedFilesSizeBytes = sizeOf(sortedFiles);
                     LOG.info("Going to sort {} files, total size {}", sortedFiles.size(), IOUtils.humanReadableByteCountBin(sortedFilesSizeBytes));
-                    Stopwatch w = Stopwatch.createStarted();
+                    finalMergeWatch.start();
                     List<Path> simpleFileList = sortedFiles.stream().map(f -> f.file).collect(Collectors.toList());
                     Path flatFileStore = sortStoreFile(simpleFileList);
-                    LOG.info("Final merge completed in {}. Created file: {}", FormattingUtils.formatToSeconds(w), flatFileStore.toAbsolutePath());
+                    LOG.info("Final merge completed in {}. Created file: {}", FormattingUtils.formatToSeconds(finalMergeWatch), flatFileStore.toAbsolutePath());
                     long ffsSizeBytes = Files.size(flatFileStore);
-                    long durationSeconds = w.elapsed(TimeUnit.SECONDS);
+                    long durationSeconds = finalMergeWatch.elapsed(TimeUnit.SECONDS);
                     String metrics = MetricsFormatter.newBuilder()
-                            .add("duration", FormattingUtils.formatToSeconds(w))
+                            .add("duration", FormattingUtils.formatToSeconds(finalMergeWatch))
                             .add("durationSeconds", durationSeconds)
                             .add("intermediateFilesCount", intermediateFilesCount)
                             .add("eagerMergesRuns", eagerMergeRuns)
@@ -236,7 +237,7 @@ public class PipelinedMergeSortTask implements Callable<PipelinedMergeSortTask.R
                             .build();
 
                     LOG.info("[TASK:{}:END] Metrics: {}", THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
-                    reporter.addTiming("Merge sort", FormattingUtils.formatToSeconds(w));
+                    reporter.addTiming("Merge sort", FormattingUtils.formatToSeconds(finalMergeWatch));
                     MetricsUtils.addMetric(statisticsProvider, reporter, PipelinedMetrics.OAK_INDEXER_PIPELINED_MERGE_SORT_FINAL_MERGE_DURATION_SECONDS, durationSeconds);
                     MetricsUtils.addMetric(statisticsProvider, reporter, PipelinedMetrics.OAK_INDEXER_PIPELINED_MERGE_SORT_INTERMEDIATE_FILES_TOTAL, intermediateFilesCount);
                     MetricsUtils.addMetric(statisticsProvider, reporter, PipelinedMetrics.OAK_INDEXER_PIPELINED_MERGE_SORT_EAGER_MERGES_RUNS_TOTAL, eagerMergeRuns);
@@ -259,10 +260,10 @@ public class PipelinedMergeSortTask implements Callable<PipelinedMergeSortTask.R
                     }
                 }
             }
-        } catch (InterruptedException t) {
-            LOG.warn("Thread interrupted", t);
-            throw t;
         } catch (Throwable t) {
+            LOG.info("[TASK:{}:FAIL] Metrics: {}, Error: {}", THREAD_NAME.toUpperCase(Locale.ROOT),
+                    MetricsFormatter.createMetricsWithDurationOnly(finalMergeWatch),
+                    t.toString());
             LOG.warn("Thread terminating with exception", t);
             throw t;
         } finally {
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index 5b8568fd6c..73638210cf 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -46,6 +46,7 @@ import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStoreHelper;
 import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
 import org.apache.jackrabbit.oak.plugins.index.IndexingReporter;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
 import org.apache.jackrabbit.oak.spi.filter.PathFilter;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import org.bson.BsonDocument;
@@ -325,19 +326,27 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
                     .getCollection(Collection.NODES.toString(), NodeDocument.class);
 
                 LOG.info("[TASK:{}:START] Starting to download from MongoDB", Thread.currentThread().getName().toUpperCase(Locale.ROOT));
-                downloadStartWatch.start();
-                if (retryOnConnectionErrors) {
-                    downloadWithRetryOnConnectionErrors();
-                } else {
-                    downloadWithNaturalOrdering();
+                try {
+                    downloadStartWatch.start();
+                    if (retryOnConnectionErrors) {
+                        downloadWithRetryOnConnectionErrors();
+                    } else {
+                        downloadWithNaturalOrdering();
+                    }
+                    downloadStartWatch.stop();
+                    long durationMillis = downloadStartWatch.elapsed(TimeUnit.MILLISECONDS);
+                    downloadStageStatistics.publishStatistics(statisticsProvider, reporter, durationMillis);
+                    String metrics = downloadStageStatistics.formatStats(durationMillis);
+                    LOG.info("[TASK:{}:END] Metrics: {}", Thread.currentThread().getName().toUpperCase(Locale.ROOT), metrics);
+                    reporter.addTiming("Mongo dump", FormattingUtils.formatToSeconds(downloadStartWatch));
+                    return new PipelinedMongoDownloadTask.Result(downloadStageStatistics.getDocumentsDownloadedTotal());
+                } catch (Throwable t) {
+                    LOG.info("[TASK:{}:FAIL] Metrics: {}, Error: {}",
+                            Thread.currentThread().getName().toUpperCase(Locale.ROOT),
+                            MetricsFormatter.createMetricsWithDurationOnly(downloadStartWatch),
+                            t.toString());
+                    throw t;
                 }
-                downloadStartWatch.stop();
-                long durationMillis = downloadStartWatch.elapsed(TimeUnit.MILLISECONDS);
-                downloadStageStatistics.publishStatistics(statisticsProvider, reporter, durationMillis);
-                String metrics = downloadStageStatistics.formatStats(durationMillis);
-                LOG.info("[TASK:{}:END] Metrics: {}", Thread.currentThread().getName().toUpperCase(Locale.ROOT), metrics);
-                reporter.addTiming("Mongo dump", FormattingUtils.formatToSeconds(downloadStartWatch));
-                return new PipelinedMongoDownloadTask.Result(downloadStageStatistics.getDocumentsDownloadedTotal());
             }
         } finally {
             Thread.currentThread().setName(originalName);
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
index 3994b3285b..59a1346a56 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
@@ -103,8 +103,8 @@ class PipelinedSortBatchTask implements Callable<PipelinedSortBatchTask.Result>
         Stopwatch taskStartTime = Stopwatch.createStarted();
         String originalName = Thread.currentThread().getName();
         Thread.currentThread().setName(THREAD_NAME);
+        LOG.info("[TASK:{}:START] Starting sort-and-save task", THREAD_NAME.toUpperCase(Locale.ROOT));
         try {
-            LOG.info("[TASK:{}:START] Starting sort-and-save task", THREAD_NAME.toUpperCase(Locale.ROOT));
             while (true) {
                 LOG.info("Waiting for next batch");
                 NodeStateEntryBatch nseBuffer = nonEmptyBuffersQueue.take();
@@ -142,10 +142,11 @@ class PipelinedSortBatchTask implements Callable<PipelinedSortBatchTask.Result>
                 nseBuffer.reset();
                 emptyBuffersQueue.put(nseBuffer);
             }
-        } catch (InterruptedException t) {
-            LOG.warn("Thread interrupted", t);
-            throw t;
         } catch (Throwable t) {
+            LOG.info("[TASK:{}:FAIL] Metrics: {}, Error: {}",
+                    THREAD_NAME.toUpperCase(Locale.ROOT),
+                    MetricsFormatter.createMetricsWithDurationOnly(taskStartTime),
+                    t.toString());
             LOG.warn("Thread terminating with exception", t);
             throw t;
         } finally {
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
index 195922f12b..e54917fd52 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
@@ -516,6 +516,9 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
             LOG.info("[INDEXING_REPORT:BUILD_FFS]\n{}", indexingReporter.generateReport());
 
         } catch (Throwable e) {
+            LOG.info("[TASK:PIPELINED-DUMP:FAIL] Metrics: {}, Error: {}",
+                    MetricsFormatter.createMetricsWithDurationOnly(start), e.toString()
+            );
             LOG.warn("Error dumping from MongoDB. Cancelling all tasks. Error: {}", e.toString());
             // Cancel in order
             cancelFuture(downloadFuture);
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
index 62fea7ac46..2672c8dc2d 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
@@ -115,10 +115,10 @@ class PipelinedTransformTask implements Callable<PipelinedTransformTask.Result>
         String originalName = Thread.currentThread().getName();
         String threadName = THREAD_NAME_PREFIX + threadId;
         Thread.currentThread().setName(threadName);
+        Stopwatch taskStartWatch = Stopwatch.createStarted();
         try {
             LOG.info("[TASK:{}:START] Starting transform task", threadName.toUpperCase(Locale.ROOT));
             NodeDocumentCache nodeCache = MongoDocumentStoreHelper.getNodeDocumentCache(mongoStore);
-            Stopwatch taskStartWatch = Stopwatch.createStarted();
             long totalDocumentQueueWaitTimeMillis = 0;
             long totalEntryCount = 0;
             long mongoObjectsProcessed = 0;
@@ -215,10 +215,9 @@ class PipelinedTransformTask implements Callable<PipelinedTransformTask.Result>
                     }
                 }
             }
-        } catch (InterruptedException t) {
-            LOG.warn("Thread interrupted", t);
-            throw t;
         } catch (Throwable t) {
+            LOG.info("[TASK:{}:FAIL] Metrics: {}, Error: {}",
+                    threadName.toUpperCase(Locale.ROOT), MetricsFormatter.createMetricsWithDurationOnly(taskStartWatch), t.toString());
             LOG.warn("Thread terminating with exception", t);
             throw t;
         } finally {
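The IndexImporterTest hunk above already scans customizer.getLogs() for the retry-failure message; the same idiom could be used to assert the new FAIL marker. A sketch, assuming the same LogCustomizer-based setup as that test and a phase that fails in SWITCH_LANE (illustrative only, not part of this commit):

    boolean failMarkerPresent = false;
    for (String log : customizer.getLogs()) {
        if (log.contains("[TASK:SWITCH_LANE:FAIL]")) {
            failMarkerPresent = true;
        }
    }
    assertTrue(failMarkerPresent);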