This is an automated email from the ASF dual-hosted git repository. baedke pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push: new 8f0bedf8a4 OAK-10356 - Adjust lower and upper bounds of auto-detected memory limits in PipelinedStrategy (#1032) 8f0bedf8a4 is described below commit 8f0bedf8a42bb7d7119495d8afe5f9422693ed6c Author: Nuno Santos <nsan...@adobe.com> AuthorDate: Wed Jul 19 09:55:55 2023 +0200 OAK-10356 - Adjust lower and upper bounds of auto-detected memory limits in PipelinedStrategy (#1032) * Reduce the minimum bound and set a maximum bound for the auto-detected working memory in the Pipelined strategy. * Reduce default number of transform threads. --- .../flatfile/pipelined/NodeStateEntryBatch.java | 6 ++++-- .../flatfile/pipelined/PipelinedStrategy.java | 20 +++++++++++++------- .../flatfile/pipelined/NodeStateEntryBatchTest.java | 19 +++++++++---------- .../pipelined/PipelinedSortBatchTaskTest.java | 8 ++++---- 4 files changed, 30 insertions(+), 23 deletions(-) diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java index 6e4f508d84..5fb2ef0163 100644 --- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java +++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatch.java @@ -22,9 +22,11 @@ import java.nio.ByteBuffer; import java.util.ArrayList; public class NodeStateEntryBatch { + // Must be large enough to hold a full node state entry + static final int MIN_BUFFER_SIZE = 256 * 1024; public static NodeStateEntryBatch createNodeStateEntryBatch(int bufferSize, int maxNumEntries) { - if (bufferSize < 128) { - throw new IllegalArgumentException("Buffer size must be at least 128 bytes"); + if (bufferSize < MIN_BUFFER_SIZE) { + throw new IllegalArgumentException("Buffer size must be at least " + MIN_BUFFER_SIZE + " bytes"); } if (maxNumEntries < 1) { throw new IllegalArgumentException("Max number of entries must be at least 1"); diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java index 56c8f93b9e..bdd94d1f9e 100644 --- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java +++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java @@ -18,12 +18,12 @@ */ package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.mongodb.BasicDBObject; import com.mongodb.client.MongoCollection; import org.apache.commons.io.FileUtils; import org.apache.jackrabbit.guava.common.base.Preconditions; import org.apache.jackrabbit.guava.common.base.Stopwatch; +import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder; import org.apache.jackrabbit.oak.commons.Compression; import org.apache.jackrabbit.oak.index.indexer.document.flatfile.NodeStateEntryWriter; import org.apache.jackrabbit.oak.index.indexer.document.flatfile.SortStrategy; @@ -116,7 +116,7 @@ public class PipelinedStrategy implements SortStrategy { public static final String OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE = "oak.indexer.pipelined.mongoDocBatchSize"; public static final int DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE = 500; public static final String OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = "oak.indexer.pipelined.transformThreads"; - public static final int DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = 3; + public static final int DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = 2; public static final String OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = "oak.indexer.pipelined.workingMemoryMB"; // 0 means autodetect public static final int DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = 0; @@ -127,8 +127,10 @@ public class PipelinedStrategy implements SortStrategy { static final Charset FLATFILESTORE_CHARSET = StandardCharsets.UTF_8; private static final Logger LOG = LoggerFactory.getLogger(PipelinedStrategy.class); - private static final int MIN_ENTRY_BATCH_BUFFER_SIZE_MB = 64; - private static final int MIN_WORKING_MEMORY_MB = 512; + // A MongoDB document is at most 16MB, so the buffer that holds node state entries must be at least that big + private static final int MIN_ENTRY_BATCH_BUFFER_SIZE_MB = 16; + private static final int MIN_AUTODETECT_WORKING_MEMORY_MB = 128; + private static final int MAX_AUTODETECT_WORKING_MEMORY_MB = 4000; private class MonitorTask implements Runnable { private final ArrayBlockingQueue<BasicDBObject[]> mongoDocQueue; @@ -219,9 +221,13 @@ public class PipelinedStrategy implements SortStrategy { int maxHeapSizeMB = (int) (Runtime.getRuntime().maxMemory() / FileUtils.ONE_MB); int workingMemoryMB = maxHeapSizeMB - 2048; LOG.info("Auto detecting working memory. Maximum heap size: {} MB, selected working memory: {} MB", maxHeapSizeMB, workingMemoryMB); - if (workingMemoryMB < MIN_WORKING_MEMORY_MB) { - LOG.warn("Working memory too low. Setting to minimum: {} MB", MIN_WORKING_MEMORY_MB); - workingMemoryMB = MIN_WORKING_MEMORY_MB; + if (workingMemoryMB > MAX_AUTODETECT_WORKING_MEMORY_MB) { + LOG.warn("Auto-detected value for working memory too high, setting to the maximum allowed for auto-detection: {} MB", MAX_AUTODETECT_WORKING_MEMORY_MB); + return MAX_AUTODETECT_WORKING_MEMORY_MB; + } + if (workingMemoryMB < MIN_AUTODETECT_WORKING_MEMORY_MB) { + LOG.warn("Auto-detected value for working memory too low, setting to the minimum allowed for auto-detection: {} MB", MIN_AUTODETECT_WORKING_MEMORY_MB); + return MIN_AUTODETECT_WORKING_MEMORY_MB; } return workingMemoryMB; } diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java index beb2fc0ac9..32cae0a0e7 100644 --- a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java +++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/NodeStateEntryBatchTest.java @@ -33,7 +33,7 @@ public class NodeStateEntryBatchTest { @Test public void testMaximumNumberOfEntries() { - NodeStateEntryBatch batch = NodeStateEntryBatch.createNodeStateEntryBatch(1024, 2); + NodeStateEntryBatch batch = NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE, 2); assertFalse(batch.isAtMaxEntries()); batch.addEntry("a", new byte[1]); assertFalse(batch.isAtMaxEntries()); @@ -45,12 +45,12 @@ public class NodeStateEntryBatchTest { @Test public void testMaximumBufferSize() { - NodeStateEntryBatch batch = NodeStateEntryBatch.createNodeStateEntryBatch(128, 10); - assertTrue(batch.hasSpaceForEntry(new byte[124])); // Needs 4 bytes for the length - assertFalse(batch.hasSpaceForEntry(new byte[125])); + NodeStateEntryBatch batch = NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE, 10); + assertTrue(batch.hasSpaceForEntry(new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE -4])); // Needs 4 bytes for the length + assertFalse(batch.hasSpaceForEntry(new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE])); - batch.addEntry("a", new byte[124]); - assertEquals(124 + 4, batch.sizeOfEntries()); + batch.addEntry("a", new byte[NodeStateEntryBatch.MIN_BUFFER_SIZE -4]); + assertEquals(NodeStateEntryBatch.MIN_BUFFER_SIZE, batch.sizeOfEntries()); assertEquals(1, batch.numberOfEntries()); assertFalse(batch.hasSpaceForEntry(new byte[1])); assertThrows(BufferOverflowException.class, () -> batch.addEntry("b", new byte[1])); @@ -58,9 +58,8 @@ public class NodeStateEntryBatchTest { @Test public void flipAndResetBuffer() { - int sizeOfEntry = 124; - int bufferSize = 1024; - NodeStateEntryBatch batch = NodeStateEntryBatch.createNodeStateEntryBatch(bufferSize, 10); + int sizeOfEntry = NodeStateEntryBatch.MIN_BUFFER_SIZE-4; + NodeStateEntryBatch batch = NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE, 10); byte[] testArray = new byte[sizeOfEntry]; for (int i = 0; i < sizeOfEntry; i++) { testArray[i] = (byte) (i % 127); @@ -83,6 +82,6 @@ public class NodeStateEntryBatchTest { assertEquals(0, batch.numberOfEntries()); assertEquals(0, batch.getBuffer().position()); - assertEquals(bufferSize, batch.getBuffer().remaining()); + assertEquals(NodeStateEntryBatch.MIN_BUFFER_SIZE, batch.getBuffer().remaining()); } } \ No newline at end of file diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java index 381d067838..7665929e28 100644 --- a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java +++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedSortBatchTaskTest.java @@ -79,7 +79,7 @@ public class PipelinedSortBatchTaskTest { @Test public void emptyBatch() throws Exception { - NodeStateEntryBatch batch = NodeStateEntryBatch.createNodeStateEntryBatch(1024, 10); + NodeStateEntryBatch batch = NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE, 10); TestResult testResult = runTest(batch); @@ -91,7 +91,7 @@ public class PipelinedSortBatchTaskTest { @Test public void oneBatch() throws Exception { - NodeStateEntryBatch batch = NodeStateEntryBatch.createNodeStateEntryBatch(1024, 10); + NodeStateEntryBatch batch = NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE, 10); addEntry(batch, "/a0/b0", "{\"key\":2}"); addEntry(batch, "/a0", "{\"key\":1}"); addEntry(batch, "/a1/b0", "{\"key\":6}"); @@ -120,12 +120,12 @@ public class PipelinedSortBatchTaskTest { @Test public void twoBatches() throws Exception { - NodeStateEntryBatch batch1 = NodeStateEntryBatch.createNodeStateEntryBatch(1024, 10); + NodeStateEntryBatch batch1 = NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE, 10); addEntry(batch1, "/a0/b0", "{\"key\":2}"); addEntry(batch1, "/a0", "{\"key\":1}"); addEntry(batch1, "/a1/b0", "{\"key\":6}"); - NodeStateEntryBatch batch2 = NodeStateEntryBatch.createNodeStateEntryBatch(1024, 10); + NodeStateEntryBatch batch2 = NodeStateEntryBatch.createNodeStateEntryBatch(NodeStateEntryBatch.MIN_BUFFER_SIZE, 10); addEntry(batch2, "/a0/b1", "{\"key\":5}"); addEntry(batch2, "/a0/b0/c1", "{\"key\":4}"); addEntry(batch2, "/a0/b0/c0", "{\"key\":3}");