This is an automated email from the ASF dual-hosted git repository.
nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8fc92a1ca7 OAK-11061 - Indexing job: during indexing phase, download
blobs ahead of time in separate thread pool (#1669)
8fc92a1ca7 is described below
commit 8fc92a1ca712bc125f3b11b3a6e9961740cbd9ac
Author: Nuno Santos <[email protected]>
AuthorDate: Mon Sep 2 16:54:21 2024 +0200
OAK-11061 - Indexing job: during indexing phase, download blobs ahead of
time in separate thread pool (#1669)
---
.../indexer/document/DocumentStoreIndexerBase.java | 38 +-
.../flatfile/AheadOfTimeBlobDownloader.java | 42 +++
.../AheadOfTimeBlobDownloaderThrottler.java | 215 ++++++++++++
.../AheadOfTimeBlobDownloadingFlatFileStore.java | 169 +++++++++
.../flatfile/DefaultAheadOfTimeBlobDownloader.java | 390 +++++++++++++++++++++
.../flatfile/FlatFileNodeStoreBuilder.java | 35 +-
.../indexer/document/flatfile/FlatFileStore.java | 8 +
.../document/flatfile/NodeStateEntryReader.java | 2 +-
.../AheadOfTimeBlobDownloaderThrottlerTest.java | 185 ++++++++++
...headOfTimeBlobDownloadingFlatFileStoreTest.java | 68 ++++
.../flatfile/FlatFileNodeStoreBuilderTest.java | 11 +-
.../document/flatfile/FlatFileStoreTest.java | 3 +-
.../apache/jackrabbit/oak/index/IndexCommand.java | 28 +-
13 files changed, 1153 insertions(+), 41 deletions(-)
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index 1d76d920fa..95c87c5699 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -30,7 +30,6 @@ import
org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.index.IndexHelper;
import org.apache.jackrabbit.oak.index.IndexerSupport;
import
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder;
-import org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStore;
import
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.ConfigHelper;
import
org.apache.jackrabbit.oak.index.indexer.document.incrementalstore.IncrementalStoreBuilder;
import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
@@ -165,14 +164,14 @@ public abstract class DocumentStoreIndexerBase implements
Closeable {
}
}
- private List<FlatFileStore> buildFlatFileStoreList(NodeState
checkpointedState,
+ private List<IndexStore> buildFlatFileStoreList(NodeState
checkpointedState,
CompositeIndexer
indexer,
Predicate<String>
pathPredicate,
Set<String>
preferredPathElements,
boolean splitFlatFile,
Set<IndexDefinition>
indexDefinitions,
IndexingReporter
reporter) throws IOException {
- List<FlatFileStore> storeList = new ArrayList<>();
+ List<IndexStore> storeList = new ArrayList<>();
Stopwatch flatFileStoreWatch = Stopwatch.createStarted();
int executionCount = 1;
@@ -201,7 +200,8 @@ public abstract class DocumentStoreIndexerBase implements
Closeable {
nodeStore, getMongoDocumentStore(),
traversalLog))
.withCheckpoint(indexerSupport.getCheckpoint())
.withStatisticsProvider(indexHelper.getStatisticsProvider())
- .withIndexingReporter(reporter);
+ .withIndexingReporter(reporter)
+ .withAheadOfTimeBlobDownloader(true);
for (File dir : previousDownloadDirs) {
builder.addExistingDataDumpDir(dir);
@@ -209,9 +209,9 @@ public abstract class DocumentStoreIndexerBase implements
Closeable {
if (splitFlatFile) {
storeList = builder.buildList(indexHelper, indexerSupport,
indexDefinitions);
} else {
- storeList.add(builder.build());
+ storeList.add(builder.build(indexHelper, indexer));
}
- for (FlatFileStore item : storeList) {
+ for (IndexStore item : storeList) {
closer.register(item);
}
} catch (CompositeException e) {
@@ -304,16 +304,16 @@ public abstract class DocumentStoreIndexerBase implements
Closeable {
* @deprecated replaced by {@link #buildStore()}
*/
@Deprecated
- public FlatFileStore buildFlatFileStore() throws IOException,
CommitFailedException {
+ public IndexStore buildFlatFileStore() throws IOException,
CommitFailedException {
NodeState checkpointedState =
indexerSupport.retrieveNodeStateForCheckpoint();
Set<IndexDefinition> indexDefinitions =
indexerSupport.getIndexDefinitions();
Set<String> preferredPathElements =
indexerSupport.getPreferredPathElements(indexDefinitions);
Predicate<String> predicate =
indexerSupport.getFilterPredicate(indexDefinitions, Function.identity());
- FlatFileStore flatFileStore =
buildFlatFileStoreList(checkpointedState, null, predicate,
+ IndexStore indexStore = buildFlatFileStoreList(checkpointedState,
null, predicate,
preferredPathElements,
IndexerConfiguration.parallelIndexEnabled(), indexDefinitions,
indexingReporter).get(0);
- log.info("FlatFileStore built at {}. To use this flatFileStore in a
reindex step, set System Property-{} with value {}",
- flatFileStore.getStorePath(), OAK_INDEXER_SORTED_FILE_PATH,
flatFileStore.getStorePath());
- return flatFileStore;
+ log.info("Store built at {}. To use this store in a reindex step, set
the system property {} to {}",
+ indexStore.getStorePath(), OAK_INDEXER_SORTED_FILE_PATH,
indexStore.getStorePath());
+ return indexStore;
}
public void reindex() throws CommitFailedException, IOException {
@@ -335,7 +335,7 @@ public abstract class DocumentStoreIndexerBase implements
Closeable {
closer.register(indexer);
- List<FlatFileStore> flatFileStores = buildFlatFileStoreList(
+ List<IndexStore> indexStores = buildFlatFileStoreList(
checkpointedState,
indexer,
indexer::shouldInclude,
@@ -353,14 +353,14 @@ public abstract class DocumentStoreIndexerBase implements
Closeable {
INDEXING_PHASE_LOGGER.info("[TASK:INDEXING:START] Starting
indexing");
Stopwatch indexerWatch = Stopwatch.createStarted();
try {
- if (flatFileStores.size() > 1) {
- indexParallel(flatFileStores, indexer, progressReporter);
- } else if (flatFileStores.size() == 1) {
- FlatFileStore flatFileStore = flatFileStores.get(0);
+ if (indexStores.size() > 1) {
+ indexParallel(indexStores, indexer, progressReporter);
+ } else if (indexStores.size() == 1) {
+ IndexStore indexStore = indexStores.get(0);
TopKSlowestPaths slowestTopKElements = new
TopKSlowestPaths(TOP_SLOWEST_PATHS_TO_LOG);
indexer.onIndexingStarting();
long entryStart = System.nanoTime();
- for (NodeStateEntry entry : flatFileStore) {
+ for (NodeStateEntry entry : indexStore) {
reportDocumentRead(entry.getPath(), progressReporter);
indexer.index(entry);
// Avoid calling System.nanoTime() twice per each
entry, by reusing the timestamp taken at the end
@@ -419,12 +419,12 @@ public abstract class DocumentStoreIndexerBase implements
Closeable {
}
}
- private void indexParallel(List<FlatFileStore> storeList, CompositeIndexer
indexer, IndexingProgressReporter progressReporter)
+ private void indexParallel(List<IndexStore> storeList, CompositeIndexer
indexer, IndexingProgressReporter progressReporter)
throws IOException {
ExecutorService service =
Executors.newFixedThreadPool(IndexerConfiguration.indexThreadPoolSize());
List<Future> futureList = new ArrayList<>();
- for (FlatFileStore item : storeList) {
+ for (IndexStore item : storeList) {
Future future = service.submit(() -> {
for (NodeStateEntry entry : item) {
reportDocumentRead(entry.getPath(), progressReporter);
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloader.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloader.java
new file mode 100644
index 0000000000..d199506b7d
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import java.io.Closeable;
+
/**
 * Downloads blobs ahead of the indexer during the indexing phase, so that binaries are already
 * available locally when the indexer reaches them (see OAK-11061).
 * <p>
 * Usage contract (as exercised by {@code AheadOfTimeBlobDownloadingFlatFileStore}): call
 * {@link #start()} once before iterating, call {@link #updateIndexed(long)} periodically with the
 * indexer's progress, and {@link #close()} when indexing finishes.
 */
public interface AheadOfTimeBlobDownloader extends Closeable {

    /** Starts the downloader. Called once, before the indexer begins consuming entries. */
    void start();

    /**
     * Reports the indexer's progress to the downloader.
     *
     * @param lastEntryIndexed position (number of entries consumed so far) of the indexer
     */
    void updateIndexed(long lastEntryIndexed);

    /** No-op implementation, used when ahead-of-time blob downloading is disabled. */
    AheadOfTimeBlobDownloader NOOP = new AheadOfTimeBlobDownloader() {
        @Override
        public void start() {
        }

        @Override
        public void updateIndexed(long lastEntryIndexed) {
        }

        @Override
        public void close() {
        }
    };
}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloaderThrottler.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloaderThrottler.java
new file mode 100644
index 0000000000..55d6e48bad
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloaderThrottler.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Tracks a prefetch window for the AOT downloader. This class keeps a window
from {startPosition, endPosition} together
+ * with an estimation of the data that was downloaded inside this window. It
exposes two operations:
+ *
+ * <ul>
+ * <li> {@link #reserveSpaceForBlob(long, long)} - Reserves space to download
blob at the given position and size,
+ * blocking until enough space is available.
+ * <li> {@link #advanceIndexer(long)} - Advances the window until the given
position, removing from the window any
+ * positions lower or equal than the new index.
+ * </ul>
+ * <p>
+ */
+public class AheadOfTimeBlobDownloaderThrottler {
+ private final static Logger LOG =
LoggerFactory.getLogger(AheadOfTimeBlobDownloaderThrottler.class);
+
+ private static class DownloadedBlob {
+ // Position (line number) in the FFS
+ final long position;
+ final long size;
+
+ public DownloadedBlob(long position, long size) {
+ this.position = position;
+ this.size = size;
+ }
+
+ @Override
+ public String toString() {
+ return "DownloadedBlob{" +
+ "position=" + position +
+ ", size=" + size +
+ '}';
+ }
+ }
+
+ // Maximum number of blobs that can be downloaded ahead of time
+ private final int maxWindowSizeNumberOfBlobs;
+ // Maximum number of bytes that can be downloaded ahead of the indexer
+ private final long maxWindowSizeBytes;
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition condition = lock.newCondition();
+
+ // List of blobs downloaded ahead of the indexer
+ private final ArrayDeque<DownloadedBlob> aotDownloadedBlobs;
+ // How much data is currently downloaded ahead of time. This is just an
optimization to avoid traversing the list of
+ // downloaded blobs every time we need the total window size.
+ private long currentWindowSizeBytes = 0;
+ // Top of the prefetch window
+ private long windowLastPosition = -1;
+ // Position of the indexer
+ private long indexerPosition = -1;
+
+ // Statistics, the maximum size ever reached by the window
+ private int highestWindowSizeNumberOfBlobs = 0;
+ private long highestWindowSizeBytes = 0;
+
+ /**
+ * @param maxWindowSizeBytes How many bytes can be downloaded ahead of the
indexer.
+ * @param maxWindowSizeNumberOfBlobs How many blobs can be downloaded
ahead of the indexer.
+ */
+ public AheadOfTimeBlobDownloaderThrottler(int maxWindowSizeNumberOfBlobs,
long maxWindowSizeBytes) {
+ if (maxWindowSizeNumberOfBlobs <= 0) {
+ throw new IllegalArgumentException("windowSizeNumberOfBlobs must
be positive");
+ }
+ if (maxWindowSizeBytes <= 0) {
+ throw new IllegalArgumentException("maximumSizeBytes must be
positive");
+ }
+ this.maxWindowSizeNumberOfBlobs = maxWindowSizeNumberOfBlobs;
+ this.maxWindowSizeBytes = maxWindowSizeBytes;
+ this.aotDownloadedBlobs = new ArrayDeque<>(maxWindowSizeNumberOfBlobs);
+ }
+
+ /**
+ * Reserves space for a blob to be downloaded. This method blocks until
there is enough space in the prefetch window.
+ * If the position of the reservation is lower or equal to the indexer
position, the reservation is ignored.
+ *
+ * @param position The position of the blob to be downloaded.
+ * @param length The length of the blob to be downloaded.
+ * @return true if space was reserved, false if the indexer is already
ahead of the position of the blob.
+ */
+ public boolean reserveSpaceForBlob(long position, long length) throws
InterruptedException {
+ if (length > maxWindowSizeBytes) {
+ LOG.warn("Blob length {} is higher than the maximum size of the
window {}. Proceeding with a reservation for the maximumSize of the
throttler.", length, maxWindowSizeBytes);
+ length = maxWindowSizeBytes;
+ }
+ lock.lock();
+ try {
+ if (position <= windowLastPosition) {
+ throw new IllegalArgumentException("blobPosition " + position
+ " is not higher than the current last position of the window " +
windowLastPosition);
+ }
+ if (position <= indexerPosition) {
+ LOG.warn("Blob position {} is lower than the indexer position
{}. Ignoring space reservation request", position, indexerPosition);
+ // Do not reserve space for this blob, it is already indexed
+ return false;
+ }
+
+ while (currentWindowSizeBytes + length > maxWindowSizeBytes ||
aotDownloadedBlobs.size() >= maxWindowSizeNumberOfBlobs) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting until indexer catches up. Downloader
position: {}, AOT data downloader: {}, number of aot downloaded blobs: {}",
+ firstPosition(),
IOUtils.humanReadableByteCount(currentWindowSizeBytes),
aotDownloadedBlobs.size()
+ );
+ }
+ condition.await();
+ }
+ windowLastPosition = position;
+ aotDownloadedBlobs.addLast(new DownloadedBlob(position, length));
+ currentWindowSizeBytes += length;
+ // Update maximum values
+ if (aotDownloadedBlobs.size() > highestWindowSizeNumberOfBlobs) {
+ highestWindowSizeNumberOfBlobs = aotDownloadedBlobs.size();
+ }
+ if (currentWindowSizeBytes > highestWindowSizeBytes) {
+ highestWindowSizeBytes = currentWindowSizeBytes;
+ }
+ return true;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public long getAvailableWindowSize() {
+ lock.lock();
+ try {
+ return maxWindowSizeNumberOfBlobs - aotDownloadedBlobs.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public long getAvailableWindowBytes() {
+ lock.lock();
+ try {
+ return maxWindowSizeBytes - currentWindowSizeBytes;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Advances the indexer to the given position.
+ *
+ * @param indexerPosition The new position of the indexer.
+ */
+ public void advanceIndexer(long indexerPosition) {
+ lock.lock();
+ try {
+ int oldWindowSize = aotDownloadedBlobs.size();
+ long oldWindowBytes = currentWindowSizeBytes;
+ while (true) {
+ DownloadedBlob head = aotDownloadedBlobs.peekFirst();
+ if (head != null && head.position <= indexerPosition) {
+ aotDownloadedBlobs.pollFirst();
+ currentWindowSizeBytes -= head.size;
+ } else {
+ break;
+ }
+ }
+ if (oldWindowSize != aotDownloadedBlobs.size()) {
+ LOG.debug("Window size reduced. Indexer position: {}.
windowSize: {} -> {}, windowSizeBytes: {} -> {}",
+ indexerPosition, oldWindowSize,
aotDownloadedBlobs.size(),
+ IOUtils.humanReadableByteCountBin(oldWindowBytes),
IOUtils.humanReadableByteCountBin(currentWindowSizeBytes));
+ if (currentWindowSizeBytes < 0) {
+ throw new IllegalStateException("AOT downloaded bytes is
negative. aotDownloaded: " + currentWindowSizeBytes);
+ }
+ condition.signalAll();
+ }
+ this.indexerPosition = indexerPosition;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private long firstPosition() {
+ return aotDownloadedBlobs.isEmpty() ? -1 :
aotDownloadedBlobs.getFirst().position;
+ }
+
+ public String formatStats() {
+ lock.lock();
+ try {
+ return String.format("AOT Downloader throttler:
{aotDownloadedBlobsSize: %s, aotDownloadedBlobsSizeBytes: %s,
maxWindowSizeNumberOfBlobs: %s, maxWindowSizeBytes: %s}",
+ aotDownloadedBlobs.size(),
IOUtils.humanReadableByteCount(currentWindowSizeBytes),
highestWindowSizeNumberOfBlobs,
IOUtils.humanReadableByteCount(highestWindowSizeBytes));
+ } finally {
+ lock.unlock();
+ }
+ }
+}
\ No newline at end of file
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloadingFlatFileStore.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloadingFlatFileStore.java
new file mode 100644
index 0000000000..767737dd89
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloadingFlatFileStore.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.jackrabbit.oak.index.IndexHelper;
+import org.apache.jackrabbit.oak.index.indexer.document.CompositeIndexer;
+import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
+import
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.ConfigHelper;
+import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
 * An {@link IndexStore} wrapper around a {@link FlatFileStore} that, when iterated, starts an
 * {@link AheadOfTimeBlobDownloader} to prefetch blobs ahead of the indexer (OAK-11061). Whether a
 * real downloader or the NOOP one is used depends on the {@code oak.indexer.blobPrefetch.*} system
 * properties and on the index paths being indexed.
 */
public class AheadOfTimeBlobDownloadingFlatFileStore implements IndexStore {

    private final Logger log = LoggerFactory.getLogger(getClass());

    // The wrapped store; all IndexStore methods delegate to it.
    private final FlatFileStore ffs;
    private final CompositeIndexer indexer;
    private final IndexHelper indexHelper;

    // System property names controlling the AOT blob prefetch behavior.
    public static final String BLOB_PREFETCH_ENABLE_FOR_INDEXES_PREFIXES = "oak.indexer.blobPrefetch.enableForIndexesPrefixes";
    public static final String BLOB_PREFETCH_BINARY_NODES_SUFFIX = "oak.indexer.blobPrefetch.binaryNodesSuffix";
    public static final String BLOB_PREFETCH_DOWNLOAD_THREADS = "oak.indexer.blobPrefetch.downloadThreads";
    public static final String BLOB_PREFETCH_DOWNLOAD_AHEAD_WINDOW_MB = "oak.indexer.blobPrefetch.downloadAheadWindowMB";
    public static final String BLOB_PREFETCH_DOWNLOAD_AHEAD_WINDOW_SIZE = "oak.indexer.blobPrefetch.downloadAheadWindowSize";
    // Comma-separated list of index path prefixes for which prefetching is enabled (empty = disabled).
    private final String blobPrefetchEnableForIndexes = ConfigHelper.getSystemPropertyAsString(BLOB_PREFETCH_ENABLE_FOR_INDEXES_PREFIXES, "");
    // Node-name suffix identifying binary nodes; empty disables prefetching entirely.
    private final String blobPrefetchBinaryNodeSuffix = ConfigHelper.getSystemPropertyAsString(BLOB_PREFETCH_BINARY_NODES_SUFFIX, "");
    private final int nDownloadThreads = ConfigHelper.getSystemPropertyAsInt(BLOB_PREFETCH_DOWNLOAD_THREADS, 4);
    private final int maxPrefetchWindowMB = ConfigHelper.getSystemPropertyAsInt(BLOB_PREFETCH_DOWNLOAD_AHEAD_WINDOW_MB, 32);
    private final int maxPrefetchWindowSize = ConfigHelper.getSystemPropertyAsInt(BLOB_PREFETCH_DOWNLOAD_AHEAD_WINDOW_SIZE, 4096);

    /** Static factory wrapping an existing FlatFileStore with AOT blob downloading. */
    public static AheadOfTimeBlobDownloadingFlatFileStore wrap(FlatFileStore ffs, CompositeIndexer indexer, IndexHelper indexHelper) {
        return new AheadOfTimeBlobDownloadingFlatFileStore(ffs, indexer, indexHelper);
    }

    private AheadOfTimeBlobDownloadingFlatFileStore(FlatFileStore ffs, CompositeIndexer indexer, IndexHelper indexHelper) {
        this.ffs = ffs;
        this.indexer = indexer;
        this.indexHelper = indexHelper;
    }

    /**
     * Picks the downloader implementation: NOOP when no binary-node suffix is configured or none of
     * the index paths match the enabled prefixes; otherwise a {@link DefaultAheadOfTimeBlobDownloader}
     * configured from the system properties above.
     */
    private @NotNull AheadOfTimeBlobDownloader createAheadOfTimeBlobDownloader(CompositeIndexer indexer, IndexHelper indexHelper) {
        if (blobPrefetchBinaryNodeSuffix == null || blobPrefetchBinaryNodeSuffix.isEmpty()) {
            log.info("Ahead of time blob downloader is disabled, no binary node suffix provided");
            return AheadOfTimeBlobDownloader.NOOP;
        } else if (!isEnabledForIndexes(blobPrefetchEnableForIndexes, indexHelper.getIndexPaths())) {
            log.info("Ahead of time blob downloader is disabled, not enabled for indexes: {}", indexHelper.getIndexPaths());
            return AheadOfTimeBlobDownloader.NOOP;
        } else {
            return new DefaultAheadOfTimeBlobDownloader(
                    blobPrefetchBinaryNodeSuffix,
                    ffs.getStoreFile(),
                    ffs.getAlgorithm(),
                    indexHelper.getGCBlobStore(),
                    indexer,
                    nDownloadThreads,
                    maxPrefetchWindowSize,
                    maxPrefetchWindowMB);
        }
    }

    /**
     * @return true if any of the given index paths starts with one of the comma-separated prefixes
     * in {@code indexesEnabledPrefix}; false when the prefix list is blank.
     */
    static boolean isEnabledForIndexes(String indexesEnabledPrefix, List<String> indexPaths) {
        List<String> enableForIndexes = splitAndTrim(indexesEnabledPrefix);
        for (String indexPath : indexPaths) {
            if (enableForIndexes.stream().anyMatch(indexPath::startsWith)) {
                return true;
            }
        }
        return false;
    }

    // Splits a comma-separated list and trims each entry; blank/null input yields an empty list.
    private static List<String> splitAndTrim(String str) {
        if (str == null || str.isBlank()) {
            return List.of();
        } else {
            return Arrays.stream(str.split(",")).map(String::trim).collect(Collectors.toList());
        }
    }

    /**
     * Returns an iterator over the wrapped store's entries, with a freshly created AOT downloader
     * started before iteration begins.
     * <p>
     * NOTE(review): the downloader is closed only inside hasNext() once the underlying iterator is
     * exhausted — if a caller abandons iteration early (or never calls hasNext() past the end), the
     * downloader is never closed; {@link #close()} on this store also does not close it. Confirm this
     * is acceptable for all callers. Also note hasNext() may be called again after exhaustion,
     * invoking close() more than once — presumably implementations tolerate repeated close; verify.
     */
    @Override
    public @NotNull Iterator<NodeStateEntry> iterator() {
        final AheadOfTimeBlobDownloader aheadOfTimeBlobDownloader = createAheadOfTimeBlobDownloader(indexer, indexHelper);
        aheadOfTimeBlobDownloader.start();
        return new Iterator<>() {

            final Iterator<NodeStateEntry> it = ffs.iterator();
            // Number of entries handed out so far; reported to the downloader as indexer progress.
            long entriesRead;

            @Override
            public boolean hasNext() {
                boolean result = it.hasNext();
                if (!result) {
                    // Exhausted: push the final progress and release the downloader's resources.
                    aheadOfTimeBlobDownloader.updateIndexed(entriesRead);
                    try {
                        aheadOfTimeBlobDownloader.close();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
                return result;
            }

            @Override
            public NodeStateEntry next() {
                entriesRead++;
                // No need to update the progress reporter for each entry. This should reduce a bit the
                // overhead of updating the AOT downloader, which sets a volatile field internally.
                if (entriesRead % 128 == 0) {
                    aheadOfTimeBlobDownloader.updateIndexed(entriesRead);
                }
                return it.next();
            }
        };
    }

    @Override
    public String getStorePath() {
        return ffs.getStorePath();
    }

    @Override
    public long getEntryCount() {
        return ffs.getEntryCount();
    }

    @Override
    public void setEntryCount(long entryCount) {
        ffs.setEntryCount(entryCount);
    }

    // Closes only the wrapped store; see the NOTE(review) on iterator() about downloader lifetime.
    @Override
    public void close() throws IOException {
        ffs.close();
    }

    @Override
    public String getIndexStoreType() {
        return ffs.getIndexStoreType();
    }

    @Override
    public boolean isIncremental() {
        return ffs.isIncremental();
    }

}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultAheadOfTimeBlobDownloader.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultAheadOfTimeBlobDownloader.java
new file mode 100644
index 0000000000..63eb55868c
--- /dev/null
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/DefaultAheadOfTimeBlobDownloader.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.Compression;
+import org.apache.jackrabbit.oak.commons.IOUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.index.indexer.document.CompositeIndexer;
+import
org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;
+import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
+import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Scans a FlatFileStore for non-inlined blobs in nodes matching a given
pattern and downloads them from the blob store.
+ * The goal of this class is to populate the local data store cache with the
non-inlined blobs that are required by the
+ * indexer, so that when the indexing thread tries to retrieve the blob, it
will find it locally, thereby avoiding an
+ * expensive call to the blob store. When indexing repositories with many
non-inlined renditions, pre-populating the
+ * cache can cut the indexing time by more than half.
+ * <p>
+ * This AOT download is intended to run asynchronously with the indexing
thread. It starts the following threads:
+ * <ul>
+ * <li>[scanner] - scans the FFS, searching for blobs to download. A blob is
selected for download if it is a binary property
+ * in a node whose name matches the suffix given as parameter to this class,
and is non-inlined.</li>
+ * <li>[downloader-n] - a configurable number of threads that download the
blobs that were discovered by the scanner thread.</li>
+ * </ul>
+ * The indexer should periodically call {@link #updateIndexed(long)} to inform
the AOT downloader of the last line
+ * indexed. This is necessary to keep the AOT downloader more or less in sync
with the indexer, that is, to prevent it
+ * from falling behind and to prevent it from going too far ahead.
+ * <p>
+ * This AOT downloader should be configured with enough threads that it is
able to stay ahead of the indexer. Whether it
+ * can remain ahead or not, will depend on the number of blobs to download and
the speed of the connection to the blob
+ * store. As a rough guide, on a cloud environment with blob stored in Azure
Blob Store or Amazon S3, 4 download threads
+ * should be enough. If the AOT downloader falls behind the indexer, it will
skip any nodes that are behind the last known
indexing position, to try to catch up.
+ * <p>
+ * The AOT downloader will also try not to be too far ahead of the indexer.
This is done to avoid filling up the local
+ * blob store cache, which would cause blobs to be evicted before the indexer
gets around to use them. In this case, the
+ * indexer would have to download again the blob, which would negate the
benefits of using this AOT downloader.
+ * The AOT downloader takes as parameter the maximum amount of data that it is
allowed to prefetch (<code>maxPrefetchWindowMB</code>).
+ * It will then try not to download more than this data, pausing its progress
whenever the prefetch window is full.
+ * For details on how this is implemented, see {@link
AheadOfTimeBlobDownloaderThrottler}.
+ */
+public class DefaultAheadOfTimeBlobDownloader implements
AheadOfTimeBlobDownloader {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultAheadOfTimeBlobDownloader.class);
+ // Stops the downloader threads.
+ private static final Blob SENTINEL = new BlobStoreBlob(null, null);
+ private final static AtomicInteger threadNameCounter = new
AtomicInteger(0);
+
+ // Set with the ids of blobs that were already enqueued for download.
Avoids creating a large number of download
+ // requests for the blobs that are referenced by many nodes. Although this
is a HashMap it is used as a set. The
+ // value is just a placeholder, we use Boolean.TRUE for no particular
reason.
+ private static final int DOWNLOADED_BLOB_IDS_CACHE_SIZE = 4096;
+ private final LinkedHashMap<String, Boolean> downloadedBlobs = new
LinkedHashMap<>(DOWNLOADED_BLOB_IDS_CACHE_SIZE, 0.75f, true) {
+ // Avoid resizing operations
+ private final static int MAX_ENTRIES = (int)
(DOWNLOADED_BLOB_IDS_CACHE_SIZE * 0.70);
+
+ protected boolean removeEldestEntry(Map.Entry eldest) {
+ return size() > MAX_ENTRIES;
+ }
+ };
+
+ private final String binaryBlobsPathSuffix;
+ private final File ffsPath;
+ private final Compression algorithm;
+ private final GarbageCollectableBlobStore blobStore;
+ private final CompositeIndexer indexer;
+
+ // Statistics
+ private final LongAdder totalBytesDownloaded = new LongAdder();
+ private final LongAdder totalTimeDownloadingNanos = new LongAdder();
+ private final LongAdder totalBlobsDownloaded = new LongAdder();
+ private long blobsEnqueuedForDownload = 0;
+ private long skippedLinesDueToLaggingIndexing = 0;
+
+ // Download threads plus thread that scans the FFS
+ private ExecutorService executor;
+ private ScanTask scanTask;
+ private Future<?> scanFuture;
+ private ArrayList<Future<?>> downloadFutures;
+ private final int nDownloadThreads;
+ private final AheadOfTimeBlobDownloaderThrottler throttler;
+ private volatile long indexerLastKnownPosition;
+
+ /**
+ * @param binaryBlobsPathSuffix Suffix of nodes that are to be considered
for AOT download. Any node that does not match this suffix is ignored.
+ * @param ffsPath Flat file store path.
+ * @param algorithm Compression algorithm of the flat file
store.
+ * @param blobStore The blob store. This should be the same
blob store used by the indexer and its cache should be
+ * large enough to hold
<code>maxPrefetchWindowMB</code> of data.
+ * @param indexer The indexer, needed to check if a given
path should be indexed.
+ * @param nDownloadThreads Number of download threads.
+ * @param maxPrefetchWindowMB Size of the prefetch window, that is, how
much data the downloader will retrieve ahead of the indexer.
+ */
+ public DefaultAheadOfTimeBlobDownloader(@NotNull String
binaryBlobsPathSuffix,
+ @NotNull File ffsPath, @NotNull
Compression algorithm,
+ @NotNull
GarbageCollectableBlobStore blobStore,
+ @NotNull CompositeIndexer indexer,
+ int nDownloadThreads, int
maxPrefetchWindowSize, int maxPrefetchWindowMB) {
+ if (nDownloadThreads < 1) {
+ throw new IllegalArgumentException("nDownloadThreads must be
greater than 0. Was: " + nDownloadThreads);
+ }
+ if (maxPrefetchWindowMB < 1) {
+ throw new IllegalArgumentException("maxPrefetchWindowMB must be
greater than 0. Was: " + maxPrefetchWindowMB);
+ }
+ this.binaryBlobsPathSuffix = binaryBlobsPathSuffix;
+ this.ffsPath = ffsPath;
+ this.algorithm = algorithm;
+ this.blobStore = blobStore;
+ this.indexer = indexer;
+ this.nDownloadThreads = nDownloadThreads;
+ this.throttler = new
AheadOfTimeBlobDownloaderThrottler(maxPrefetchWindowSize, maxPrefetchWindowMB *
FileUtils.ONE_MB);
+ LOG.info("Created AheadOfTimeBlobDownloader. downloadThreads: {},
prefetchMB: {}", nDownloadThreads, maxPrefetchWindowMB);
+ }
+
+ public void start() {
+ executor = Executors.newFixedThreadPool(nDownloadThreads + 1);
+ ArrayBlockingQueue<Blob> queue = new
ArrayBlockingQueue<>(nDownloadThreads * 2);
+
+ downloadFutures = new ArrayList<>();
+ for (int i = 0; i < nDownloadThreads; i++) {
+ DownloadTask downloadTask = new DownloadTask(queue);
+ downloadFutures.add(executor.submit(downloadTask));
+ }
+ scanTask = new ScanTask(queue);
+ scanFuture = executor.submit(scanTask);
+ }
+
+ public void updateIndexed(long positionIndexed) {
+ this.indexerLastKnownPosition = positionIndexed;
+ throttler.advanceIndexer(positionIndexed);
+ }
+
+ public void close() {
+ stop();
+ }
+
+ public void stop() {
+ if (executor == null) {
+ return;
+ }
+ LOG.info("Stopping AheadOfTimeBlobDownloader. Statistics: {}",
formatAggregateStatistics());
+ scanFuture.cancel(true);
+ for (Future<?> downloadFuture : downloadFutures) {
+ downloadFuture.cancel(true);
+ }
+ LOG.info("Waiting for download tasks to finish");
+ new ExecutorCloser(executor).close();
+ executor = null;
+ LOG.info("All download tasks finished");
+ }
+
+ public String formatAggregateStatistics() {
+ long totalBytesDownloadedSum = totalBytesDownloaded.sum();
+ return String.format(
+ "Downloaded %d blobs, %d bytes (%s). aggregatedDownloadTime:
%s, cacheHits: %d, linesScanned: %d, " +
+ "notIncludedInIndex: %d, doesNotMatchPattern: %d,
inlinedBlobsSkipped: %d, " +
+ "skippedForOtherReasons: %d,
skippedLinesDueToLaggingIndexing: %d",
+ totalBlobsDownloaded.sum(), totalBytesDownloadedSum,
IOUtils.humanReadableByteCountBin(totalBytesDownloadedSum),
+
FormattingUtils.formatNanosToSeconds(totalTimeDownloadingNanos.sum()),
+ scanTask.blobCacheHit, scanTask.linesScanned,
scanTask.notIncludedInIndex, scanTask.doesNotMatchPattern,
+ scanTask.inlinedBlobsSkipped, scanTask.skippedForOtherReasons,
skippedLinesDueToLaggingIndexing);
+ }
+
+ /**
+ * Scans the FFS, searching for binary properties that are not inlined and
enqueues them for download.
+ */
+ private class ScanTask implements Runnable {
+ private final NodeStateEntryReader nodeStateEntryReader = new
NodeStateEntryReader(blobStore);
+ private final ArrayBlockingQueue<Blob> queue;
+
+ long linesScanned = 0;
+ long blobCacheHit = 0;
+ long notIncludedInIndex = 0;
+ long doesNotMatchPattern = 0;
+ long inlinedBlobsSkipped = 0;
+ long skippedForOtherReasons = 0;
+
+ public ScanTask(ArrayBlockingQueue<Blob> queue) {
+ this.queue = queue;
+ }
+
+ @Override
+ public void run() {
+ try (LineIterator ffsLineIterator = new
LineIterator(IndexStoreUtils.createReader(ffsPath, algorithm))) {
+ String oldName = Thread.currentThread().getName();
+ Thread.currentThread().setName("scanner");
+ try {
+ while (ffsLineIterator.hasNext()) {
+ String ffsLine = ffsLineIterator.next();
+ // Do not parse the json with the node state yet,
first check if the path is a possible candidate
+ // for download. Most of the lines in the FFS will be
of lines that do not contain blobs to download,
+ // so it would be wasteful to parse the json for all
of them.
+ int pipeIndex =
ffsLine.indexOf(NodeStateEntryWriter.DELIMITER_CHAR);
+ String entryPath = ffsLine.substring(0, pipeIndex);
+ if (!isCandidatePath(entryPath)) {
+ doesNotMatchPattern++;
+ } else if (!indexer.shouldInclude(entryPath)) {
+ notIncludedInIndex++;
+ } else if (isBehindIndexer(linesScanned)) {
+ LOG.debug("Skipping blob at position {} because it
was already indexed", linesScanned);
+ skippedLinesDueToLaggingIndexing++;
+ } else {
+ // Now we need to parse the json to check if there
are any blobs to download
+ processEntry(entryPath,
ffsLine.substring(pipeIndex + 1));
+ }
+ linesScanned++;
+ if (linesScanned % 100_000 == 0) {
+ LOG.info("[{}] Last path scanned: {}. Aggregated
statistics: {}", linesScanned, entryPath, formatAggregateStatistics());
+ }
+ }
+ } catch (InterruptedException e) {
+ queue.clear();
+ LOG.info("Scan task interrupted, exiting");
+ } finally {
+ LOG.info("Scanner reached end of FFS, stopping download
threads. Statistics: {}", formatAggregateStatistics());
+ Thread.currentThread().setName(oldName);
+ queue.put(SENTINEL);
+ }
+ } catch (InterruptedException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean isCandidatePath(String path) {
+ return path.endsWith(binaryBlobsPathSuffix);
+ }
+
+ private boolean isBehindIndexer(long scannerPosition) {
+ // Always try to be ahead of the indexer
+ return scannerPosition <= indexerLastKnownPosition;
+ }
+
+ private void processEntry(String entryPath, String entryStateAsJson)
throws InterruptedException {
+ NodeState nodeState =
nodeStateEntryReader.parseState(entryStateAsJson);
+ PropertyState ps = nodeState.getProperty("jcr:data");
+ if (ps == null || ps.isArray() || ps.getType() != Type.BINARY) {
+ skippedForOtherReasons++;
+ LOG.info("Skipping node: {}. Property \"jcr:data\": {}",
entryPath, ps);
+ return;
+ }
+ for (Blob blob : ps.getValue(Type.BINARIES)) {
+ if (blob.isInlined()) {
+ inlinedBlobsSkipped++;
+ continue;
+ }
+ if (blob.getContentIdentity() == null) {
+ LOG.info("[{}] Skipping blob with null content identity:
{}", linesScanned, blob.getContentIdentity());
+ continue;
+ }
+ // Check if we have recently downloaded this blob. This is
just an optimization. Without this cache,
+ // if a blob had been downloaded recently, further attempts to
download it would hit the blob store cache
+ // and complete quickly. But as it is common for the same blob
to be referenced by many entries, this simple
+ // check here avoids the extra work of enqueuing the blob for
download and reading it from the cache.
+ boolean present =
downloadedBlobs.containsKey(blob.getContentIdentity());
+ if (present) {
+ blobCacheHit++;
+ LOG.debug("[{}] Blob already downloaded or enqueued for
download: {}", linesScanned, blob.getContentIdentity());
+ continue;
+ }
+ throttler.reserveSpaceForBlob(linesScanned, blob.length());
+ downloadedBlobs.put(blob.getContentIdentity(), Boolean.TRUE);
+ queue.put(blob);
+ blobsEnqueuedForDownload++;
+ // Log progress
+ if (blobsEnqueuedForDownload % 1000 == 0) {
+ LOG.info("[{}] Enqueued blob for download: {}, size: {},
Statistics: {}, {}",
+ linesScanned, blob.getContentIdentity(),
blob.length(),
+ formatAggregateStatistics(),
throttler.formatStats());
+ }
+ }
+ }
+ }
+
+ /**
+ * Downloads blobs from the blob store.
+ */
+ private class DownloadTask implements Runnable {
+ private final ArrayBlockingQueue<Blob> queue;
+
+ private long blobsDownloaded = 0;
+ private long bytesDownloaded = 0;
+ private long timeDownloadingNanos = 0;
+
+ /**
+ * @param queue The queue from which to take blobs to download.
+ */
+ public DownloadTask(ArrayBlockingQueue<Blob> queue) {
+ this.queue = queue;
+ }
+
+ @Override
+ public void run() {
+ String oldName = Thread.currentThread().getName();
+ Thread.currentThread().setName("downloader-" +
threadNameCounter.getAndIncrement());
+ byte[] buffer = new byte[4096];
+ try {
+ while (true) {
+ Blob blob = queue.take();
+ if (blob == SENTINEL) {
+ LOG.info("Sentinel received, exiting. Statistics: {}",
formatDownloaderStats());
+ queue.put(SENTINEL);
+ break;
+ }
+
+ long startNanos = System.nanoTime();
+ InputStream stream = blob.getNewStream();
+ int blobSize = 0;
+ try {
+ while (true) {
+ int bytesRead = stream.read(buffer);
+ if (bytesRead == -1) {
+ break;
+ }
+ blobSize += bytesRead;
+ }
+ if (blobSize != blob.length()) {
+ LOG.error("Blob size mismatch: blob.length(): {},
bytesRead: {}", blob.length(), blobSize);
+ }
+ long elapsedNanos = System.nanoTime() - startNanos;
+ // Local stats
+ bytesDownloaded += blobSize;
+ blobsDownloaded++;
+ timeDownloadingNanos += elapsedNanos;
+ // Aggregated stats across all download threads.
+ totalBytesDownloaded.add(blobSize);
+ totalTimeDownloadingNanos.add(elapsedNanos);
+ totalBlobsDownloaded.increment();
+ // Log progress
+ if (blobsDownloaded % 500 == 0) {
+ LOG.info("Retrieved blob: {}, size: {}, in {} ms.
Downloader thread statistics: {}",
+ blob.getContentIdentity(), blob.length(),
elapsedNanos / 1_000_000, formatDownloaderStats());
+ }
+ } catch (IOException e) {
+ LOG.error("Error downloading blob: {}",
blob.getContentIdentity(), e);
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Download task interrupted, exiting. Statistics: {}",
formatDownloaderStats());
+ } finally {
+ Thread.currentThread().setName(oldName);
+ }
+ }
+
+ private String formatDownloaderStats() {
+ return String.format("Downloaded %d blobs, %d bytes (%s) in %s",
+ blobsDownloaded, bytesDownloaded,
IOUtils.humanReadableByteCountBin(bytesDownloaded),
+
FormattingUtils.formatNanosToSeconds(timeDownloadingNanos));
+ }
+ }
+}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
index 294d8f3117..949253ba46 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
@@ -27,8 +27,10 @@ import org.apache.jackrabbit.oak.commons.Compression;
import org.apache.jackrabbit.oak.index.IndexHelper;
import org.apache.jackrabbit.oak.index.IndexerSupport;
import org.apache.jackrabbit.oak.index.indexer.document.CompositeException;
+import org.apache.jackrabbit.oak.index.indexer.document.CompositeIndexer;
import
org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
import
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy;
+import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
import
org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreSortStrategy;
import
org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
@@ -106,6 +108,7 @@ public class FlatFileNodeStoreBuilder {
private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
private IndexingReporter indexingReporter = IndexingReporter.NOOP;
private MongoClientURI mongoClientURI;
+ private boolean withAheadOfTimeBlobDownloading = false;
public enum SortStrategyType {
/**
@@ -199,7 +202,24 @@ public class FlatFileNodeStoreBuilder {
return this;
}
- public FlatFileStore build() throws IOException, CompositeException {
+ /**
+ * NOTE: This enables the support for AOT blob download, it does not
activate it. To activate it, set the property
+ *
AheadOfTimeBlobDownloadingFlatFileStore.OAK_INDEXER_BLOB_PREFETCH_BINARY_NODES_SUFFIX
to a non-empty value AND
+ * enable the support here.
+ *
+ * @param aotSupportEnabled
+ * @return
+ */
+ public FlatFileNodeStoreBuilder withAheadOfTimeBlobDownloader(boolean
aotSupportEnabled) {
+ this.withAheadOfTimeBlobDownloading = aotSupportEnabled;
+ return this;
+ }
+
+ public IndexStore build() throws IOException, CompositeException {
+ return build(null, null);
+ }
+
+ public IndexStore build(IndexHelper indexHelper, CompositeIndexer indexer)
throws IOException, CompositeException {
logFlags();
entryWriter = new NodeStateEntryWriter(blobStore);
IndexStoreFiles indexStoreFiles = createdSortedStoreFiles();
@@ -210,10 +230,17 @@ public class FlatFileNodeStoreBuilder {
if (entryCount > 0) {
store.setEntryCount(entryCount);
}
- return store;
+ if (indexer == null || indexHelper == null) {
+ return store;
+ }
+ if (withAheadOfTimeBlobDownloading) {
+ return AheadOfTimeBlobDownloadingFlatFileStore.wrap(store,
indexer, indexHelper);
+ } else {
+ return store;
+ }
}
- public List<FlatFileStore> buildList(IndexHelper indexHelper,
IndexerSupport indexerSupport,
+ public List<IndexStore> buildList(IndexHelper indexHelper, IndexerSupport
indexerSupport,
Set<IndexDefinition>
indexDefinitions) throws IOException, CompositeException {
logFlags();
entryWriter = new NodeStateEntryWriter(blobStore);
@@ -233,7 +260,7 @@ public class FlatFileNodeStoreBuilder {
log.info("Split flat file to result files '{}' is done, took {}
ms", fileList, System.currentTimeMillis() - start);
}
- List<FlatFileStore> storeList = new ArrayList<>();
+ List<IndexStore> storeList = new ArrayList<>();
for (File flatFileItem : fileList) {
FlatFileStore store = new FlatFileStore(blobStore, flatFileItem,
metadataFile, new NodeStateEntryReader(blobStore),
unmodifiableSet(preferredPathElements), algorithm);
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStore.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStore.java
index 1e260f4c1d..9650cd1fef 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStore.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStore.java
@@ -68,6 +68,10 @@ public class FlatFileStore implements IndexStore {
return storeFile.getParentFile().getAbsolutePath();
}
+ public File getStoreFile() {
+ return storeFile;
+ }
+
/**
*
* @deprecated use {@link #getStorePath()} instead
@@ -132,4 +136,8 @@ public class FlatFileStore implements IndexStore {
public boolean isIncremental() {
return false;
}
+
+ public Compression getAlgorithm() {
+ return algorithm;
+ }
}
diff --git
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryReader.java
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryReader.java
index 96732d4e96..32a07ccf19 100644
---
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryReader.java
+++
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/NodeStateEntryReader.java
@@ -43,7 +43,7 @@ public class NodeStateEntryReader {
return new NodeStateEntry(nodeState, parts[0], memUsage, 0, "");
}
- protected NodeState parseState(String part) {
+ public NodeState parseState(String part) {
return des.deserialize(part);
}
}
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/AheadOfTimeBlobDownloaderThrottlerTest.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/AheadOfTimeBlobDownloaderThrottlerTest.java
new file mode 100644
index 0000000000..ad55c7205d
--- /dev/null
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/AheadOfTimeBlobDownloaderThrottlerTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document;
+
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import
org.apache.jackrabbit.oak.index.indexer.document.flatfile.AheadOfTimeBlobDownloaderThrottler;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.*;
+
+public class AheadOfTimeBlobDownloaderThrottlerTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AheadOfTimeBlobDownloaderThrottlerTest.class);
+
+ @Test
+ public void blockOnWindowFullByteSize() throws ExecutionException,
InterruptedException, TimeoutException {
+ ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
+
+ int maxWindow = 10;
+ int maxWindowSizeBytes = 500;
+ try {
+ AheadOfTimeBlobDownloaderThrottler throttler = new
AheadOfTimeBlobDownloaderThrottler(maxWindow, maxWindowSizeBytes);
+ assertTrue(throttler.reserveSpaceForBlob(0, 100));
+ assertEquals(maxWindowSizeBytes - 100,
throttler.getAvailableWindowBytes());
+ assertEquals(maxWindow - 1, throttler.getAvailableWindowSize());
+
+ assertTrue(throttler.reserveSpaceForBlob(4, 300));
+ assertEquals(maxWindowSizeBytes - 400,
throttler.getAvailableWindowBytes());
+ assertEquals(maxWindow - 2, throttler.getAvailableWindowSize());
+
+ assertTrue(throttler.reserveSpaceForBlob(5, 100));
+ assertEquals(0, throttler.getAvailableWindowBytes());
+ assertEquals(maxWindow - 3, throttler.getAvailableWindowSize());
+
+ // The prefetch window is full, so the next call to
reserveSpaceForBlob should block. Do the call in a separate thread.
+ AtomicBoolean spaceWasReservedAfterWait = new AtomicBoolean(false);
+ CountDownLatch latch = new CountDownLatch(1);
+ Future<?> f = executorService.submit((() -> {
+ latch.countDown();
+ try {
+ // This should block until the prefetch window is advanced
+ assertTrue(throttler.reserveSpaceForBlob(6, 10));
+ spaceWasReservedAfterWait.set(true);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+ latch.await();
+ // The thread above has started. Wait for a bit to make sure it's
blocked
+ Thread.sleep(50);
+ // The worker thread cannot reserve space until the main thread
advances the indexer
+ assertFalse(spaceWasReservedAfterWait.get());
+ // This should release the thread above
+ throttler.advanceIndexer(0);
+ f.get(100, TimeUnit.MILLISECONDS);
+ // The thread should have advanced
+ assertTrue(spaceWasReservedAfterWait.get());
+ // Advance to the end
+ throttler.advanceIndexer(6);
+ // Check that the AOT download window is empty
+ assertEquals(maxWindowSizeBytes,
throttler.getAvailableWindowBytes());
+ assertEquals(maxWindow, throttler.getAvailableWindowSize());
+ } finally {
+ new ExecutorCloser(executorService).close();
+ }
+ }
+
+ @Test
+ public void blockOnWindowFullCapacity() throws ExecutionException,
InterruptedException, TimeoutException {
+ ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
+
+ int maxWindow = 10;
+ int maxWindowSizeBytes = 500;
+ try {
+ AheadOfTimeBlobDownloaderThrottler throttler = new
AheadOfTimeBlobDownloaderThrottler(maxWindow, maxWindowSizeBytes);
+ // Fill the prefetch window
+ for (int i = 0; i < maxWindow; i++) {
+ assertTrue(throttler.reserveSpaceForBlob(i, 10));
+ }
+ assertEquals(maxWindowSizeBytes - 100,
throttler.getAvailableWindowBytes());
+ assertEquals(0, throttler.getAvailableWindowSize());
+ AtomicBoolean spaceWasReservedAfterWait = new AtomicBoolean(false);
+ CountDownLatch latch = new CountDownLatch(1);
+ Future<?> f = executorService.submit((() -> {
+ latch.countDown();
+ try {
+ // This should block until the prefetch window is advanced
+ assertTrue(throttler.reserveSpaceForBlob(11, 10));
+ spaceWasReservedAfterWait.set(true);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+ latch.await();
+ // The thread above has started. Wait for a bit to make sure it's
blocked
+ Thread.sleep(50);
+ assertFalse(spaceWasReservedAfterWait.get());
+ // Advance one element, this should allow the worker thread to
reserve space
+ throttler.advanceIndexer(1);
+ f.get(100, TimeUnit.MILLISECONDS);
+ assertTrue(spaceWasReservedAfterWait.get());
+ // Advance to the end
+ throttler.advanceIndexer(11);
+ // Check that the AOT download window is empty
+ assertEquals(maxWindowSizeBytes,
throttler.getAvailableWindowBytes());
+ assertEquals(maxWindow, throttler.getAvailableWindowSize());
+ } finally {
+ new ExecutorCloser(executorService).close();
+ }
+ }
+
+ @Test
+ public void spaceReservationForPositionBehindIndexerIsIgnored() throws
InterruptedException {
+ AheadOfTimeBlobDownloaderThrottler throttler = new
AheadOfTimeBlobDownloaderThrottler(10, 100);
+ throttler.advanceIndexer(5);
+ assertFalse(throttler.reserveSpaceForBlob(0, 10));
+ assertFalse(throttler.reserveSpaceForBlob(1, 10));
+ assertFalse(throttler.reserveSpaceForBlob(5, 10));
+ assertEquals(100, throttler.getAvailableWindowBytes());
+ assertEquals(10, throttler.getAvailableWindowSize());
+ assertTrue(throttler.reserveSpaceForBlob(6, 10));
+ assertEquals(90, throttler.getAvailableWindowBytes());
+ assertEquals(9, throttler.getAvailableWindowSize());
+ }
+
+ @Test
+ public void manyReservations() throws InterruptedException,
ExecutionException {
+ AheadOfTimeBlobDownloaderThrottler throttler = new
AheadOfTimeBlobDownloaderThrottler(1024, 64 * 1024);
+ ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
+ try {
+ CountDownLatch latch = new CountDownLatch(1);
+ Future<?> future = executorService.submit(() -> {
+ Random random = new Random();
+ latch.countDown();
+ for (int i = 0; i < 500; i++) {
+ try {
+ throttler.reserveSpaceForBlob(i, 512 +
random.nextInt(512));
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ fail("Should not have thrown an exception");
+ }
+ }
+ });
+
+ latch.await();
+ for (int i = 0; i < 500; i++) {
+ throttler.advanceIndexer(i);
+ Thread.sleep(1);
+ }
+
+ future.get();
+ LOG.info("Stats: {}", throttler.formatStats());
+ } finally {
+ new ExecutorCloser(executorService).close();
+ }
+ }
+}
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloadingFlatFileStoreTest.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloadingFlatFileStoreTest.java
new file mode 100644
index 0000000000..a630f05ae2
--- /dev/null
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/AheadOfTimeBlobDownloadingFlatFileStoreTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class AheadOfTimeBlobDownloadingFlatFileStoreTest {
+
+ @Test
+ public void isEnabledForIndexes() {
+
assertFalse(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
+ "",
+ List.of("/oak:index/fooA-34")
+ ));
+
+ assertTrue(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
+ "/oak:index/foo",
+ List.of("/oak:index/fooA-34")
+ ));
+
+ assertTrue(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
+ "/oak:index/foo",
+ List.of("/oak:index/anotherIndex", "/oak:index/fooA-34")
+ ));
+
+
assertFalse(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
+ "/oak:index/foo",
+ List.of("/oak:index/anotherIndex")
+ ));
+
+ assertTrue(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
+ "/oak:index/fooA-,/oak:index/fooB-",
+ List.of("/oak:index/fooA-34")
+ ));
+
+ assertTrue(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
+ "/oak:index/fooA-, /oak:index/fooB-",
+ List.of("/oak:index/anotherIndex", "/oak:index/fooA-34")
+ ));
+
+
assertFalse(AheadOfTimeBlobDownloadingFlatFileStore.isEnabledForIndexes(
+ "/oak:index/fooA-",
+ List.of()
+ ));
+ }
+}
\ No newline at end of file
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilderTest.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilderTest.java
index c15c29003e..679fcc62db 100644
---
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilderTest.java
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilderTest.java
@@ -36,6 +36,7 @@ import
org.apache.jackrabbit.oak.index.indexer.document.CompositeException;
import org.apache.jackrabbit.oak.index.indexer.document.IndexerConfiguration;
import
org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
import
org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy;
+import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import
org.apache.jackrabbit.oak.plugins.index.search.util.IndexDefinitionBuilder;
@@ -177,8 +178,8 @@ public class FlatFileNodeStoreBuilderTest {
public void assertBuild(String dir) throws CompositeException, IOException
{
FlatFileNodeStoreBuilder builder = new
FlatFileNodeStoreBuilder(folder.getRoot()).withNodeStateEntryTraverserFactory(
nodeStateEntryTraverserFactory);
- try (FlatFileStore store = builder.build()) {
- assertEquals(dir, store.getFlatFileStorePath());
+ try (IndexStore store = builder.build()) {
+ assertEquals(dir, store.getStorePath());
}
}
@@ -201,13 +202,13 @@ public class FlatFileNodeStoreBuilderTest {
NodeState rootState = mock(NodeState.class);
when(indexerSupport.retrieveNodeStateForCheckpoint()).thenReturn(rootState);
- List<FlatFileStore> storeList = builder.buildList(indexHelper,
indexerSupport, mockIndexDefns());
+ List<IndexStore> storeList = builder.buildList(indexHelper,
indexerSupport, mockIndexDefns());
if (split) {
- assertEquals(new File(dir, "split").getAbsolutePath(),
storeList.get(0).getFlatFileStorePath());
+ assertEquals(new File(dir, "split").getAbsolutePath(),
storeList.get(0).getStorePath());
assertTrue(storeList.size() > 1);
} else {
- assertEquals(dir, storeList.get(0).getFlatFileStorePath());
+ assertEquals(dir, storeList.get(0).getStorePath());
assertEquals(1, storeList.size());
}
}
diff --git
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
index fabe1273a8..1688dc849c 100644
---
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
+++
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileStoreTest.java
@@ -21,6 +21,7 @@ package
org.apache.jackrabbit.oak.index.indexer.document.flatfile;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
import
org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverser;
+import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
import org.jetbrains.annotations.NotNull;
import org.junit.Rule;
@@ -55,7 +56,7 @@ public class FlatFileStoreTest {
private void runBasicTest() throws Exception {
List<String> paths = createTestPaths();
FlatFileNodeStoreBuilder spyBuilder = Mockito.spy(new
FlatFileNodeStoreBuilder(folder.getRoot()));
- FlatFileStore flatStore = spyBuilder.withBlobStore(new
MemoryBlobStore())
+ IndexStore flatStore = spyBuilder.withBlobStore(new MemoryBlobStore())
.withPreferredPathElements(preferred)
.withPathPredicate(pathPredicate)
.withNodeStateEntryTraverserFactory(range -> new
NodeStateEntryTraverser("NS-1", null, null,null, range) {
diff --git
a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
index 8a2cbd521a..e8818bcd69 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
@@ -31,6 +31,7 @@ import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.index.async.AsyncIndexerLucene;
import org.apache.jackrabbit.oak.index.indexer.document.DocumentStoreIndexer;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStore;
+import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
import org.apache.jackrabbit.oak.run.cli.CommonOptions;
import org.apache.jackrabbit.oak.run.cli.DocumentBuilderCustomizer;
@@ -173,7 +174,7 @@ public class IndexCommand implements Command {
}
}
- private void execute(NodeStoreFixture fixture, IndexOptions indexOpts,
Closer closer)
+ private void execute(NodeStoreFixture fixture, IndexOptions indexOpts,
Closer closer)
throws IOException, CommitFailedException {
ExtendedIndexHelper extendedIndexHelper = createIndexHelper(fixture,
indexOpts, closer);
@@ -184,13 +185,13 @@ public class IndexCommand implements Command {
reindexOperation(indexOpts, extendedIndexHelper);
importIndexOperation(indexOpts, extendedIndexHelper);
- log.info("[INDEXING_REPORT:INDEX_UPLOAD]\n{}" ,
extendedIndexHelper.getIndexReporter().generateReport());
+ log.info("[INDEXING_REPORT:INDEX_UPLOAD]\n{}",
extendedIndexHelper.getIndexReporter().generateReport());
}
private ExtendedIndexHelper createIndexHelper(NodeStoreFixture fixture,
- IndexOptions indexOpts, Closer
closer) throws IOException {
+ IndexOptions indexOpts,
Closer closer) throws IOException {
ExtendedIndexHelper extendedIndexHelper = new
ExtendedIndexHelper(fixture.getStore(), fixture.getBlobStore(),
fixture.getWhiteboard(),
- indexOpts.getOutDir(), indexOpts.getWorkDir(),
computeIndexPaths(indexOpts));
+ indexOpts.getOutDir(), indexOpts.getWorkDir(),
computeIndexPaths(indexOpts));
configurePreExtractionSupport(indexOpts, extendedIndexHelper);
@@ -206,7 +207,7 @@ public class IndexCommand implements Command {
IndexDefinitionUpdater updater = new
IndexDefinitionUpdater(definitions);
Set<String> indexPathsFromJson = updater.getIndexPaths();
Set<String> diff = Sets.difference(indexPathsFromJson, indexPaths);
- if (!diff.isEmpty()){
+ if (!diff.isEmpty()) {
log.info("Augmenting the indexPaths with {} which are present
in {}", diff, definitions);
}
indexPaths.addAll(indexPathsFromJson);
@@ -223,7 +224,7 @@ public class IndexCommand implements Command {
}
private void reindexOperation(IndexOptions indexOpts, ExtendedIndexHelper
extendedIndexHelper) throws IOException, CommitFailedException {
- if (!indexOpts.isReindex()){
+ if (!indexOpts.isReindex()) {
return;
}
@@ -252,9 +253,14 @@ public class IndexCommand implements Command {
log.info("Using Document order traversal to perform reindexing");
try (DocumentStoreIndexer indexer = new
DocumentStoreIndexer(extendedIndexHelper, indexerSupport)) {
if (idxOpts.buildFlatFileStoreSeparately()) {
- FlatFileStore ffs = indexer.buildFlatFileStore();
- String pathToFFS = ffs.getFlatFileStorePath();
- System.setProperty(OAK_INDEXER_SORTED_FILE_PATH,
pathToFFS);
+ IndexStore indexStore = indexer.buildFlatFileStore();
+ if (indexStore instanceof FlatFileStore) {
+ FlatFileStore ffs = (FlatFileStore) indexStore;
+ String pathToFFS = ffs.getFlatFileStorePath();
+ System.setProperty(OAK_INDEXER_SORTED_FILE_PATH,
pathToFFS);
+ } else {
+                        throw new IllegalArgumentException("Store is not
FlatFileStore, cannot use option to build flat file store separately.");
+ }
}
indexer.reindex();
}
@@ -305,7 +311,7 @@ public class IndexCommand implements Command {
private String connectInReadWriteModeAndCreateCheckPoint(IndexOptions
indexOpts) throws Exception {
String checkpoint = indexOpts.getCheckpoint();
- if (checkpoint != null){
+ if (checkpoint != null) {
log.info("Using provided checkpoint [{}]", checkpoint);
return checkpoint;
}
@@ -419,7 +425,7 @@ public class IndexCommand implements Command {
}
private static void configureCustomizer(Options opts, Closer closer,
boolean readOnlyAccess) {
- if (opts.getCommonOpts().isDocument()){
+ if (opts.getCommonOpts().isDocument()) {
IndexOptions indexOpts = opts.getOptionBean(IndexOptions.class);
if (indexOpts.isReindex()) {
IndexDocumentBuilderCustomizer customizer = new
IndexDocumentBuilderCustomizer(opts, readOnlyAccess);