This is an automated email from the ASF dual-hosted git repository.

nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1a250e06d8 OAK-10825 - Indexing job: log a [TASK:<PHASE>:FAIL] message when an indexing phase fails. (#1481)
1a250e06d8 is described below

commit 1a250e06d890c16f45590a85da56cb0e14623260
Author: Nuno Santos <nsan...@adobe.com>
AuthorDate: Tue May 28 10:01:38 2024 +0200

    OAK-10825 - Indexing job: log a [TASK:<PHASE>:FAIL] message when an indexing phase fails. (#1481)
---
 .../oak/plugins/index/MetricsFormatter.java        |  22 +++-
 .../oak/plugins/index/importer/IndexImporter.java  |  59 +++++----
 .../plugins/index/importer/IndexImporterTest.java  |   4 +-
 .../indexer/document/DocumentStoreIndexerBase.java | 138 +++++++++++----------
 .../flatfile/pipelined/PipelinedMergeSortTask.java |  19 +--
 .../pipelined/PipelinedMongoDownloadTask.java      |  33 +++--
 .../flatfile/pipelined/PipelinedSortBatchTask.java |   9 +-
 .../flatfile/pipelined/PipelinedStrategy.java      |   3 +
 .../flatfile/pipelined/PipelinedTransformTask.java |   7 +-
 9 files changed, 168 insertions(+), 126 deletions(-)
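
A note on the pattern applied across these files: each indexing phase now emits a [TASK:<PHASE>:START] marker before it runs, a [TASK:<PHASE>:END] marker with duration metrics on success, and a new [TASK:<PHASE>:FAIL] marker with the elapsed time and the error before the exception is rethrown. A minimal sketch of that pattern, for orientation only ("PHASE" and runPhase() are illustrative placeholders, not identifiers from this patch; createMetricsWithDurationOnly is the helper the patch adds to MetricsFormatter):

    // Illustrative sketch only (not code from this patch). Assumes:
    //   import org.apache.jackrabbit.guava.common.base.Stopwatch;
    //   import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
    //   an slf4j Logger LOG, and a hypothetical runPhase() doing the phase's work.
    Stopwatch watch = Stopwatch.createStarted();
    LOG.info("[TASK:PHASE:START]");
    try {
        runPhase(); // the actual work of the indexing phase
        LOG.info("[TASK:PHASE:END] Metrics: {}",
                MetricsFormatter.createMetricsWithDurationOnly(watch));
    } catch (Throwable t) {
        // On failure, log the FAIL marker with the elapsed time, then rethrow.
        LOG.info("[TASK:PHASE:FAIL] Metrics: {}, Error: {}",
                MetricsFormatter.createMetricsWithDurationOnly(watch), t.toString());
        throw t;
    }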

diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatter.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatter.java
index f32e5f4102..f2a2021fa5 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatter.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/MetricsFormatter.java
@@ -17,15 +17,31 @@
 package org.apache.jackrabbit.oak.plugins.index;
 
 import org.apache.jackrabbit.guava.common.base.Preconditions;
+import org.apache.jackrabbit.guava.common.base.Stopwatch;
 import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
 
+import java.util.concurrent.TimeUnit;
+
 public class MetricsFormatter {
-    private final JsopBuilder jsopBuilder = new JsopBuilder();
-    private boolean isWritable = true;
+
+    public static String createMetricsWithDurationOnly(Stopwatch taskStartWatch) {
+        return createMetricsWithDurationOnly(taskStartWatch.elapsed(TimeUnit.SECONDS));
+    }
+
+    public static String createMetricsWithDurationOnly(long totalDurationSeconds) {
+        return MetricsFormatter.newBuilder()
+                .add("duration", FormattingUtils.formatToSeconds(totalDurationSeconds))
+                .add("durationSeconds", totalDurationSeconds)
+                .build();
+    }
+
     public static MetricsFormatter newBuilder() {
         return new MetricsFormatter();
     }
 
+    private final JsopBuilder jsopBuilder = new JsopBuilder();
+    private boolean isWritable = true;
+
     private MetricsFormatter() {
         jsopBuilder.object();
     }
@@ -55,7 +71,7 @@ public class MetricsFormatter {
     }
 
     public String build() {
-        if (isWritable){
+        if (isWritable) {
             jsopBuilder.endObject();
             isWritable = false;
         }
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
index 6a1a1f92e7..9649e85692 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
@@ -486,35 +486,40 @@ public class IndexImporter {
         String indexImportPhaseName = indexImportState == null ? "null" : indexImportState.toString();
         int count = 1;
         Stopwatch start = Stopwatch.createStarted();
-        while (count <= maxRetries) {
-            LOG.info("IndexImporterStepExecutor:{}, count:{}", indexImportPhaseName, count);
-            LOG.info("[TASK:{}:START]", indexImportPhaseName);
-            try {
-                step.execute();
-                long durationSeconds = start.elapsed(TimeUnit.SECONDS);
-                LOG.info("[TASK:{}:END] Metrics: {}", indexImportPhaseName,
-                        MetricsFormatter.newBuilder()
-                                .add("duration", FormattingUtils.formatToSeconds(durationSeconds))
-                                .add("durationSeconds", durationSeconds)
-                                .build()
-                );
-
-                MetricsUtils.setCounterOnce(statisticsProvider,
-                        "oak_indexer_import_" + indexImportPhaseName.toLowerCase() + "_duration_seconds",
-                        durationSeconds);
-                indexingReporter.addTiming("oak_indexer_import_" + indexImportPhaseName.toLowerCase(),
-                        FormattingUtils.formatToSeconds(durationSeconds));
-                indexingReporter.addMetric("oak_indexer_import_" + indexImportPhaseName.toLowerCase() + "_duration_seconds",
-                        durationSeconds);
-
-                break;
-            } catch (CommitFailedException | IOException e) {
-                LOG.warn("IndexImporterStepExecutor:{} fail count: {}, retries left: {}", indexImportState, count, maxRetries - count, e);
-                if (count++ >= maxRetries) {
-                    LOG.warn("IndexImporterStepExecutor:{} failed after {} retries", indexImportState, maxRetries, e);
-                    throw e;
+        LOG.info("[TASK:{}:START]", indexImportPhaseName);
+        try {
+            while (count <= maxRetries) {
+                LOG.info("IndexImporterStepExecutor:{}, count:{}", indexImportPhaseName, count);
+                try {
+                    step.execute();
+                    long durationSeconds = start.elapsed(TimeUnit.SECONDS);
+                    LOG.info("[TASK:{}:END] Metrics: {}",
+                            indexImportPhaseName,
+                            MetricsFormatter.createMetricsWithDurationOnly(durationSeconds)
+                    );
+                    MetricsUtils.setCounterOnce(statisticsProvider,
+                            "oak_indexer_import_" + indexImportPhaseName.toLowerCase() + "_duration_seconds",
+                            durationSeconds);
+                    indexingReporter.addTiming("oak_indexer_import_" + indexImportPhaseName.toLowerCase(),
+                            FormattingUtils.formatToSeconds(durationSeconds));
+                    indexingReporter.addMetric("oak_indexer_import_" + indexImportPhaseName.toLowerCase() + "_duration_seconds",
+                            durationSeconds);
+                            durationSeconds);
+
+                    break;
+                } catch (CommitFailedException | IOException e) {
+                    LOG.warn("IndexImporterStepExecutor: {} fail count: {}, retries left: {}", indexImportState, count, maxRetries - count, e);
+                    if (count++ >= maxRetries) {
+                        LOG.warn("IndexImporterStepExecutor: {} failed after {} retries", indexImportState, maxRetries, e);
+                        throw e;
+                    }
                 }
             }
+        } catch (Throwable t) {
+            LOG.info("[TASK:{}:FAIL] Metrics: {}, Error: {}",
+                    indexImportPhaseName,
+                    MetricsFormatter.createMetricsWithDurationOnly(start),
+                    t.toString());
+            throw t;
         }
     }
 
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java
index 2f5aaf564c..c3abb429a1 100644
--- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java
@@ -548,7 +548,7 @@ public class IndexImporterTest {
         PropUtils.writeTo(p, indexMeta, "index info");
     }
 
-    private void dumpIndexDefinitions(String... indexPaths) throws IOException, CommitFailedException {
+    private void dumpIndexDefinitions(String... indexPaths) throws IOException {
         IndexDefinitionPrinter printer = new IndexDefinitionPrinter(store, () -> asList(indexPaths));
         printer.setFilter("{\"properties\":[\"*\", \"-:childOrder\"],\"nodes\":[\"*\", \"-:index-definition\"]}");
         File file = new File(temporaryFolder.getRoot(), INDEX_DEFINITIONS_JSON);
@@ -639,7 +639,7 @@ public class IndexImporterTest {
                     store.getRoot().getChildNode("oak:index").getChildNode("fooIndex").getProperty(IndexImporter.INDEX_IMPORT_STATE_KEY));
 
             // Test retry logic
-            String failureLog = MessageFormat.format("IndexImporterStepExecutor:{0} failed after {1} retries",
+            String failureLog = MessageFormat.format("IndexImporterStepExecutor: {0} failed after {1} retries",
                     IndexImporter.IndexImportState.SWITCH_LANE, IndexImporter.RETRIES);
             boolean failureLogPresent = false;
             for (String log : customizer.getLogs()) {
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index a123200047..e65b76ace0 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -315,81 +315,89 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
     public void reindex() throws CommitFailedException, IOException {
         log.info("[TASK:FULL_INDEX_CREATION:START] Starting indexing job");
         Stopwatch indexJobWatch = Stopwatch.createStarted();
-        IndexingProgressReporter progressReporter =
-                new IndexingProgressReporter(IndexUpdateCallback.NOOP, NodeTraversalCallback.NOOP);
-        configureEstimators(progressReporter);
+        try {
+            IndexingProgressReporter progressReporter =
+                    new IndexingProgressReporter(IndexUpdateCallback.NOOP, NodeTraversalCallback.NOOP);
+            configureEstimators(progressReporter);
+
+            NodeState checkpointedState = indexerSupport.retrieveNodeStateForCheckpoint();
+            NodeStore copyOnWriteStore = new MemoryNodeStore(checkpointedState);
+            indexerSupport.switchIndexLanesAndReindexFlag(copyOnWriteStore);
+            NodeBuilder builder = copyOnWriteStore.getRoot().builder();
+            CompositeIndexer indexer = prepareIndexers(copyOnWriteStore, builder, progressReporter);
+            if (indexer.isEmpty()) {
+                return;
+            }
 
-        NodeState checkpointedState = indexerSupport.retrieveNodeStateForCheckpoint();
-        NodeStore copyOnWriteStore = new MemoryNodeStore(checkpointedState);
-        indexerSupport.switchIndexLanesAndReindexFlag(copyOnWriteStore);
-        NodeBuilder builder = copyOnWriteStore.getRoot().builder();
-        CompositeIndexer indexer = prepareIndexers(copyOnWriteStore, builder, progressReporter);
-        if (indexer.isEmpty()) {
-            return;
-        }
+            closer.register(indexer);
+
+            List<FlatFileStore> flatFileStores = buildFlatFileStoreList(
+                    checkpointedState,
+                    indexer,
+                    indexer::shouldInclude,
+                    null,
+                    IndexerConfiguration.parallelIndexEnabled(),
+                    indexerSupport.getIndexDefinitions(),
+                    indexingReporter);
 
-        closer.register(indexer);
+            progressReporter.reset();
 
-        List<FlatFileStore> flatFileStores = buildFlatFileStoreList(
-                checkpointedState,
-                indexer,
-                indexer::shouldInclude,
-                null,
-                IndexerConfiguration.parallelIndexEnabled(),
-                indexerSupport.getIndexDefinitions(),
-                indexingReporter);
+            progressReporter.reindexingTraversalStart("/");
 
-        progressReporter.reset();
+            preIndexOperations(indexer.getIndexers());
 
-        progressReporter.reindexingTraversalStart("/");
+            log.info("[TASK:INDEXING:START] Starting indexing");
+            Stopwatch indexerWatch = Stopwatch.createStarted();
+            try {
 
-        preIndexOperations(indexer.getIndexers());
+                if (flatFileStores.size() > 1) {
+                    indexParallel(flatFileStores, indexer, progressReporter);
+                } else if (flatFileStores.size() == 1) {
+                    FlatFileStore flatFileStore = flatFileStores.get(0);
+                    for (NodeStateEntry entry : flatFileStore) {
+                        reportDocumentRead(entry.getPath(), progressReporter);
+                        indexer.index(entry);
+                    }
+                }
 
-        log.info("[TASK:INDEXING:START] Starting indexing");
-        Stopwatch indexerWatch = Stopwatch.createStarted();
+                progressReporter.reindexingTraversalEnd();
+                progressReporter.logReport();
+                long indexingDurationSeconds = indexerWatch.elapsed(TimeUnit.SECONDS);
+                log.info("Completed indexing in {}", FormattingUtils.formatToSeconds(indexingDurationSeconds));
+                log.info("[TASK:INDEXING:END] Metrics: {}", MetricsFormatter.createMetricsWithDurationOnly(indexingDurationSeconds));
+                MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_DURATION_SECONDS, indexingDurationSeconds);
+                indexingReporter.addTiming("Build Lucene Index", FormattingUtils.formatToSeconds(indexingDurationSeconds));
+            } catch (Throwable t) {
+                log.info("[TASK:INDEXING:FAIL] Metrics: {}, Error: {}",
+                        MetricsFormatter.createMetricsWithDurationOnly(indexerWatch), t.toString());
+                throw t;
+            }
 
-        if (flatFileStores.size() > 1) {
-            indexParallel(flatFileStores, indexer, progressReporter);
-        } else if (flatFileStores.size() == 1) {
-            FlatFileStore flatFileStore = flatFileStores.get(0);
-            for (NodeStateEntry entry : flatFileStore) {
-                reportDocumentRead(entry.getPath(), progressReporter);
-                indexer.index(entry);
+            log.info("[TASK:MERGE_NODE_STORE:START] Starting merge node store");
+            Stopwatch mergeNodeStoreWatch = Stopwatch.createStarted();
+            try {
+                copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+                long mergeNodeStoreDurationSeconds = mergeNodeStoreWatch.elapsed(TimeUnit.SECONDS);
+                log.info("[TASK:MERGE_NODE_STORE:END] Metrics: {}", MetricsFormatter.createMetricsWithDurationOnly(mergeNodeStoreDurationSeconds));
+                MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_MERGE_NODE_STORE_DURATION_SECONDS, mergeNodeStoreDurationSeconds);
+                indexingReporter.addTiming("Merge node store", FormattingUtils.formatToSeconds(mergeNodeStoreDurationSeconds));
+            } catch (Throwable t) {
+                log.info("[TASK:MERGE_NODE_STORE:FAIL] Metrics: {}, Error: {}",
+                        MetricsFormatter.createMetricsWithDurationOnly(mergeNodeStoreWatch), t.toString());
+                throw t;
             }
-        }
 
-        progressReporter.reindexingTraversalEnd();
-        progressReporter.logReport();
-        long indexingDurationSeconds = indexerWatch.elapsed(TimeUnit.SECONDS);
-        log.info("Completed the indexing in {}", FormattingUtils.formatToSeconds(indexingDurationSeconds));
-        log.info("[TASK:INDEXING:END] Metrics: {}", MetricsFormatter.newBuilder()
-                .add("duration", FormattingUtils.formatToSeconds(indexingDurationSeconds))
-                .add("durationSeconds", indexingDurationSeconds)
-                .build());
-        MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_INDEXING_DURATION_SECONDS, indexingDurationSeconds);
-        indexingReporter.addTiming("Build Lucene Index", FormattingUtils.formatToSeconds(indexingDurationSeconds));
-
-        log.info("[TASK:MERGE_NODE_STORE:START] Starting merge node store");
-        Stopwatch mergeNodeStoreWatch = Stopwatch.createStarted();
-        copyOnWriteStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-        long mergeNodeStoreDurationSeconds = mergeNodeStoreWatch.elapsed(TimeUnit.SECONDS);
-        log.info("[TASK:MERGE_NODE_STORE:END] Metrics: {}", MetricsFormatter.newBuilder()
-                .add("duration", FormattingUtils.formatToSeconds(mergeNodeStoreDurationSeconds))
-                .add("durationSeconds", mergeNodeStoreDurationSeconds)
-                .build());
-        MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_MERGE_NODE_STORE_DURATION_SECONDS, mergeNodeStoreDurationSeconds);
-        indexingReporter.addTiming("Merge node store", FormattingUtils.formatToSeconds(mergeNodeStoreDurationSeconds));
-
-        indexerSupport.postIndexWork(copyOnWriteStore);
-
-        long fullIndexCreationDurationSeconds = indexJobWatch.elapsed(TimeUnit.SECONDS);
-        log.info("[TASK:FULL_INDEX_CREATION:END] Metrics {}", MetricsFormatter.newBuilder()
-                .add("duration", FormattingUtils.formatToSeconds(fullIndexCreationDurationSeconds))
-                .add("durationSeconds", fullIndexCreationDurationSeconds)
-                .build());
-
-        MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_FULL_INDEX_CREATION_DURATION_SECONDS, fullIndexCreationDurationSeconds);
-        indexingReporter.addTiming("Total time", FormattingUtils.formatToSeconds(fullIndexCreationDurationSeconds));
+            indexerSupport.postIndexWork(copyOnWriteStore);
+
+            long fullIndexCreationDurationSeconds = indexJobWatch.elapsed(TimeUnit.SECONDS);
+            log.info("[TASK:FULL_INDEX_CREATION:END] Metrics {}", MetricsFormatter.createMetricsWithDurationOnly(fullIndexCreationDurationSeconds));
+            MetricsUtils.addMetric(statisticsProvider, indexingReporter, METRIC_FULL_INDEX_CREATION_DURATION_SECONDS, fullIndexCreationDurationSeconds);
+            indexingReporter.addTiming("Total time", FormattingUtils.formatToSeconds(fullIndexCreationDurationSeconds));
+        } catch (Throwable t) {
+            log.info("[TASK:FULL_INDEX_CREATION:FAIL] Metrics: {}, Error: {}",
+                    MetricsFormatter.createMetricsWithDurationOnly(indexJobWatch), t.toString());
+            throw t;
+        }
     }
 
     private void indexParallel(List<FlatFileStore> storeList, CompositeIndexer indexer, IndexingProgressReporter progressReporter)
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
index 6681399f63..bc3b352e27 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMergeSortTask.java
@@ -210,23 +210,24 @@ public class PipelinedMergeSortTask implements Callable<PipelinedMergeSortTask.R
         String originalName = Thread.currentThread().getName();
         Thread.currentThread().setName(THREAD_NAME);
         int intermediateFilesCount = 0;
+        LOG.info("[TASK:{}:START] Starting merge sort task", THREAD_NAME.toUpperCase(Locale.ROOT));
+        Stopwatch finalMergeWatch = Stopwatch.createUnstarted();
         try {
-            LOG.info("[TASK:{}:START] Starting merge sort task", THREAD_NAME.toUpperCase(Locale.ROOT));
             while (true) {
                 LOG.debug("Waiting for next intermediate sorted file");
                 Path sortedIntermediateFile = sortedFilesQueue.take();
                 if (sortedIntermediateFile == SENTINEL_SORTED_FILES_QUEUE) {
                     long sortedFilesSizeBytes = sizeOf(sortedFiles);
                     LOG.info("Going to sort {} files, total size {}", sortedFiles.size(), IOUtils.humanReadableByteCountBin(sortedFilesSizeBytes));
-                    Stopwatch w = Stopwatch.createStarted();
+                    finalMergeWatch.start();
                     List<Path> simpleFileList = sortedFiles.stream().map(f -> 
f.file).collect(Collectors.toList());
                     Path flatFileStore = sortStoreFile(simpleFileList);
 
-                    LOG.info("Final merge completed in {}. Created file: {}", FormattingUtils.formatToSeconds(w), flatFileStore.toAbsolutePath());
+                    LOG.info("Final merge completed in {}. Created file: {}", FormattingUtils.formatToSeconds(finalMergeWatch), flatFileStore.toAbsolutePath());
                     long ffsSizeBytes = Files.size(flatFileStore);
-                    long durationSeconds = w.elapsed(TimeUnit.SECONDS);
+                    long durationSeconds = finalMergeWatch.elapsed(TimeUnit.SECONDS);
                     String metrics = MetricsFormatter.newBuilder()
-                            .add("duration", FormattingUtils.formatToSeconds(w))
+                            .add("duration", FormattingUtils.formatToSeconds(finalMergeWatch))
                             .add("durationSeconds", durationSeconds)
                             .add("intermediateFilesCount", intermediateFilesCount)
                             .add("eagerMergesRuns", eagerMergeRuns)
@@ -236,7 +237,7 @@ public class PipelinedMergeSortTask implements Callable<PipelinedMergeSortTask.R
                             .build();
 
                     LOG.info("[TASK:{}:END] Metrics: {}", THREAD_NAME.toUpperCase(Locale.ROOT), metrics);
-                    reporter.addTiming("Merge sort", FormattingUtils.formatToSeconds(w));
+                    reporter.addTiming("Merge sort", FormattingUtils.formatToSeconds(finalMergeWatch));
                     MetricsUtils.addMetric(statisticsProvider, reporter, PipelinedMetrics.OAK_INDEXER_PIPELINED_MERGE_SORT_FINAL_MERGE_DURATION_SECONDS, durationSeconds);
                     MetricsUtils.addMetric(statisticsProvider, reporter, PipelinedMetrics.OAK_INDEXER_PIPELINED_MERGE_SORT_INTERMEDIATE_FILES_TOTAL, intermediateFilesCount);
                     MetricsUtils.addMetric(statisticsProvider, reporter, PipelinedMetrics.OAK_INDEXER_PIPELINED_MERGE_SORT_EAGER_MERGES_RUNS_TOTAL, eagerMergeRuns);
@@ -259,10 +260,10 @@ public class PipelinedMergeSortTask implements Callable<PipelinedMergeSortTask.R
                     }
                 }
             }
-        } catch (InterruptedException t) {
-            LOG.warn("Thread interrupted", t);
-            throw t;
         } catch (Throwable t) {
+            LOG.info("[TASK:{}:FAIL] Metrics: {}, Error: {}", THREAD_NAME.toUpperCase(Locale.ROOT),
+                    MetricsFormatter.createMetricsWithDurationOnly(finalMergeWatch),
+                    t.toString());
             LOG.warn("Thread terminating with exception", t);
             throw t;
         } finally {
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index 5b8568fd6c..73638210cf 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -46,6 +46,7 @@ import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStoreHelper;
 import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
 import org.apache.jackrabbit.oak.plugins.index.IndexingReporter;
+import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
 import org.apache.jackrabbit.oak.spi.filter.PathFilter;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import org.bson.BsonDocument;
@@ -325,19 +326,27 @@ public class PipelinedMongoDownloadTask implements Callable<PipelinedMongoDownlo
                         .getCollection(Collection.NODES.toString(), NodeDocument.class);
 
                 LOG.info("[TASK:{}:START] Starting to download from MongoDB", Thread.currentThread().getName().toUpperCase(Locale.ROOT));
-                downloadStartWatch.start();
-                if (retryOnConnectionErrors) {
-                    downloadWithRetryOnConnectionErrors();
-                } else {
-                    downloadWithNaturalOrdering();
+                try {
+                    downloadStartWatch.start();
+                    if (retryOnConnectionErrors) {
+                        downloadWithRetryOnConnectionErrors();
+                    } else {
+                        downloadWithNaturalOrdering();
+                    }
+                    downloadStartWatch.stop();
+                    long durationMillis = downloadStartWatch.elapsed(TimeUnit.MILLISECONDS);
+                    downloadStageStatistics.publishStatistics(statisticsProvider, reporter, durationMillis);
+                    String metrics = downloadStageStatistics.formatStats(durationMillis);
+                    LOG.info("[TASK:{}:END] Metrics: {}", Thread.currentThread().getName().toUpperCase(Locale.ROOT), metrics);
+                    reporter.addTiming("Mongo dump", FormattingUtils.formatToSeconds(downloadStartWatch));
+                    return new PipelinedMongoDownloadTask.Result(downloadStageStatistics.getDocumentsDownloadedTotal());
+                } catch (Throwable t) {
+                    LOG.info("[TASK:{}:FAIL] Metrics: {}, Error: {}",
+                            Thread.currentThread().getName().toUpperCase(Locale.ROOT),
+                            MetricsFormatter.createMetricsWithDurationOnly(downloadStartWatch),
+                            t.toString());
+                    throw t;
                 }
-                downloadStartWatch.stop();
-                long durationMillis = downloadStartWatch.elapsed(TimeUnit.MILLISECONDS);
-                downloadStageStatistics.publishStatistics(statisticsProvider, reporter, durationMillis);
-                String metrics = downloadStageStatistics.formatStats(durationMillis);
-                LOG.info("[TASK:{}:END] Metrics: {}", Thread.currentThread().getName().toUpperCase(Locale.ROOT), metrics);
-                reporter.addTiming("Mongo dump", FormattingUtils.formatToSeconds(downloadStartWatch));
-                return new PipelinedMongoDownloadTask.Result(downloadStageStatistics.getDocumentsDownloadedTotal());
             }
         } finally {
             Thread.currentThread().setName(originalName);
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
index 3994b3285b..59a1346a56 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTask.java
@@ -103,8 +103,8 @@ class PipelinedSortBatchTask implements Callable<PipelinedSortBatchTask.Result>
         Stopwatch taskStartTime = Stopwatch.createStarted();
         String originalName = Thread.currentThread().getName();
         Thread.currentThread().setName(THREAD_NAME);
+        LOG.info("[TASK:{}:START] Starting sort-and-save task", THREAD_NAME.toUpperCase(Locale.ROOT));
         try {
-            LOG.info("[TASK:{}:START] Starting sort-and-save task", THREAD_NAME.toUpperCase(Locale.ROOT));
             while (true) {
                 LOG.info("Waiting for next batch");
                 NodeStateEntryBatch nseBuffer = nonEmptyBuffersQueue.take();
@@ -142,10 +142,11 @@ class PipelinedSortBatchTask implements Callable<PipelinedSortBatchTask.Result>
                 nseBuffer.reset();
                 emptyBuffersQueue.put(nseBuffer);
             }
-        } catch (InterruptedException t) {
-            LOG.warn("Thread interrupted", t);
-            throw t;
         } catch (Throwable t) {
+            LOG.info("[TASK:{}:FAIL] Metrics: {}, Error: {}",
+                    THREAD_NAME.toUpperCase(Locale.ROOT),
+                    MetricsFormatter.createMetricsWithDurationOnly(taskStartTime),
+                    t.toString());
             LOG.warn("Thread terminating with exception", t);
             throw t;
         } finally {
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
index 195922f12b..e54917fd52 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
@@ -516,6 +516,9 @@ public class PipelinedStrategy extends IndexStoreSortStrategyBase {
 
                 LOG.info("[INDEXING_REPORT:BUILD_FFS]\n{}", indexingReporter.generateReport());
             } catch (Throwable e) {
+                LOG.info("[TASK:PIPELINED-DUMP:FAIL] Metrics: {}, Error: {}",
+                        MetricsFormatter.createMetricsWithDurationOnly(start), e.toString()
+                );
                 LOG.warn("Error dumping from MongoDB. Cancelling all tasks. Error: {}", e.toString());
                 // Cancel in order
                 cancelFuture(downloadFuture);
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
index 62fea7ac46..2672c8dc2d 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTransformTask.java
@@ -115,10 +115,10 @@ class PipelinedTransformTask implements Callable<PipelinedTransformTask.Result>
         String originalName = Thread.currentThread().getName();
         String threadName = THREAD_NAME_PREFIX + threadId;
         Thread.currentThread().setName(threadName);
+        Stopwatch taskStartWatch = Stopwatch.createStarted();
         try {
             LOG.info("[TASK:{}:START] Starting transform task", threadName.toUpperCase(Locale.ROOT));
             NodeDocumentCache nodeCache = MongoDocumentStoreHelper.getNodeDocumentCache(mongoStore);
-            Stopwatch taskStartWatch = Stopwatch.createStarted();
             long totalDocumentQueueWaitTimeMillis = 0;
             long totalEntryCount = 0;
             long mongoObjectsProcessed = 0;
@@ -215,10 +215,9 @@ class PipelinedTransformTask implements Callable<PipelinedTransformTask.Result>
                     }
                 }
             }
-        } catch (InterruptedException t) {
-            LOG.warn("Thread interrupted", t);
-            throw t;
         } catch (Throwable t) {
+            LOG.info("[TASK:{}:FAIL] Metrics: {}, Error: {}",
+                    threadName.toUpperCase(Locale.ROOT), MetricsFormatter.createMetricsWithDurationOnly(taskStartWatch), t.toString());
             LOG.warn("Thread terminating with exception", t);
             throw t;
         } finally {
