This is an automated email from the ASF dual-hosted git repository.

nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0601c69cbf OAK-11655 - Do not create redundant instances of ElasticBulkProcessorHandler (#2225)
0601c69cbf is described below

commit 0601c69cbf2f7a97b1571fd3fd69bc463af1437e
Author: Nuno Santos <[email protected]>
AuthorDate: Wed Apr 16 11:32:35 2025 +0200

    OAK-11655 - Do not create redundant instances of ElasticBulkProcessorHandler (#2225)
---
 .../indexer/document/DocumentStoreIndexerBase.java | 32 ++++++++++------------
 .../oak/index/ElasticDocumentStoreIndexer.java     | 24 ++++++++--------
 .../indexer/document/ElasticIndexerProvider.java   | 10 +++----
 .../indexer/document/DocumentStoreIndexer.java     | 13 ++++-----
 .../elastic/index/ElasticBulkProcessorHandler.java | 16 +++++------
 .../elastic/index/ElasticIndexEditorProvider.java  |  9 +++++-
 6 files changed, 50 insertions(+), 54 deletions(-)

diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index 27e52cf327..b17c84640d 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -29,7 +29,6 @@ import org.apache.jackrabbit.oak.commons.time.Stopwatch;
 import org.apache.jackrabbit.oak.index.IndexHelper;
 import org.apache.jackrabbit.oak.index.IndexerSupport;
 import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder;
-import org.apache.jackrabbit.oak.plugins.index.ConfigHelper;
 import 
org.apache.jackrabbit.oak.index.indexer.document.incrementalstore.IncrementalStoreBuilder;
 import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStore;
 import 
org.apache.jackrabbit.oak.index.indexer.document.tree.ParallelIndexStore;
@@ -39,13 +38,14 @@ import 
org.apache.jackrabbit.oak.plugins.document.RevisionVector;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.mongo.TraversingRange;
 import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.plugins.index.ConfigHelper;
 import org.apache.jackrabbit.oak.plugins.index.FormattingUtils;
 import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
+import org.apache.jackrabbit.oak.plugins.index.IndexingReporter;
 import org.apache.jackrabbit.oak.plugins.index.MetricsFormatter;
 import org.apache.jackrabbit.oak.plugins.index.MetricsUtils;
 import org.apache.jackrabbit.oak.plugins.index.NodeTraversalCallback;
-import org.apache.jackrabbit.oak.plugins.index.IndexingReporter;
 import 
org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
 import org.apache.jackrabbit.oak.plugins.index.progress.MetricRateEstimator;
 import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
@@ -104,7 +104,7 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
     protected final IndexHelper indexHelper;
     private final IndexingReporter indexingReporter;
     private final StatisticsProvider statisticsProvider;
-    protected List<NodeStateIndexerProvider> indexerProviders;
+    protected NodeStateIndexerProvider indexerProvider;
     protected final IndexerSupport indexerSupport;
     private static final int MAX_DOWNLOAD_ATTEMPTS = 
Integer.parseInt(System.getProperty("oak.indexer.maxDownloadRetries", "5")) + 1;
 
@@ -118,8 +118,8 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
         this.statisticsProvider = indexHelper.getStatisticsProvider();
     }
 
-    protected void setProviders() throws IOException {
-        this.indexerProviders = createProviders();
+    protected void setProvider() throws IOException {
+        this.indexerProvider = createProvider();
     }
 
     private static class MongoNodeStateEntryTraverserFactory implements 
NodeStateEntryTraverserFactory {
@@ -410,12 +410,10 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
                         log.info("Top slowest nodes to index (ms): {}", 
slowestTopKElements);
                     }
                 }
-                for (NodeStateIndexerProvider indexerProvider : 
indexerProviders) {
-                    ExtractedTextCache extractedTextCache = 
indexerProvider.getTextCache();
-                    CacheStats cacheStats = extractedTextCache == null ? null 
: extractedTextCache.getCacheStats();
-                    log.info("Text extraction cache statistics: {}", 
cacheStats == null ? "N/A" : cacheStats.cacheInfoAsString());
-                    indexerProvider.close();
-                }
+                ExtractedTextCache extractedTextCache = 
indexerProvider.getTextCache();
+                CacheStats cacheStats = extractedTextCache == null ? null : 
extractedTextCache.getCacheStats();
+                log.info("Text extraction cache statistics: {}", cacheStats == 
null ? "N/A" : cacheStats.cacheInfoAsString());
+                indexerProvider.close();
 
                 progressReporter.reindexingTraversalEnd();
                 progressReporter.logReport();
@@ -538,19 +536,17 @@ public abstract class DocumentStoreIndexerBase implements 
Closeable {
 
             idxBuilder.setProperty(IndexConstants.REINDEX_PROPERTY_NAME, 
false);
 
-            for (NodeStateIndexerProvider indexerProvider : indexerProviders) {
-                NodeStateIndexer indexer = indexerProvider.getIndexer(type, 
indexPath, idxBuilder, root, progressReporter);
-                if (indexer != null) {
-                    indexers.add(indexer);
-                    progressReporter.registerIndex(indexPath, true, -1);
-                }
+            NodeStateIndexer indexer = indexerProvider.getIndexer(type, 
indexPath, idxBuilder, root, progressReporter);
+            if (indexer != null) {
+                indexers.add(indexer);
+                progressReporter.registerIndex(indexPath, true, -1);
             }
         }
 
         return new CompositeIndexer(indexers);
     }
 
-    protected abstract List<NodeStateIndexerProvider> createProviders() throws 
IOException;
+    protected abstract NodeStateIndexerProvider createProvider() throws 
IOException;
 
     protected abstract void preIndexOperations(List<NodeStateIndexer> 
indexers);
 
diff --git a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java
index 31cab0b44d..39c0ca2fa8 100644
--- a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java
+++ b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticDocumentStoreIndexer.java
@@ -52,17 +52,15 @@ public class ElasticDocumentStoreIndexer extends 
DocumentStoreIndexerBase {
         this.port = port;
         this.apiKeyId = apiKeyId;
         this.apiSecretId = apiSecretId;
-        setProviders();
+        setProvider();
     }
 
-    protected List<NodeStateIndexerProvider> createProviders() {
-        List<NodeStateIndexerProvider> providers = List.of(
-                createElasticIndexerProvider()
-        );
-
-        providers.forEach(closer::register);
-        return providers;
+    protected NodeStateIndexerProvider createProvider() {
+        NodeStateIndexerProvider provider = createElasticIndexerProvider();
+        closer.register(provider);
+        return provider;
     }
+
     /*
     Used to provision elastic index before starting indexing
     Otherwise proper alias naming and mapping will not be applied
@@ -86,14 +84,14 @@ public class ElasticDocumentStoreIndexer extends 
DocumentStoreIndexerBase {
                         host,
                         port
                 );
-        final ElasticConnection coordinate;
+        final ElasticConnection connection;
         if (apiKeyId != null && apiSecretId != null) {
-            coordinate = buildStep.withApiKeys(apiKeyId, apiSecretId).build();
+            connection = buildStep.withApiKeys(apiKeyId, apiSecretId).build();
         } else {
-            coordinate = buildStep.build();
+            connection = buildStep.build();
         }
-        closer.register(coordinate);
-        return new ElasticIndexerProvider(indexHelper, coordinate);
+        closer.register(connection);
+        return new ElasticIndexerProvider(indexHelper, connection);
     }
 
 }
diff --git a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerProvider.java b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerProvider.java
index 7a1e49a526..8a4f3c4659 100644
--- a/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerProvider.java
+++ b/oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerProvider.java
@@ -53,13 +53,15 @@ public class ElasticIndexerProvider implements 
NodeStateIndexerProvider {
     private final ElasticConnection connection;
     private final ElasticBulkProcessorHandler bulkProcessorHandler;
     private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final ElasticIndexEditorProvider elasticIndexEditorProvider;
 
     public ElasticIndexerProvider(IndexHelper indexHelper, ElasticConnection 
connection) {
         this.indexHelper = indexHelper;
         this.connection = connection;
         this.bulkProcessorHandler = new 
ElasticBulkProcessorHandler(connection);
-        this.indexWriterFactory = new ElasticIndexWriterFactory(connection,
-                new ElasticIndexTracker(connection, new 
ElasticMetricHandler(StatisticsProvider.NOOP)), bulkProcessorHandler);
+        ElasticIndexTracker indexTracker = new ElasticIndexTracker(connection, 
new ElasticMetricHandler(StatisticsProvider.NOOP));
+        this.indexWriterFactory = new ElasticIndexWriterFactory(connection, 
indexTracker, bulkProcessorHandler);
+        this.elasticIndexEditorProvider = new 
ElasticIndexEditorProvider(indexTracker, connection, null, 
bulkProcessorHandler);
 
     }
 
@@ -73,9 +75,6 @@ public class ElasticIndexerProvider implements 
NodeStateIndexerProvider {
 
         FulltextIndexWriter<ElasticDocument> indexWriter = 
indexWriterFactory.newInstance(idxDefinition, definition, CommitInfo.EMPTY, 
true);
         FulltextBinaryTextExtractor textExtractor = new 
FulltextBinaryTextExtractor(textCache, idxDefinition, true);
-
-        ElasticIndexTracker indexTracker = new ElasticIndexTracker(connection, 
new ElasticMetricHandler(StatisticsProvider.NOOP));
-        ElasticIndexEditorProvider elasticIndexEditorProvider = new 
ElasticIndexEditorProvider(indexTracker, connection, null);
         return new ElasticIndexer(idxDefinition, textExtractor, definition, 
progressReporter, indexWriter, elasticIndexEditorProvider, indexHelper);
     }
 
@@ -87,6 +86,7 @@ public class ElasticIndexerProvider implements 
NodeStateIndexerProvider {
     @Override
     public void close() throws IOException {
         if (closed.compareAndSet(false, true)) {
+            this.elasticIndexEditorProvider.close();
             this.bulkProcessorHandler.close();
         }
     }
diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java
index 49f836c301..607bbca49d 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexer.java
@@ -32,20 +32,17 @@ public class DocumentStoreIndexer extends 
DocumentStoreIndexerBase implements Cl
     public DocumentStoreIndexer(ExtendedIndexHelper extendedIndexHelper, 
IndexerSupport indexerSupport) throws IOException {
         super(extendedIndexHelper, indexerSupport);
         this.extendedIndexHelper = extendedIndexHelper;
-        setProviders();
+        setProvider();
     }
 
     private NodeStateIndexerProvider createLuceneIndexProvider() throws 
IOException {
         return new LuceneIndexerProvider(extendedIndexHelper, indexerSupport);
     }
 
-    protected List<NodeStateIndexerProvider> createProviders() throws 
IOException {
-        List<NodeStateIndexerProvider> providers = List.of(
-                createLuceneIndexProvider()
-        );
-
-        providers.forEach(closer::register);
-        return providers;
+    protected NodeStateIndexerProvider createProvider() throws IOException {
+        NodeStateIndexerProvider provider = createLuceneIndexProvider();
+        closer.register(provider);
+        return provider;
     }
 
     @Override
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
index e843554eb9..b6d30c9892 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java
@@ -127,7 +127,7 @@ public class ElasticBulkProcessorHandler {
     private final int failedDocCountForStatusNode = 
ConfigHelper.getSystemPropertyAsInt("oak.failedDocStatusLimit", 10000);
     private final int bulkMaxOperations = 
ConfigHelper.getSystemPropertyAsInt(BULK_ACTIONS_PROP, BULK_ACTIONS_DEFAULT);
     private final int bulkMaxSizeBytes = 
ConfigHelper.getSystemPropertyAsInt(BULK_SIZE_BYTES_PROP, 
BULK_SIZE_BYTES_DEFAULT);
-    private final int bulkFlushIntervalMs = 
ConfigHelper.getSystemPropertyAsInt(BULK_FLUSH_INTERVAL_MS_PROP, 
BULK_FLUSH_INTERVAL_MS_DEFAULT);
+    private final int bulkFlushIntervalMillis = 
ConfigHelper.getSystemPropertyAsInt(BULK_FLUSH_INTERVAL_MS_PROP, 
BULK_FLUSH_INTERVAL_MS_DEFAULT);
     private final int bulkMaxConcurrentRequests = 
ConfigHelper.getSystemPropertyAsInt(BULK_MAX_CONCURRENT_REQUESTS_PROP, 
BULK_MAX_CONCURRENT_REQUESTS_DEFAULT);
     private final boolean failOnError = 
ConfigHelper.getSystemPropertyAsBoolean(FAIL_ON_ERROR_PROP, 
FAIL_ON_ERROR_DEFAULT);
 
@@ -152,8 +152,8 @@ public class ElasticBulkProcessorHandler {
         this.elasticConnection = elasticConnection;
         // BulkIngester does not support retry policies. Some retries though 
are already implemented in the transport layer.
         // More details here: 
https://github.com/elastic/elasticsearch-java/issues/478
-        LOG.info("Creating bulk ingester [maxActions: {}, maxSizeBytes: {} 
flushInterval {}, concurrency {}]",
-                bulkMaxOperations, bulkMaxSizeBytes, bulkFlushIntervalMs, 
BULK_MAX_CONCURRENT_REQUESTS_PROP);
+        LOG.info("Creating bulk ingester [maxOperations: {}, maxSizeBytes: {}, 
flushIntervalMillis: {}, maxConcurrentRequests: {}]",
+                bulkMaxOperations, bulkMaxSizeBytes, bulkFlushIntervalMillis, 
bulkMaxConcurrentRequests);
         this.bulkIngester = BulkIngester.of(b -> {
             b = b.client(elasticConnection.getAsyncClient())
                     .listener(new OakBulkListener());
@@ -163,8 +163,8 @@ public class ElasticBulkProcessorHandler {
             if (bulkMaxSizeBytes > 0) {
                 b = b.maxSize(bulkMaxSizeBytes);
             }
-            if (bulkFlushIntervalMs > 0) {
-                b = b.flushInterval(bulkFlushIntervalMs, 
TimeUnit.MILLISECONDS);
+            if (bulkFlushIntervalMillis > 0) {
+                b = b.flushInterval(bulkFlushIntervalMillis, 
TimeUnit.MILLISECONDS);
             }
             if (bulkMaxConcurrentRequests > 0) {
                 b = b.maxConcurrentRequests(bulkMaxConcurrentRequests);
@@ -309,7 +309,7 @@ public class ElasticBulkProcessorHandler {
                 // we are closing. Wait until all requests lower or equal to 
this number are processed.
                 OptionalLong lowestPendingBulkRequest = 
pendingBulks.stream().mapToLong(Long::longValue).min();
                 // If there is no pending request, we return immediately
-                long remainingTimeoutNanos = 
TimeUnit.MILLISECONDS.toNanos(bulkFlushIntervalMs * 5L);
+                long remainingTimeoutNanos = 
TimeUnit.MILLISECONDS.toNanos(bulkFlushIntervalMillis * 5L);
                 while (lowestPendingBulkRequest.isPresent() && 
lowestPendingBulkRequest.getAsLong() <= highestBulkRequestSent) {
                     LOG.debug("Waiting for request {} to be processed. Lowest 
pending request: {}", highestBulkRequestSent, 
lowestPendingBulkRequest.getAsLong());
                     try {
@@ -359,9 +359,7 @@ public class ElasticBulkProcessorHandler {
      * @throws IOException if an error happened while processing the bulk 
requests
      */
     public void close() throws IOException {
-        if (closed.getAndSet(true)) {
-            LOG.info("Already closed");
-        } else {
+        if (closed.compareAndSet(false, true)) {
             LOG.info("Closing bulk processor handler");
             printStatistics();
             // This blocks until all requests are processed
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexEditorProvider.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexEditorProvider.java
index 9e683b4dcb..196f40d217 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexEditorProvider.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticIndexEditorProvider.java
@@ -52,10 +52,17 @@ public class ElasticIndexEditorProvider implements 
IndexEditorProvider {
     public ElasticIndexEditorProvider(@NotNull ElasticIndexTracker 
indexTracker,
                                       @NotNull ElasticConnection 
elasticConnection,
                                       ExtractedTextCache extractedTextCache) {
+        this(indexTracker, elasticConnection, extractedTextCache, new 
ElasticBulkProcessorHandler(elasticConnection));
+    }
+
+    public ElasticIndexEditorProvider(@NotNull ElasticIndexTracker 
indexTracker,
+                                      @NotNull ElasticConnection 
elasticConnection,
+                                      ExtractedTextCache extractedTextCache,
+                                      ElasticBulkProcessorHandler 
bulkProcessorHandler) {
         this.indexTracker = indexTracker;
         this.elasticConnection = elasticConnection;
         this.extractedTextCache = extractedTextCache != null ? 
extractedTextCache : new ExtractedTextCache(0, 0);
-        this.bulkProcessorHandler = new 
ElasticBulkProcessorHandler(elasticConnection);
+        this.bulkProcessorHandler = bulkProcessorHandler;
     }
 
     @Override

Reply via email to