From Wail Alkowaileet <[email protected]>: Wail Alkowaileet has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17965 )
Change subject: [ASTERIXDB-3314][STO] Reduce buffer cache pressure on columnar ...................................................................... [ASTERIXDB-3314][STO] Reduce buffer cache pressure on columnar - user model changes: no - storage format changes: no - interface changes: no Details: - Make max merging components count to 4 in columnar datasets - Fix not unpinning columnar filter pages - Allocate initial 32KB buffers for columnar writers Change-Id: I809109b232bc5a5db0c47a52cb98c838ff55e27f Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17965 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java M asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java 7 files changed, 126 insertions(+), 13 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Jenkins: Verified; Verified Objections: Anon. E. 
Moose #1000171: Violations found diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java index cf2808e..38f7321 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java @@ -25,8 +25,11 @@ import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp; +import org.apache.hyracks.util.StorageUtil; public final class MultiTemporaryBufferBytesOutputStream extends AbstractMultiBufferBytesOutputStream { + private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(32, StorageUtil.StorageUnit.KILOBYTE); + public MultiTemporaryBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) { super(multiPageOpRef); } @@ -38,6 +41,14 @@ @Override protected ByteBuffer confiscateNewBuffer() throws HyracksDataException { + if (buffers.isEmpty()) { + /* + * One buffer on the house to avoid confiscating a whole page for a tiny stream. + * This protects pressuring the buffer cache from confiscating pages for small columns. Think sparse + * columns, which may take only a few hundreds of bytes to write. 
+ */ + return ByteBuffer.allocate(INITIAL_BUFFER_SIZE); + } return multiPageOpRef.getValue().confiscateTemporary(); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml index 8e0bc0c..93eca7c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml @@ -95,5 +95,9 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> + <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil-core</artifactId> + </dependency> </dependencies> </project> diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java index 48bd180..3e72584 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java @@ -38,8 +38,11 @@ import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback; import org.apache.hyracks.storage.common.file.BufferedFileHandle; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements IColumnWriteMultiPageOp { + private static final Logger LOGGER = LogManager.getLogger(); private final List<CachedPage> columnsPages; private final List<CachedPage> tempConfiscatedPages; private 
final ColumnBTreeWriteLeafFrame columnarFrame; @@ -48,6 +51,12 @@ private boolean setLowKey; private int tupleCount; + // For logging + private int numberOfLeafNodes; + private int numberOfPagesInCurrentLeafNode; + private int maxNumberOfPagesForAColumn; + private int maxNumberOfPagesInALeafNode; + public ColumnBTreeBulkloader(float fillFactor, boolean verifyInput, IPageWriteCallback callback, ITreeIndex index, ITreeIndexFrame leafFrame) throws HyracksDataException { super(fillFactor, verifyInput, callback, index, leafFrame); @@ -59,6 +68,12 @@ lowKey = new BTreeSplitKey(tupleWriter.createTupleReference()); lowKey.getTuple().setFieldCount(cmp.getKeyFieldCount()); setLowKey = true; + + // For logging. Starts with 1 for page0 + numberOfPagesInCurrentLeafNode = 1; + maxNumberOfPagesForAColumn = 0; + maxNumberOfPagesInALeafNode = 0; + numberOfLeafNodes = 1; } @Override @@ -118,9 +133,14 @@ for (ICachedPage page : tempConfiscatedPages) { bufferCache.returnPage(page, false); } + + // For logging + int numberOfTempConfiscatedPages = tempConfiscatedPages.size(); tempConfiscatedPages.clear(); //Where Page0 and columns pages will be written super.end(); + + log("Finished"); } @Override @@ -156,6 +176,12 @@ splitKey.setRightPage(leafFrontier.pageId); setLowKey = true; tupleCount = 0; + + // For logging + maxNumberOfPagesInALeafNode = Math.max(maxNumberOfPagesInALeafNode, numberOfPagesInCurrentLeafNode); + // Starts with 1 for page0 + numberOfPagesInCurrentLeafNode = 1; + numberOfLeafNodes++; } @Override @@ -172,6 +198,12 @@ for (ICachedPage c : columnsPages) { write(c); } + + // For logging + int numberOfPagesInPersistedColumn = columnsPages.size(); + maxNumberOfPagesForAColumn = Math.max(maxNumberOfPagesForAColumn, numberOfPagesInPersistedColumn); + numberOfPagesInCurrentLeafNode += numberOfPagesInPersistedColumn; + columnsPages.clear(); } @@ -185,6 +217,9 @@ bufferCache.returnPage(page, false); } super.abort(); + + // For logging + log("Aborted"); } private void 
setSplitKey(ISplitKey splitKey, ITupleReference tuple) { @@ -193,6 +228,18 @@ tupleWriter.writeTupleFields(tuple, 0, cmp.getKeyFieldCount(), splitKey.getBuffer().array(), 0); } + private void log(String status) { + if (!LOGGER.isDebugEnabled()) { + return; + } + + int numberOfTempConfiscatedPages = tempConfiscatedPages.size(); + LOGGER.debug( + "{} columnar bulkloader used leafNodes: {}, tempPagesAllocated: {}, maxPagesPerColumn: {}, and maxLeafNodePages: {}", + status, numberOfLeafNodes, numberOfTempConfiscatedPages, maxNumberOfPagesForAColumn, + maxNumberOfPagesInALeafNode); + } + /* * *********************************************************** * IColumnWriteMultiPageOp diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java index fd726cd..39952df 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java @@ -256,8 +256,8 @@ @Override public void doClose() throws HyracksDataException { - frameTuple.close(); releasePages(); + frameTuple.close(); page0 = null; pred = null; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java index e638a4a..3923025 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java @@ -32,6 +32,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; +import it.unimi.dsi.fastutil.longs.LongSet; + public abstract class AbstractColumnTupleReference implements IColumnTupleIterator { private static final Logger LOGGER = LogManager.getLogger(); private static final String UNSUPPORTED_OPERATION_MSG = "Operation is not supported for column tuples"; @@ -41,11 +44,15 @@ private final IColumnBufferProvider[] filterBufferProviders; private final IColumnBufferProvider[] buffersProviders; private final int numberOfPrimaryKeys; - private int totalNumberOfMegaLeafNodes; - private int numOfSkippedMegaLeafNodes; private int endIndex; protected int tupleIndex; + // For logging + private final LongSet pinnedPages; + private int totalNumberOfMegaLeafNodes; + private int numOfSkippedMegaLeafNodes; + private int maxNumberOfPinnedPages; + /** * Column tuple reference * @@ -64,6 +71,7 @@ primaryKeyBufferProviders[i] = new ColumnSingleBufferProvider(i); } + pinnedPages = new LongOpenHashSet(); int numberOfFilteredColumns = info.getNumberOfFilteredColumns(); filterBufferProviders = new IColumnBufferProvider[numberOfFilteredColumns]; for (int i = 0; i < numberOfFilteredColumns; i++) { @@ -71,7 +79,7 @@ if (columnIndex < 0) { filterBufferProviders[i] = DummyColumnBufferProvider.INSTANCE; } else if (columnIndex >= numberOfPrimaryKeys) { - filterBufferProviders[i] = new ColumnMultiBufferProvider(columnIndex, multiPageOp); + filterBufferProviders[i] = new ColumnMultiBufferProvider(columnIndex, multiPageOp, pinnedPages); } else { 
filterBufferProviders[i] = new ColumnSingleBufferProvider(columnIndex); } @@ -82,7 +90,7 @@ for (int i = 0; i < numberOfRequestedColumns; i++) { int columnIndex = info.getColumnIndex(i); if (columnIndex >= numberOfPrimaryKeys) { - buffersProviders[i] = new ColumnMultiBufferProvider(columnIndex, multiPageOp); + buffersProviders[i] = new ColumnMultiBufferProvider(columnIndex, multiPageOp, pinnedPages); } else { buffersProviders[i] = DummyColumnBufferProvider.INSTANCE; } @@ -116,6 +124,8 @@ int numberOfTuples = frame.getTupleCount(); //Start new page and check whether we should skip reading non-key columns or not boolean readColumnPages = startNewPage(pageZero, frame.getNumberOfColumns(), numberOfTuples); + //Release previous pinned pages if any + unpinColumnsPages(); /* * When startIndex = 0, a call to next() is performed to get the information of the PK * and 0 skips will be performed. If startIndex (for example) is 5, a call to next() will be performed @@ -125,8 +135,6 @@ if (readColumnPages) { for (int i = 0; i < filterBufferProviders.length; i++) { IColumnBufferProvider provider = filterBufferProviders[i]; - //Release previous pinned pages if any - provider.releaseAll(); provider.reset(frame); startColumnFilter(provider, i, numberOfTuples); } @@ -135,11 +143,10 @@ if (readColumnPages && evaluateFilter()) { for (int i = 0; i < buffersProviders.length; i++) { IColumnBufferProvider provider = buffersProviders[i]; - //Release previous pinned pages if any - provider.releaseAll(); provider.reset(frame); startColumn(provider, i, numberOfTuples); } + /* * skipCount can be < 0 for cases when the tuples in the range [0, startIndex] are all anti-matters. * Consequently, tuples in the range [0, startIndex] do not have any non-key columns. 
Thus, the returned @@ -150,6 +157,7 @@ } else { numOfSkippedMegaLeafNodes++; } + totalNumberOfMegaLeafNodes++; } @@ -232,17 +240,30 @@ @Override public final void unpinColumnsPages() throws HyracksDataException { + for (int i = 0; i < filterBufferProviders.length; i++) { + filterBufferProviders[i].releaseAll(); + } + for (int i = 0; i < buffersProviders.length; i++) { buffersProviders[i].releaseAll(); } + + maxNumberOfPinnedPages = Math.max(maxNumberOfPinnedPages, pinnedPages.size()); + pinnedPages.clear(); } @Override public final void close() { - if (LOGGER.isInfoEnabled() && numOfSkippedMegaLeafNodes > 0) { - LOGGER.info("Filtered {} disk mega-leaf nodes out of {} in total", numOfSkippedMegaLeafNodes, + if (!LOGGER.isDebugEnabled()) { + return; + } + + if (numOfSkippedMegaLeafNodes > 0) { + LOGGER.debug("Filtered {} disk mega-leaf nodes out of {} in total", numOfSkippedMegaLeafNodes, totalNumberOfMegaLeafNodes); } + + LOGGER.debug("Max number of pinned pages is {}", maxNumberOfPinnedPages + 1); } /* ************************************************************* diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java index 0c17d6b..34ec856 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/ColumnMultiBufferProvider.java @@ -26,20 +26,25 @@ import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider; import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp; import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame; +import org.apache.hyracks.storage.common.buffercache.CachedPage; import org.apache.hyracks.storage.common.buffercache.ICachedPage; +import it.unimi.dsi.fastutil.longs.LongSet; + public final class ColumnMultiBufferProvider implements IColumnBufferProvider { private final int columnIndex; private final IColumnReadMultiPageOp multiPageOp; private final Queue<ICachedPage> pages; + private final LongSet pinnedPages; private int numberOfPages; private int startPage; private int startOffset; private int length; - public ColumnMultiBufferProvider(int columnIndex, IColumnReadMultiPageOp multiPageOp) { + public ColumnMultiBufferProvider(int columnIndex, IColumnReadMultiPageOp multiPageOp, LongSet pinnedPages) { this.columnIndex = columnIndex; this.multiPageOp = multiPageOp; + this.pinnedPages = pinnedPages; pages = new ArrayDeque<>(); } @@ -107,6 +112,7 @@ private ByteBuffer readNext() throws HyracksDataException { ICachedPage columnPage = multiPageOp.pin(startPage++); pages.add(columnPage); + pinnedPages.add(((CachedPage) columnPage).getDiskPageId()); return columnPage.getBuffer(); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 9fcce8b..461d416 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -583,7 +583,9 @@ @Override public void merge(ILSMIOOperation operation) throws HyracksDataException { if (LOGGER.isDebugEnabled()) { - 
LOGGER.debug("Started a merge operation for index: {}", lsmIndex); + MergeOperation mergeOp = (MergeOperation) operation; + LOGGER.debug("Started a merge operation (number of merging components {}) for index: {}", + mergeOp.getMergingComponents().size(), lsmIndex); } synchronized (opTracker) { enterComponents(operation.getAccessor().getOpContext(), LSMOperationType.MERGE); -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17965 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I809109b232bc5a5db0c47a52cb98c838ff55e27f Gerrit-Change-Number: 17965 Gerrit-PatchSet: 3 Gerrit-Owner: Wail Alkowaileet <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Wail Alkowaileet <[email protected]> Gerrit-MessageType: merged
