This is an automated email from the ASF dual-hosted git repository.

baedke pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8f0bedf8a4 OAK-10356 - Adjust lower and upper bounds of auto-detected memory limits in PipelinedStrategy (#1032)
8f0bedf8a4 is described below

commit 8f0bedf8a42bb7d7119495d8afe5f9422693ed6c
Author: Nuno Santos <nsan...@adobe.com>
AuthorDate: Wed Jul 19 09:55:55 2023 +0200

    OAK-10356 - Adjust lower and upper bounds of auto-detected memory limits in PipelinedStrategy (#1032)
    
    * Reduce the minimum bound and set a maximum bound for the auto-detected working memory in the Pipelined strategy.
    
    * Reduce default number of transform threads.
---
 .../flatfile/pipelined/NodeStateEntryBatch.java      |  6 ++++--
 .../flatfile/pipelined/PipelinedStrategy.java        | 20 +++++++++++++-------
 .../flatfile/pipelined/NodeStateEntryBatchTest.java  | 19 +++++++++----------
 .../pipelined/PipelinedSortBatchTaskTest.java        |  8 ++++----
 4 files changed, 30 insertions(+), 23 deletions(-)

diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
index 6e4f508d84..5fb2ef0163 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java
@@ -22,9 +22,11 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
 public class NodeStateEntryBatch {
+    // Must be large enough to hold a full node state entry
+    static final int MIN_BUFFER_SIZE = 256 * 1024;
     public static NodeStateEntryBatch createNodeStateEntryBatch(int 
bufferSize, int maxNumEntries) {
-        if (bufferSize < 128) {
-            throw new IllegalArgumentException("Buffer size must be at least 
128 bytes");
+        if (bufferSize < MIN_BUFFER_SIZE) {
+            throw new IllegalArgumentException("Buffer size must be at least " 
+ MIN_BUFFER_SIZE + " bytes");
         }
         if (maxNumEntries < 1) {
             throw new IllegalArgumentException("Max number of entries must be 
at least 1");
diff --git 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
index 56c8f93b9e..bdd94d1f9e 100644
--- 
a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
+++ 
b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java
@@ -18,12 +18,12 @@
  */
 package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.mongodb.BasicDBObject;
 import com.mongodb.client.MongoCollection;
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.guava.common.base.Preconditions;
 import org.apache.jackrabbit.guava.common.base.Stopwatch;
+import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.jackrabbit.oak.commons.Compression;
 import 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter;
 import org.apache.jackrabbit.oak.index.indexer.document.flatfile.SortStrategy;
@@ -116,7 +116,7 @@ public class PipelinedStrategy implements SortStrategy {
     public static final String OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE = 
"oak.indexer.pipelined.mongoDocBatchSize";
     public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE 
= 500;
     public static final String OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = 
"oak.indexer.pipelined.transformThreads";
-    public static final int DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = 
3;
+    public static final int DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = 
2;
     public static final String OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = 
"oak.indexer.pipelined.workingMemoryMB";
     // 0 means autodetect
     public static final int DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = 
0;
@@ -127,8 +127,10 @@ public class PipelinedStrategy implements SortStrategy {
     static final Charset FLATFILESTORE_CHARSET = StandardCharsets.UTF_8;
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PipelinedStrategy.class);
-    private static final int MIN_ENTRY_BATCH_BUFFER_SIZE_MB = 64;
-    private static final int MIN_WORKING_MEMORY_MB = 512;
+    // A MongoDB document is at most 16MB, so the buffer that holds node state 
entries must be at least that big
+    private static final int MIN_ENTRY_BATCH_BUFFER_SIZE_MB = 16;
+    private static final int MIN_AUTODETECT_WORKING_MEMORY_MB = 128;
+    private static final int MAX_AUTODETECT_WORKING_MEMORY_MB = 4000;
 
     private class MonitorTask implements Runnable {
         private final ArrayBlockingQueue<BasicDBObject[]> mongoDocQueue;
@@ -219,9 +221,13 @@ public class PipelinedStrategy implements SortStrategy {
         int maxHeapSizeMB = (int) (Runtime.getRuntime().maxMemory() / 
FileUtils.ONE_MB);
         int workingMemoryMB = maxHeapSizeMB - 2048;
         LOG.info("Auto detecting working memory. Maximum heap size: {} MB, 
selected working memory: {} MB", maxHeapSizeMB, workingMemoryMB);
-        if (workingMemoryMB < MIN_WORKING_MEMORY_MB) {
-            LOG.warn("Working memory too low. Setting to minimum: {} MB", 
MIN_WORKING_MEMORY_MB);
-            workingMemoryMB = MIN_WORKING_MEMORY_MB;
+        if (workingMemoryMB > MAX_AUTODETECT_WORKING_MEMORY_MB) {
+            LOG.warn("Auto-detected value for working memory too high, setting 
to the maximum allowed for auto-detection: {} MB", 
MAX_AUTODETECT_WORKING_MEMORY_MB);
+            return MAX_AUTODETECT_WORKING_MEMORY_MB;
+        }
+        if (workingMemoryMB < MIN_AUTODETECT_WORKING_MEMORY_MB) {
+            LOG.warn("Auto-detected value for working memory too low, setting 
to the minimum allowed for auto-detection: {} MB", 
MIN_AUTODETECT_WORKING_MEMORY_MB);
+            return MIN_AUTODETECT_WORKING_MEMORY_MB;
         }
         return workingMemoryMB;
     }
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
index beb2fc0ac9..32cae0a0e7 100644
--- 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java
@@ -33,7 +33,7 @@ public class NodeStateEntryBatchTest {
 
     @Test
     public void testMaximumNumberOfEntries() {
-        NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(1024, 2);
+        NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
 2);
         assertFalse(batch.isAtMaxEntries());
         batch.addEntry("a", new byte[1]);
         assertFalse(batch.isAtMaxEntries());
@@ -45,12 +45,12 @@ public class NodeStateEntryBatchTest {
 
     @Test
     public void testMaximumBufferSize() {
-        NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(128, 10);
-        assertTrue(batch.hasSpaceForEntry(new byte[124])); // Needs 4 bytes 
for the length
-        assertFalse(batch.hasSpaceForEntry(new byte[125]));
+        NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
 10);
+        assertTrue(batch.hasSpaceForEntry(new 
byte[NodeStateEntryBatch.MIN_BUFFER_SIZE -4])); // Needs 4 bytes for the length
+        assertFalse(batch.hasSpaceForEntry(new 
byte[NodeStateEntryBatch.MIN_BUFFER_SIZE]));
 
-        batch.addEntry("a", new byte[124]);
-        assertEquals(124 + 4, batch.sizeOfEntries());
+        batch.addEntry("a", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE -4]);
+        assertEquals(NodeStateEntryBatch.MIN_BUFFER_SIZE, 
batch.sizeOfEntries());
         assertEquals(1, batch.numberOfEntries());
         assertFalse(batch.hasSpaceForEntry(new byte[1]));
         assertThrows(BufferOverflowException.class, () -> batch.addEntry("b", 
new byte[1]));
@@ -58,9 +58,8 @@ public class NodeStateEntryBatchTest {
 
     @Test
     public void flipAndResetBuffer() {
-        int sizeOfEntry = 124;
-        int bufferSize = 1024;
-        NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(bufferSize, 10);
+        int sizeOfEntry = NodeStateEntryBatch.MIN_BUFFER_SIZE-4;
+        NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
 10);
         byte[] testArray = new byte[sizeOfEntry];
         for (int i = 0; i < sizeOfEntry; i++) {
             testArray[i] = (byte) (i % 127);
@@ -83,6 +82,6 @@ public class NodeStateEntryBatchTest {
 
         assertEquals(0, batch.numberOfEntries());
         assertEquals(0, batch.getBuffer().position());
-        assertEquals(bufferSize, batch.getBuffer().remaining());
+        assertEquals(NodeStateEntryBatch.MIN_BUFFER_SIZE, 
batch.getBuffer().remaining());
     }
 }
\ No newline at end of file
diff --git 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
index 381d067838..7665929e28 100644
--- 
a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
+++ 
b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java
@@ -79,7 +79,7 @@ public class PipelinedSortBatchTaskTest {
 
     @Test
     public void emptyBatch() throws Exception {
-        NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(1024, 10);
+        NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
 10);
 
         TestResult testResult = runTest(batch);
 
@@ -91,7 +91,7 @@ public class PipelinedSortBatchTaskTest {
 
     @Test
     public void oneBatch() throws Exception {
-        NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(1024, 10);
+        NodeStateEntryBatch batch = 
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
 10);
         addEntry(batch, "/a0/b0", "{\"key\":2}");
         addEntry(batch, "/a0", "{\"key\":1}");
         addEntry(batch, "/a1/b0", "{\"key\":6}");
@@ -120,12 +120,12 @@ public class PipelinedSortBatchTaskTest {
 
     @Test
     public void twoBatches() throws Exception {
-        NodeStateEntryBatch batch1 = 
NodeStateEntryBatch.createNodeStateEntryBatch(1024, 10);
+        NodeStateEntryBatch batch1 = 
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
 10);
         addEntry(batch1, "/a0/b0", "{\"key\":2}");
         addEntry(batch1, "/a0", "{\"key\":1}");
         addEntry(batch1, "/a1/b0", "{\"key\":6}");
 
-        NodeStateEntryBatch batch2 = 
NodeStateEntryBatch.createNodeStateEntryBatch(1024, 10);
+        NodeStateEntryBatch batch2 = 
NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE,
 10);
         addEntry(batch2, "/a0/b1", "{\"key\":5}");
         addEntry(batch2, "/a0/b0/c1", "{\"key\":4}");
         addEntry(batch2, "/a0/b0/c0", "{\"key\":3}");

Reply via email to