This is an automated email from the ASF dual-hosted git repository.

thomasm pushed a commit to branch OAK-11365
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit 14a84c4eccf2b6e468c640b64e348ce89a5953f1
Author: Thomas Mueller <[email protected]>
AuthorDate: Wed Jan 8 18:24:58 2025 +0100

    OAK-11365 Incremental index store: ability to set a timeout
---
 .../indexer/document/DocumentStoreIndexerBase.java |  5 +++++
 .../IncrementalFlatFileStoreEditor.java            | 23 +++++++++++++++++++++-
 .../IncrementalFlatFileStoreStrategy.java          |  6 ++++--
 .../incrementalstore/IncrementalStoreBuilder.java  |  9 ++++++++-
 .../jackrabbit/oak/index/IncrementalStoreTest.java |  2 +-
 5 files changed, 40 insertions(+), 5 deletions(-)

diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index 536da52c74..f53867a03a 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -266,6 +266,10 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
     }
 
     public IndexStore buildStore(String initialCheckpoint, String 
finalCheckpoint) throws IOException, CommitFailedException {
+        return buildStore(initialCheckpoint, finalCheckpoint, Long.MAX_VALUE);
+    }
+
+    public IndexStore buildStore(String initialCheckpoint, String 
finalCheckpoint, long maxDurationSeconds) throws IOException, 
CommitFailedException {
         IncrementalStoreBuilder builder;
         IndexStore incrementalStore;
         Set<IndexDefinition> indexDefinitions = 
indexerSupport.getIndexDefinitions();
@@ -308,6 +312,7 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
         try {
             builder = new IncrementalStoreBuilder(indexHelper.getWorkDir(), 
indexHelper, initialCheckpoint, finalCheckpoint)
                     .withPreferredPathElements(preferredPathElements)
+                    .withMaxDurationSeconds(maxDurationSeconds)
                     .withPathPredicate(predicate)
                     .withBlobStore(indexHelper.getGCBlobStore());
             incrementalStore = builder.build();
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreEditor.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreEditor.java
index cb87699c7a..73625a2a2d 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreEditor.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreEditor.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedWriter;
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
 public class IncrementalFlatFileStoreEditor implements Editor {
@@ -38,18 +39,29 @@ public class IncrementalFlatFileStoreEditor implements 
Editor {
     private final IncrementalFlatFileStoreNodeStateEntryWriter entryWriter;
     private final Predicate<String> predicate;
     private final IncrementalFlatFileStoreStrategy 
incrementalFlatFileStoreStrategy;
+    // if not 0, timeout if System.nanoTime() exceeds this value
+    private final long timeoutAtNanos;
     private static final int LINE_SEP_LENGTH = 
System.getProperty("line.separator").length();
 
     public IncrementalFlatFileStoreEditor(BufferedWriter bufferedWriter, 
IncrementalFlatFileStoreNodeStateEntryWriter entryWriter, Predicate<String> 
predicate,
-                                          IncrementalFlatFileStoreStrategy 
incrementalFlatFileStoreStrategy) {
+                                          IncrementalFlatFileStoreStrategy 
incrementalFlatFileStoreStrategy, long maxDurationSeconds) {
         this.bufferedWriter = bufferedWriter;
         this.entryWriter = entryWriter;
         this.predicate = predicate;
         this.incrementalFlatFileStoreStrategy = 
incrementalFlatFileStoreStrategy;
+        long timeout;
+        if (maxDurationSeconds == Long.MAX_VALUE) {
+            timeout = 0;
+        } else {
+            timeout = System.nanoTime() + 
TimeUnit.NANOSECONDS.convert(maxDurationSeconds, TimeUnit.SECONDS);
+            log.info("Max duration: " + maxDurationSeconds + " timeout: " + 
timeout + " now: " + System.nanoTime());
+        }
+        this.timeoutAtNanos = timeout;
     }
 
     @Override
     public void enter(NodeState before, NodeState after) {
+        checkTimeout();
     }
 
     @Override
@@ -112,4 +124,13 @@ public class IncrementalFlatFileStoreEditor implements 
Editor {
             throw new RuntimeException("Error while creating incremental 
store", ex);
         }
     }
+
+    // Throws once the deadline has passed; 0 means no deadline is set.
+    private void checkTimeout() {
+        // System.nanoTime() readings can overflow, so test the sign of their
+        // difference rather than comparing the two values directly.
+        if (timeoutAtNanos != 0 && System.nanoTime() - timeoutAtNanos > 0) {
+            throw new RuntimeException("Timeout");
+        }
+    }
 }
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreStrategy.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreStrategy.java
index 1718e71f86..3b33840c1e 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreStrategy.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreStrategy.java
@@ -63,10 +63,11 @@ public class IncrementalFlatFileStoreStrategy implements 
IncrementalIndexStoreSo
     private long textSize = 0;
     private long entryCount = 0;
     private final Set<String> preferredPathElements;
+    private final long maxDurationSeconds;
 
     public IncrementalFlatFileStoreStrategy(NodeStore nodeStore, @NotNull 
String beforeCheckpoint, @NotNull String afterCheckpoint, File storeDir,
                                             Set<String> preferredPathElements, 
@NotNull Compression algorithm,
-                                            Predicate<String> pathPredicate, 
IncrementalFlatFileStoreNodeStateEntryWriter entryWriter) {
+                                            Predicate<String> pathPredicate, 
IncrementalFlatFileStoreNodeStateEntryWriter entryWriter, long 
maxDurationSeconds) {
         this.nodeStore = nodeStore;
         this.beforeCheckpoint = beforeCheckpoint;
         this.afterCheckpoint = afterCheckpoint;
@@ -76,6 +77,7 @@ public class IncrementalFlatFileStoreStrategy implements 
IncrementalIndexStoreSo
         this.entryWriter = entryWriter;
         this.preferredPathElements = preferredPathElements;
         this.comparator = new PathElementComparator(preferredPathElements);
+        this.maxDurationSeconds = maxDurationSeconds;
     }
 
     @Override
@@ -85,7 +87,7 @@ public class IncrementalFlatFileStoreStrategy implements 
IncrementalIndexStoreSo
         try (BufferedWriter w = FlatFileStoreUtils.createWriter(file, 
algorithm)) {
             NodeState before = 
Objects.requireNonNull(nodeStore.retrieve(beforeCheckpoint));
             NodeState after = 
Objects.requireNonNull(nodeStore.retrieve(afterCheckpoint));
-            Exception e = EditorDiff.process(VisibleEditor.wrap(new 
IncrementalFlatFileStoreEditor(w, entryWriter, pathPredicate, this)), before, 
after);
+            Exception e = EditorDiff.process(VisibleEditor.wrap(new 
IncrementalFlatFileStoreEditor(w, entryWriter, pathPredicate, this, 
maxDurationSeconds)), before, after);
             if (e != null) {
                 log.error("Exception while building incremental store for 
checkpoint before {}, after {}", beforeCheckpoint, afterCheckpoint, e);
                 throw new RuntimeException(e);
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreBuilder.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreBuilder.java
index 03565746e6..aa00438459 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreBuilder.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreBuilder.java
@@ -50,6 +50,7 @@ public class IncrementalStoreBuilder {
     private final IndexHelper indexHelper;
     private final String initialCheckpoint;
     private final String finalCheckpoint;
+    private long maxDurationSeconds = Long.MAX_VALUE;
     private Predicate<String> pathPredicate = path -> true;
     private Set<String> preferredPathElements = Collections.emptySet();
     private BlobStore blobStore;
@@ -107,6 +108,10 @@ public class IncrementalStoreBuilder {
         return this;
     }
 
+    public IncrementalStoreBuilder withMaxDurationSeconds(long 
maxDurationSeconds) {
+        this.maxDurationSeconds = maxDurationSeconds;
+        return this;
+    }
 
     public IndexStore build() throws IOException, CompositeException {
         logFlags();
@@ -115,11 +120,12 @@ public class IncrementalStoreBuilder {
         if (sortStrategyType == 
IncrementalSortStrategyType.INCREMENTAL_FFS_STORE ||
                 sortStrategyType == 
IncrementalSortStrategyType.INCREMENTAL_TREE_STORE) {
             IncrementalFlatFileStoreNodeStateEntryWriter entryWriter = new 
IncrementalFlatFileStoreNodeStateEntryWriter(blobStore);
+
             IncrementalIndexStoreSortStrategy strategy = new 
IncrementalFlatFileStoreStrategy(
                     indexHelper.getNodeStore(),
                     initialCheckpoint,
                     finalCheckpoint,
-                    dir, preferredPathElements, algorithm, pathPredicate, 
entryWriter);
+                    dir, preferredPathElements, algorithm, pathPredicate, 
entryWriter, maxDurationSeconds);
             File metadataFile = strategy.createMetadataFile();
             File incrementalStoreFile = strategy.createSortedStoreFile();
             long entryCount = strategy.getEntryCount();
@@ -147,4 +153,5 @@ public class IncrementalStoreBuilder {
         log.info("Compression enabled while sorting : {} ({})", 
IndexStoreUtils.compressionEnabled(), OAK_INDEXER_USE_ZIP);
         log.info("LZ4 enabled for compression algorithm : {} ({})", 
IndexStoreUtils.useLZ4(), OAK_INDEXER_USE_LZ4);
     }
+
 }
diff --git 
a/oak-run/src/test/java/org/apache/jackrabbit/oak/index/IncrementalStoreTest.java
 
b/oak-run/src/test/java/org/apache/jackrabbit/oak/index/IncrementalStoreTest.java
index a10e6ad33d..c1d3da92ea 100644
--- 
a/oak-run/src/test/java/org/apache/jackrabbit/oak/index/IncrementalStoreTest.java
+++ 
b/oak-run/src/test/java/org/apache/jackrabbit/oak/index/IncrementalStoreTest.java
@@ -401,7 +401,7 @@ public class IncrementalStoreTest {
         readOnlyNodeStore.retrieve(finalCheckpoint);
         return new IncrementalFlatFileStoreStrategy(
                 readOnlyNodeStore, initialCheckpoint, finalCheckpoint, 
sortFolder.getRoot(), preferredPathElements,
-                algorithm, pathPredicate, new 
IncrementalFlatFileStoreNodeStateEntryWriter(fileBlobStore));
+                algorithm, pathPredicate, new 
IncrementalFlatFileStoreNodeStateEntryWriter(fileBlobStore), Long.MAX_VALUE);
     }
 
     private void createBaseContent(NodeStore rwNodeStore) throws 
CommitFailedException {

Reply via email to