This is an automated email from the ASF dual-hosted git repository. thomasm pushed a commit to branch OAK-11365 in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit 14a84c4eccf2b6e468c640b64e348ce89a5953f1 Author: Thomas Mueller <[email protected]> AuthorDate: Wed Jan 8 18:24:58 2025 +0100 OAK-11365 Incremental index store: ability to set a timeout --- .../indexer/document/DocumentStoreIndexerBase.java | 5 +++++ .../IncrementalFlatFileStoreEditor.java | 23 +++++++++++++++++++++- .../IncrementalFlatFileStoreStrategy.java | 6 ++++-- .../incrementalstore/IncrementalStoreBuilder.java | 9 ++++++++- .../jackrabbit/oak/index/IncrementalStoreTest.java | 2 +- 5 files changed, 40 insertions(+), 5 deletions(-) diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java index 536da52c74..f53867a03a 100644 --- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java +++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java @@ -266,6 +266,10 @@ public abstract class DocumentStoreIndexerBase implements Closeable { } public IndexStore buildStore(String initialCheckpoint, String finalCheckpoint) throws IOException, CommitFailedException { + return buildStore(initialCheckpoint, finalCheckpoint, Long.MAX_VALUE); + } + + public IndexStore buildStore(String initialCheckpoint, String finalCheckpoint, long maxDurationSeconds) throws IOException, CommitFailedException { IncrementalStoreBuilder builder; IndexStore incrementalStore; Set<IndexDefinition> indexDefinitions = indexerSupport.getIndexDefinitions(); @@ -308,6 +312,7 @@ public abstract class DocumentStoreIndexerBase implements Closeable { try { builder = new IncrementalStoreBuilder(indexHelper.getWorkDir(), indexHelper, initialCheckpoint, finalCheckpoint) .withPreferredPathElements(preferredPathElements) + .withMaxDurationSeconds(maxDurationSeconds) .withPathPredicate(predicate) .withBlobStore(indexHelper.getGCBlobStore()); incrementalStore = builder.build(); diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreEditor.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreEditor.java index cb87699c7a..73625a2a2d 100644 --- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreEditor.java +++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreEditor.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedWriter; import java.io.IOException; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; public class IncrementalFlatFileStoreEditor implements Editor { @@ -38,18 +39,29 @@ public class IncrementalFlatFileStoreEditor implements Editor { private final IncrementalFlatFileStoreNodeStateEntryWriter entryWriter; private final Predicate<String> predicate; private final IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy; + // if not 0, timeout if System.nanoTime() exceeds this value + private final long timeoutAtNanos; private static final int LINE_SEP_LENGTH = System.getProperty("line.separator").length(); public IncrementalFlatFileStoreEditor(BufferedWriter bufferedWriter, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter, Predicate<String> predicate, - IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy) { + IncrementalFlatFileStoreStrategy incrementalFlatFileStoreStrategy, long maxDurationSeconds) { this.bufferedWriter = bufferedWriter; this.entryWriter = entryWriter; this.predicate = predicate; this.incrementalFlatFileStoreStrategy = incrementalFlatFileStoreStrategy; + long timeout; + if (maxDurationSeconds == Long.MAX_VALUE) { + timeout = 0; + } else { + timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(maxDurationSeconds, TimeUnit.SECONDS); + log.info("Max duration: " + maxDurationSeconds + " timeout: " + timeout + " now: " + System.nanoTime()); + } + this.timeoutAtNanos = timeout; } @Override public void enter(NodeState before, NodeState after) { + checkTimeout(); } @Override @@ -112,4 +124,13 @@ public class IncrementalFlatFileStoreEditor implements Editor { throw new RuntimeException("Error while creating incremental store", ex); } } + + private void checkTimeout() { + if (timeoutAtNanos != 0) { + long now = System.nanoTime(); + if (now > timeoutAtNanos) { + throw new RuntimeException("Timeout"); + } + } + } } diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreStrategy.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreStrategy.java index 1718e71f86..3b33840c1e 100644 --- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreStrategy.java +++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalFlatFileStoreStrategy.java @@ -63,10 +63,11 @@ public class IncrementalFlatFileStoreStrategy implements IncrementalIndexStoreSo private long textSize = 0; private long entryCount = 0; private final Set<String> preferredPathElements; + private final long maxDurationSeconds; public IncrementalFlatFileStoreStrategy(NodeStore nodeStore, @NotNull String beforeCheckpoint, @NotNull String afterCheckpoint, File storeDir, Set<String> preferredPathElements, @NotNull Compression algorithm, - Predicate<String> pathPredicate, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter) { + Predicate<String> pathPredicate, IncrementalFlatFileStoreNodeStateEntryWriter entryWriter, long maxDurationSeconds) { this.nodeStore = nodeStore; this.beforeCheckpoint = beforeCheckpoint; this.afterCheckpoint = afterCheckpoint; @@ -76,6 +77,7 @@ public class IncrementalFlatFileStoreStrategy implements IncrementalIndexStoreSo this.entryWriter = entryWriter; this.preferredPathElements = preferredPathElements; this.comparator = new PathElementComparator(preferredPathElements); + this.maxDurationSeconds = maxDurationSeconds; } @Override @@ -85,7 +87,7 @@ public class IncrementalFlatFileStoreStrategy implements IncrementalIndexStoreSo try (BufferedWriter w = FlatFileStoreUtils.createWriter(file, algorithm)) { NodeState before = Objects.requireNonNull(nodeStore.retrieve(beforeCheckpoint)); NodeState after = Objects.requireNonNull(nodeStore.retrieve(afterCheckpoint)); - Exception e = EditorDiff.process(VisibleEditor.wrap(new IncrementalFlatFileStoreEditor(w, entryWriter, pathPredicate, this)), before, after); + Exception e = EditorDiff.process(VisibleEditor.wrap(new IncrementalFlatFileStoreEditor(w, entryWriter, pathPredicate, this, maxDurationSeconds)), before, after); if (e != null) { log.error("Exception while building incremental store for checkpoint before {}, after {}", beforeCheckpoint, afterCheckpoint, e); throw new RuntimeException(e); diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreBuilder.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreBuilder.java index 03565746e6..aa00438459 100644 --- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreBuilder.java +++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreBuilder.java @@ -50,6 +50,7 @@ public class IncrementalStoreBuilder { private final IndexHelper indexHelper; private final String initialCheckpoint; private final String finalCheckpoint; + private long maxDurationSeconds = Long.MAX_VALUE; private Predicate<String> pathPredicate = path -> true; private Set<String> preferredPathElements = Collections.emptySet(); private BlobStore blobStore; @@ -107,6 +108,10 @@ public class IncrementalStoreBuilder { return this; } + public IncrementalStoreBuilder withMaxDurationSeconds(long maxDurationSeconds) { + this.maxDurationSeconds = maxDurationSeconds; + return this; + } public IndexStore build() throws IOException, CompositeException { logFlags(); @@ -115,11 +120,12 @@ public class IncrementalStoreBuilder { if (sortStrategyType == IncrementalSortStrategyType.INCREMENTAL_FFS_STORE || sortStrategyType == IncrementalSortStrategyType.INCREMENTAL_TREE_STORE) { IncrementalFlatFileStoreNodeStateEntryWriter entryWriter = new IncrementalFlatFileStoreNodeStateEntryWriter(blobStore); + IncrementalIndexStoreSortStrategy strategy = new IncrementalFlatFileStoreStrategy( indexHelper.getNodeStore(), initialCheckpoint, finalCheckpoint, - dir, preferredPathElements, algorithm, pathPredicate, entryWriter); + dir, preferredPathElements, algorithm, pathPredicate, entryWriter, maxDurationSeconds); File metadataFile = strategy.createMetadataFile(); File incrementalStoreFile = strategy.createSortedStoreFile(); long entryCount = strategy.getEntryCount(); @@ -147,4 +153,5 @@ public class IncrementalStoreBuilder { log.info("Compression enabled while sorting : {} ({})", IndexStoreUtils.compressionEnabled(), OAK_INDEXER_USE_ZIP); log.info("LZ4 enabled for compression algorithm : {} ({})", IndexStoreUtils.useLZ4(), OAK_INDEXER_USE_LZ4); } + } diff --git a/oak-run/src/test/java/org/apache/jackrabbit/oak/index/IncrementalStoreTest.java b/oak-run/src/test/java/org/apache/jackrabbit/oak/index/IncrementalStoreTest.java index a10e6ad33d..c1d3da92ea 100644 --- a/oak-run/src/test/java/org/apache/jackrabbit/oak/index/IncrementalStoreTest.java +++ b/oak-run/src/test/java/org/apache/jackrabbit/oak/index/IncrementalStoreTest.java @@ -401,7 +401,7 @@ public class IncrementalStoreTest { readOnlyNodeStore.retrieve(finalCheckpoint); return new IncrementalFlatFileStoreStrategy( readOnlyNodeStore, initialCheckpoint, finalCheckpoint, sortFolder.getRoot(), preferredPathElements, - algorithm, pathPredicate, new IncrementalFlatFileStoreNodeStateEntryWriter(fileBlobStore)); + algorithm, pathPredicate, new IncrementalFlatFileStoreNodeStateEntryWriter(fileBlobStore), Long.MAX_VALUE); } private void createBaseContent(NodeStore rwNodeStore) throws CommitFailedException {
