This is an automated email from the ASF dual-hosted git repository.
nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0601c69cbf OAK-11655 - Do not create redundant instances of ElasticBulkProcessorHandler (#2225)
0601c69cbf is described below
commit 0601c69cbf2f7a97b1571fd3fd69bc463af1437e
Author: Nuno Santos <[email protected]>
AuthorDate: Wed Apr 16 11:32:35 2025 +0200
OAK-11655 - Do not create redundant instances of ElasticBulkProcessorHandler (#2225)
---
.../indexer/document/DocumentStoreIndexerBase.java | 32 ++++++++++------------
.../oak/index/ElasticDocumentStoreIndexer.java | 24 ++++++++--------
.../indexer/document/ElasticIndexerProvider.java | 10 +++----
.../indexer/document/DocumentStoreIndexer.java | 13 ++++-----
.../elastic/index/ElasticBulkProcessorHandler.java | 16 +++++------
.../elastic/index/ElasticIndexEditorProvider.java | 9 +++++-
6 files changed, 50 insertions(+), 54 deletions(-)
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index 27e52cf327..b17c84640d 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -29,7 +29,6 @@ import org.apache.jackrabbit.oak.commons.time.Stopwatch;
import org.apache.jackrabbit.oak.index.IndexHelper;
import org.apache.jackrabbit.oak.index.IndexerSupport;
import
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder;
-import org.apache.jackrabbit.oak.plugins.index.ConfigHelper;
import
org.apache.jackrabbit.oak.index.indexer.document.incrementalstore.IncrementalStoreBuilder;
import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
import
org.apache.jackrabbit.oak.index.indexer.document.tree.ParallelIndexStore;
@@ -39,13 +38,14 @@ import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.TraversingRange;
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.plugins.index.ConfigHelper;
import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
+import org.apache.jackrabbit.oak.plugins.index.IndexingReporter;
import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
import org.apache.jackrabbit.oak.plugins.index.MetricsUtils;
import org.apache.jackrabbit.oak.plugins.index.NodeTraversalCallback;
-import org.apache.jackrabbit.oak.plugins.index.IndexingReporter;
import
org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
import org.apache.jackrabbit.oak.plugins.index.progress.MetricRateEstimator;
import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
@@ -104,7 +104,7 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
protected final IndexHelper indexHelper;
private final IndexingReporter indexingReporter;
private final StatisticsProvider statisticsProvider;
- protected List<NodeStateIndexerProvider> indexerProviders;
+ protected NodeStateIndexerProvider indexerProvider;
protected final IndexerSupport indexerSupport;
private static final int MAX_DOWNLOAD_ATTEMPTS =
Integer.parseInt(System.getProperty("oak.indexer.maxDownloadRetries", "5")) + 1;
@@ -118,8 +118,8 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
this.statisticsProvider = indexHelper.getStatisticsProvider();
}
- protected void setProviders() throws IOException {
- this.indexerProviders = createProviders();
+ protected void setProvider() throws IOException {
+ this.indexerProvider = createProvider();
}
private static class MongoNodeStateEntryTraverserFactory implements
NodeStateEntryTraverserFactory {
@@ -410,12 +410,10 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
log.info("Top slowest nodes to index (ms): {}",
slowestTopKElements);
}
}
- for (NodeStateIndexerProvider indexerProvider :
indexerProviders) {
- ExtractedTextCache extractedTextCache =
indexerProvider.getTextCache();
- CacheStats cacheStats = extractedTextCache == null ? null
: extractedTextCache.getCacheStats();
- log.info("Text extraction cache statistics: {}",
cacheStats == null ? "N/A" : cacheStats.cacheInfoAsString());
- indexerProvider.close();
- }
+ ExtractedTextCache extractedTextCache =
indexerProvider.getTextCache();
+ CacheStats cacheStats = extractedTextCache == null ? null :
extractedTextCache.getCacheStats();
+ log.info("Text extraction cache statistics: {}", cacheStats ==
null ? "N/A" : cacheStats.cacheInfoAsString());
+ indexerProvider.close();
progressReporter.reindexingTraversalEnd();
progressReporter.logReport();
@@ -538,19 +536,17 @@ public abstract class DocumentStoreIndexerBase implements Closeable {
idxBuilder.setProperty(IndexConstants.REINDEX_PROPERTY_NAME,
false);
- for (NodeStateIndexerProvider indexerProvider : indexerProviders) {
- NodeStateIndexer indexer = indexerProvider.getIndexer(type,
indexPath, idxBuilder, root, progressReporter);
- if (indexer != null) {
- indexers.add(indexer);
- progressReporter.registerIndex(indexPath, true, -1);
- }
+ NodeStateIndexer indexer = indexerProvider.getIndexer(type,
indexPath, idxBuilder, root, progressReporter);
+ if (indexer != null) {
+ indexers.add(indexer);
+ progressReporter.registerIndex(indexPath, true, -1);
}
}
return new CompositeIndexer(indexers);
}
- protected abstract List<NodeStateIndexerProvider> createProviders() throws
IOException;
+ protected abstract NodeStateIndexerProvider createProvider() throws
IOException;
protected abstract void preIndexOperations(List<NodeStateIndexer>
indexers);
diff --git a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java
index 31cab0b44d..39c0ca2fa8 100644
--- a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java
+++ b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java
@@ -52,17 +52,15 @@ public class ElasticDocumentStoreIndexer extends DocumentStoreIndexerBase {
this.port = port;
this.apiKeyId = apiKeyId;
this.apiSecretId = apiSecretId;
- setProviders();
+ setProvider();
}
- protected List<NodeStateIndexerProvider> createProviders() {
- List<NodeStateIndexerProvider> providers = List.of(
- createElasticIndexerProvider()
- );
-
- providers.forEach(closer::register);
- return providers;
+ protected NodeStateIndexerProvider createProvider() {
+ NodeStateIndexerProvider provider = createElasticIndexerProvider();
+ closer.register(provider);
+ return provider;
}
+
/*
Used to provision elastic index before starting indexing
Otherwise proper alias naming and mapping will not be applied
@@ -86,14 +84,14 @@ public class ElasticDocumentStoreIndexer extends DocumentStoreIndexerBase {
host,
port
);
- final ElasticConnection coordinate;
+ final ElasticConnection connection;
if (apiKeyId != null && apiSecretId != null) {
- coordinate = buildStep.withApiKeys(apiKeyId, apiSecretId).build();
+ connection = buildStep.withApiKeys(apiKeyId, apiSecretId).build();
} else {
- coordinate = buildStep.build();
+ connection = buildStep.build();
}
- closer.register(coordinate);
- return new ElasticIndexerProvider(indexHelper, coordinate);
+ closer.register(connection);
+ return new ElasticIndexerProvider(indexHelper, connection);
}
}
diff --git a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerProvider.java b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerProvider.java
index 7a1e49a526..8a4f3c4659 100644
--- a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerProvider.java
+++ b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerProvider.java
@@ -53,13 +53,15 @@ public class ElasticIndexerProvider implements NodeStateIndexerProvider {
private final ElasticConnection connection;
private final ElasticBulkProcessorHandler bulkProcessorHandler;
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final ElasticIndexEditorProvider elasticIndexEditorProvider;
public ElasticIndexerProvider(IndexHelper indexHelper, ElasticConnection
connection) {
this.indexHelper = indexHelper;
this.connection = connection;
this.bulkProcessorHandler = new
ElasticBulkProcessorHandler(connection);
- this.indexWriterFactory = new ElasticIndexWriterFactory(connection,
- new ElasticIndexTracker(connection, new
ElasticMetricHandler(StatisticsProvider.NOOP)), bulkProcessorHandler);
+ ElasticIndexTracker indexTracker = new ElasticIndexTracker(connection,
new ElasticMetricHandler(StatisticsProvider.NOOP));
+ this.indexWriterFactory = new ElasticIndexWriterFactory(connection,
indexTracker, bulkProcessorHandler);
+ this.elasticIndexEditorProvider = new
ElasticIndexEditorProvider(indexTracker, connection, null,
bulkProcessorHandler);
}
@@ -73,9 +75,6 @@ public class ElasticIndexerProvider implements NodeStateIndexerProvider {
FulltextIndexWriter<ElasticDocument> indexWriter =
indexWriterFactory.newInstance(idxDefinition, definition, CommitInfo.EMPTY,
true);
FulltextBinaryTextExtractor textExtractor = new
FulltextBinaryTextExtractor(textCache, idxDefinition, true);
-
- ElasticIndexTracker indexTracker = new ElasticIndexTracker(connection,
new ElasticMetricHandler(StatisticsProvider.NOOP));
- ElasticIndexEditorProvider elasticIndexEditorProvider = new
ElasticIndexEditorProvider(indexTracker, connection, null);
return new ElasticIndexer(idxDefinition, textExtractor, definition,
progressReporter, indexWriter, elasticIndexEditorProvider, indexHelper);
}
@@ -87,6 +86,7 @@ public class ElasticIndexerProvider implements NodeStateIndexerProvider {
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
+ this.elasticIndexEditorProvider.close();
this.bulkProcessorHandler.close();
}
}
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java
index 49f836c301..607bbca49d 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java
@@ -32,20 +32,17 @@ public class DocumentStoreIndexer extends DocumentStoreIndexerBase implements Cl
public DocumentStoreIndexer(ExtendedIndexHelper extendedIndexHelper,
IndexerSupport indexerSupport) throws IOException {
super(extendedIndexHelper, indexerSupport);
this.extendedIndexHelper = extendedIndexHelper;
- setProviders();
+ setProvider();
}
private NodeStateIndexerProvider createLuceneIndexProvider() throws
IOException {
return new LuceneIndexerProvider(extendedIndexHelper, indexerSupport);
}
- protected List<NodeStateIndexerProvider> createProviders() throws
IOException {
- List<NodeStateIndexerProvider> providers = List.of(
- createLuceneIndexProvider()
- );
-
- providers.forEach(closer::register);
- return providers;
+ protected NodeStateIndexerProvider createProvider() throws IOException {
+ NodeStateIndexerProvider provider = createLuceneIndexProvider();
+ closer.register(provider);
+ return provider;
}
@Override
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
index e843554eb9..b6d30c9892 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
@@ -127,7 +127,7 @@ public class ElasticBulkProcessorHandler {
private final int failedDocCountForStatusNode =
ConfigHelper.getSystemPropertyAsInt("oak.failedDocStatusLimit", 10000);
private final int bulkMaxOperations =
ConfigHelper.getSystemPropertyAsInt(BULK_ACTIONS_PROP, BULK_ACTIONS_DEFAULT);
private final int bulkMaxSizeBytes =
ConfigHelper.getSystemPropertyAsInt(BULK_SIZE_BYTES_PROP,
BULK_SIZE_BYTES_DEFAULT);
- private final int bulkFlushIntervalMs =
ConfigHelper.getSystemPropertyAsInt(BULK_FLUSH_INTERVAL_MS_PROP,
BULK_FLUSH_INTERVAL_MS_DEFAULT);
+ private final int bulkFlushIntervalMillis =
ConfigHelper.getSystemPropertyAsInt(BULK_FLUSH_INTERVAL_MS_PROP,
BULK_FLUSH_INTERVAL_MS_DEFAULT);
private final int bulkMaxConcurrentRequests =
ConfigHelper.getSystemPropertyAsInt(BULK_MAX_CONCURRENT_REQUESTS_PROP,
BULK_MAX_CONCURRENT_REQUESTS_DEFAULT);
private final boolean failOnError =
ConfigHelper.getSystemPropertyAsBoolean(FAIL_ON_ERROR_PROP,
FAIL_ON_ERROR_DEFAULT);
@@ -152,8 +152,8 @@ public class ElasticBulkProcessorHandler {
this.elasticConnection = elasticConnection;
// BulkIngester does not support retry policies. Some retries though
are already implemented in the transport layer.
// More details here:
https://github.com/elastic/elasticsearch-java/issues/478
- LOG.info("Creating bulk ingester [maxActions: {}, maxSizeBytes: {}
flushInterval {}, concurrency {}]",
- bulkMaxOperations, bulkMaxSizeBytes, bulkFlushIntervalMs,
BULK_MAX_CONCURRENT_REQUESTS_PROP);
+ LOG.info("Creating bulk ingester [maxOperations: {}, maxSizeBytes: {},
flushIntervalMillis: {}, maxConcurrentRequests: {}]",
+ bulkMaxOperations, bulkMaxSizeBytes, bulkFlushIntervalMillis,
bulkMaxConcurrentRequests);
this.bulkIngester = BulkIngester.of(b -> {
b = b.client(elasticConnection.getAsyncClient())
.listener(new OakBulkListener());
@@ -163,8 +163,8 @@ public class ElasticBulkProcessorHandler {
if (bulkMaxSizeBytes > 0) {
b = b.maxSize(bulkMaxSizeBytes);
}
- if (bulkFlushIntervalMs > 0) {
- b = b.flushInterval(bulkFlushIntervalMs,
TimeUnit.MILLISECONDS);
+ if (bulkFlushIntervalMillis > 0) {
+ b = b.flushInterval(bulkFlushIntervalMillis,
TimeUnit.MILLISECONDS);
}
if (bulkMaxConcurrentRequests > 0) {
b = b.maxConcurrentRequests(bulkMaxConcurrentRequests);
@@ -309,7 +309,7 @@ public class ElasticBulkProcessorHandler {
// we are closing. Wait until all requests lower or equal to
this number are processed.
OptionalLong lowestPendingBulkRequest =
pendingBulks.stream().mapToLong(Long::longValue).min();
// If there is no pending request, we return immediately
- long remainingTimeoutNanos =
TimeUnit.MILLISECONDS.toNanos(bulkFlushIntervalMs * 5L);
+ long remainingTimeoutNanos =
TimeUnit.MILLISECONDS.toNanos(bulkFlushIntervalMillis * 5L);
while (lowestPendingBulkRequest.isPresent() &&
lowestPendingBulkRequest.getAsLong() <= highestBulkRequestSent) {
LOG.debug("Waiting for request {} to be processed. Lowest
pending request: {}", highestBulkRequestSent,
lowestPendingBulkRequest.getAsLong());
try {
@@ -359,9 +359,7 @@ public class ElasticBulkProcessorHandler {
* @throws IOException if an error happened while processing the bulk
requests
*/
public void close() throws IOException {
- if (closed.getAndSet(true)) {
- LOG.info("Already closed");
- } else {
+ if (closed.compareAndSet(false, true)) {
LOG.info("Closing bulk processor handler");
printStatistics();
// This blocks until all requests are processed
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexEditorProvider.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexEditorProvider.java
index 9e683b4dcb..196f40d217 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexEditorProvider.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexEditorProvider.java
@@ -52,10 +52,17 @@ public class ElasticIndexEditorProvider implements IndexEditorProvider {
public ElasticIndexEditorProvider(@NotNull ElasticIndexTracker
indexTracker,
@NotNull ElasticConnection
elasticConnection,
ExtractedTextCache extractedTextCache) {
+ this(indexTracker, elasticConnection, extractedTextCache, new
ElasticBulkProcessorHandler(elasticConnection));
+ }
+
+ public ElasticIndexEditorProvider(@NotNull ElasticIndexTracker
indexTracker,
+ @NotNull ElasticConnection
elasticConnection,
+ ExtractedTextCache extractedTextCache,
+ ElasticBulkProcessorHandler
bulkProcessorHandler) {
this.indexTracker = indexTracker;
this.elasticConnection = elasticConnection;
this.extractedTextCache = extractedTextCache != null ?
extractedTextCache : new ExtractedTextCache(0, 0);
- this.bulkProcessorHandler = new
ElasticBulkProcessorHandler(elasticConnection);
+ this.bulkProcessorHandler = bulkProcessorHandler;
}
@Override