[ https://issues.apache.org/jira/browse/ARTEMIS-3850?focusedWorklogId=779139&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-779139 ]
ASF GitHub Bot logged work on ARTEMIS-3850: ------------------------------------------- Author: ASF GitHub Bot Created on: 07/Jun/22 14:52 Start Date: 07/Jun/22 14:52 Worklog Time Spent: 10m Work Description: gemmellr commented on code in PR #4101: URL: https://github.com/apache/activemq-artemis/pull/4101#discussion_r891298940 ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java: ########## @@ -543,77 +283,110 @@ public boolean delete(final PagedMessage[] messages) throws Exception { logger.debugf("Deleting pageNr=%d on store %s", pageId, storeName); } - final List<Long> largeMessageIds; - if (messages != null && messages.length > 0) { - largeMessageIds = new ArrayList<>(); - for (PagedMessage msg : messages) { - if ((msg.getMessage()).isLargeMessage()) { - // this will trigger large message delete: no need to do it - // for non-large messages! - msg.getMessage().usageDown(); - largeMessageIds.add(msg.getMessage().getMessageID()); + if (messages != null) { + try (LinkedListIterator<PagedMessage> iter = messages.iterator()) { + while (iter.hasNext()) { + PagedMessage msg = iter.next(); + if ((msg.getMessage()).isLargeMessage()) { + ((LargeServerMessage)(msg.getMessage())).deleteFile(); + msg.getMessage().usageDown(); + } } } - } else { - largeMessageIds = Collections.emptyList(); } - try { - if (!storageManager.waitOnOperations(5000)) { - ActiveMQServerLogger.LOGGER.timedOutWaitingForLargeMessagesDeletion(largeMessageIds); + storageManager.afterCompleteOperations(new IOCallback() { + @Override + public void done() { + try { + if (suspiciousRecords) { + ActiveMQServerLogger.LOGGER.pageInvalid(file.getFileName(), file.getFileName()); + file.renameTo(file.getFileName() + ".invalidPage"); + } else { + file.delete(); + } + referenceCounter.reset(); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.pageDeleteError(e); + } } - if (suspiciousRecords) { - ActiveMQServerLogger.LOGGER.pageInvalid(file.getFileName(), file.getFileName()); - file.renameTo(file.getFileName() + ".invalidPage"); - } else { - file.delete(); + + @Override + public void onError(int errorCode, String errorMessage) { + } + }); - return true; - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.pageDeleteError(e); - return false; + return true; + } + + public int readNumberOfMessages() throws Exception { + boolean wasOpen = isOpen(); + + if (!wasOpen) { + if (!open(false)) { + return 0; + } + } + + try { + int numberOfMessages = PageReadWriter.readFromSequentialFile(this.storageManager, + this.storeName, + this.fileFactory, + this.file, + this.pageId, + null, + PageReadWriter.SKIP_ALL, + null, + null); + if (logger.isDebugEnabled()) { + logger.debug(">>> Reading numberOfMessages page " + this.pageId + ", returning " + numberOfMessages); + } + return numberOfMessages; + } finally { + if (!wasOpen) { + close(false); + } } } public int getNumberOfMessages() { - return numberOfMessages.intValue(); + return numberOfMessages; } public int getSize() { - return size.intValue(); + return size; } - @Override - public String toString() { - return "Page::pageNr=" + this.pageId + ", file=" + this.file; + private void setSize(int size) { + this.size = size; } @Override - public int compareTo(Page otherPage) { - return otherPage.getPageId() - this.pageId; + public String toString() { + return "Page::seqCreation=" + seqInt + ", pageNr=" + this.pageId + ", file=" + this.file; } @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + pageId; - return result; + public int compareTo(Page o) { + return 0; Review Comment: This seems odd. If its no longer comparable, perhaps remove the interface? ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java: ########## @@ -17,57 +17,80 @@ package org.apache.activemq.artemis.core.paging.impl; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.paging.PagedMessage; -import org.apache.activemq.artemis.core.paging.cursor.LivePageCache; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.utils.DataConstants; -import org.apache.activemq.artemis.utils.Env; +import org.apache.activemq.artemis.utils.ReferenceCounterUtil; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; +import org.apache.activemq.artemis.utils.collections.EmptyList; +import org.apache.activemq.artemis.utils.collections.LinkedList; +import org.apache.activemq.artemis.utils.collections.LinkedListImpl; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.jboss.logging.Logger; public final class Page implements Comparable<Page> { + private static final AtomicInteger factory = new AtomicInteger(0); + + private final int seqInt = factory.incrementAndGet(); + private static final Logger logger = Logger.getLogger(Page.class); - public static final int SIZE_RECORD = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_BYTE; + private final ReferenceCounterUtil referenceCounter = new ReferenceCounterUtil(); - private static final byte START_BYTE = (byte) '{'; + public void usageReset() { + referenceCounter.reset(); + } - private static final byte END_BYTE = (byte) '}'; + public int usageUp() { + return referenceCounter.increment(); + } + + public int usageDown() { + return referenceCounter.decrement(); + } - private final int pageId; + /** This is an utility method to help you call usageDown while using a try (closeable) call. + * */ + public AutoCloseable refCloseable() { + return referenceCounter; + } + + /** to be called when the page is supposed to be released */ + public void releaseTask(Consumer<Page> releaseTask) { + referenceCounter.setTask(() -> releaseTask.accept(this)); + } + + private final long pageId; private boolean suspiciousRecords = false; - private final AtomicInteger numberOfMessages = new AtomicInteger(0); + private int numberOfMessages; private final SequentialFile file; private final SequentialFileFactory fileFactory; + private volatile LinkedList<PagedMessage> messages; + /** * The page cache that will be filled with data as we write more data */ - private volatile LivePageCache pageCache; + private volatile Consumer<PagedMessage> callback; Review Comment: Comment above is inaccurate now ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java: ########## @@ -628,6 +636,24 @@ public AddressSettings setMaxSizeBytes(final long maxSizeBytes) { return this; } + public int getMaxReadPageMessages() { + return maxReadPageMessages != null ? maxReadPageMessages : AddressSettings.DEFAULT_MAX_READ_PAGE_MESSAGES; + } + + public AddressSettings setMaxReadPageMessages(final int maxReadPageMessages) { + this.maxReadPageMessages = maxReadPageMessages; + return this; + } + + public int getMaxReadPageBytes() { + return maxReadPageBytes != null ? maxReadPageBytes : 2 * getPageSizeBytes(); Review Comment: Does the doc say this defaults to double the page size? Dont recall seeing it. ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageCache.java: ########## @@ -0,0 +1,70 @@ +/** Review Comment: Should be a comment ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java: ########## @@ -207,8 +208,8 @@ private void checkIDSupplier(NodeStore<MessageReference> nodeStore) { // The quantity of pagedReferences on messageReferences priority list private final AtomicInteger pagedReferences = new AtomicInteger(0); - // The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage - final AtomicInteger queueMemorySize = new AtomicInteger(0); + + final SizeAwareMetric queueMemorySize = new SizeAwareMetric(); Review Comment: Ah so it did use a counter before. ########## docs/user-manual/en/paging.md: ########## @@ -96,12 +96,20 @@ Property Name|Description|Default `max-size-messages`|The max number of messages the address could have before entering on page mode.| -1 (disabled) `page-size-bytes`|The size of each page file used on the paging system|10MB `address-full-policy`|This must be set to `PAGE` for paging to enable. If the value is `PAGE` then further messages will be paged to disk. If the value is `DROP` then further messages will be silently dropped. If the value is `FAIL` then the messages will be dropped and the client message producers will receive an exception. If the value is `BLOCK` then client message producers will block when they try and send further messages.|`PAGE` -`page-max-cache-size`|The system will keep up to `page-max-cache-size` page files in memory to optimize IO during paging navigation.|5 +`max-read-page-messages` | how many message can be read from paging into the Queue whenever more messages are needed. The system wtill stop reading if `max-read-page-bytes hits the limit first. +`max-read-page-bytes` | how much memory the messages read from paging can take on the Queue whenever more messages are needed. The system will stop reading if `max-read-page-messages` hits the limit first. Review Comment: Defaults would be good. ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java: ########## @@ -49,7 +49,7 @@ public int expectedEncodeSize() { @Override public void encodeRest(final ActiveMQBuffer buffer) { - buffer.writeInt(pageNumber); + buffer.writeInt((int)pageNumber); Review Comment: Ditto ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java: ########## @@ -17,57 +17,80 @@ package org.apache.activemq.artemis.core.paging.impl; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.paging.PagedMessage; -import org.apache.activemq.artemis.core.paging.cursor.LivePageCache; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.utils.DataConstants; -import org.apache.activemq.artemis.utils.Env; +import org.apache.activemq.artemis.utils.ReferenceCounterUtil; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; +import org.apache.activemq.artemis.utils.collections.EmptyList; +import org.apache.activemq.artemis.utils.collections.LinkedList; +import org.apache.activemq.artemis.utils.collections.LinkedListImpl; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.jboss.logging.Logger; public final class Page implements Comparable<Page> { + private static final AtomicInteger factory = new AtomicInteger(0); + + private final int seqInt = factory.incrementAndGet(); + private static final Logger logger = Logger.getLogger(Page.class); - public static final int SIZE_RECORD = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_BYTE; + private final ReferenceCounterUtil referenceCounter = new ReferenceCounterUtil(); - private static final byte START_BYTE = (byte) '{'; + public void usageReset() { + referenceCounter.reset(); + } - private static final byte END_BYTE = (byte) '}'; + public int usageUp() { + return referenceCounter.increment(); + } + + public int usageDown() { + return referenceCounter.decrement(); + } - private final int pageId; + /** This is an utility method to help you call usageDown while using a try (closeable) call. + * */ Review Comment: Seems like it could be one line ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java: ########## @@ -17,57 +17,80 @@ package org.apache.activemq.artemis.core.paging.impl; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.paging.PagedMessage; -import org.apache.activemq.artemis.core.paging.cursor.LivePageCache; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.utils.DataConstants; -import org.apache.activemq.artemis.utils.Env; +import org.apache.activemq.artemis.utils.ReferenceCounterUtil; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; +import org.apache.activemq.artemis.utils.collections.EmptyList; +import org.apache.activemq.artemis.utils.collections.LinkedList; +import org.apache.activemq.artemis.utils.collections.LinkedListImpl; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.jboss.logging.Logger; public final class Page implements Comparable<Page> { + private static final AtomicInteger factory = new AtomicInteger(0); + + private final int seqInt = factory.incrementAndGet(); + Review Comment: its nice to have the logger at the top and have statics at the top, these adds make it static, non-static, static...if you put them below the logger it would be nicer. ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageReadWriter.java: ########## @@ -0,0 +1,281 @@ +/** Review Comment: Ditto ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java: ########## @@ -463,43 +215,40 @@ public void writeDirect(PagedMessage message) throws Exception { if (!file.isOpen()) { throw ActiveMQMessageBundle.BUNDLE.cannotWriteToClosedFile(file); } - final int messageEncodedSize = message.getEncodeSize(); - final int bufferSize = messageEncodedSize + Page.SIZE_RECORD; - final ByteBuffer buffer = fileFactory.newBuffer(bufferSize); - ChannelBufferWrapper activeMQBuffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer)); - activeMQBuffer.clear(); - activeMQBuffer.writeByte(Page.START_BYTE); - activeMQBuffer.writeInt(messageEncodedSize); - message.encode(activeMQBuffer); - activeMQBuffer.writeByte(Page.END_BYTE); - assert (activeMQBuffer.readableBytes() == bufferSize) : "messageEncodedSize is different from expected"; - //buffer limit and position are the same - assert (buffer.remaining() == bufferSize) : "buffer position or limit are changed"; - file.writeDirect(buffer, false); - if (pageCache != null) { - pageCache.addLiveMessage(message); + if (callback != null) { + callback.accept(message); } - //lighter than addAndGet when single writer - numberOfMessages.lazySet(numberOfMessages.get() + 1); - size.lazySet(size.get() + bufferSize); + addMessage(message); + this.size += PageReadWriter.writeMessage(message, fileFactory, file); + numberOfMessages++; } public void sync() throws Exception { file.sync(); } - public void open(boolean createFile) throws Exception { + public boolean isOpen() { + return file != null && file.isOpen(); + } + + + public boolean open(boolean createFile) throws Exception { + boolean isOpen = false; if (!file.isOpen() && (createFile || file.exists())) { file.open(); + isOpen = true; } if (file.isOpen()) { - size.set((int) file.size()); + isOpen = true; + size = (int) file.size(); Review Comment: Should probably validate the value before truncating it? ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java: ########## @@ -1444,20 +1276,39 @@ public List<MessageReference> getRelatedMessageReferences() { private class CursorIterator implements PageIterator { - private PagePositionAndFileOffset position = null; + // The cursorLogger is declared as static on the class, just to avoid a getLogger() call every time + private final Logger logger = cursorLogger; Review Comment: Why assign the existing static variable to an instance variable, rather than just using or creating the static logger here? ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java: ########## @@ -54,7 +54,7 @@ public int expectedEncodeSize() { @Override public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeSimpleString(storeName); - buffer.writeInt(pageNumber); + buffer.writeInt((int)pageNumber); Review Comment: seems like it should at least validate the number before truncating it...and then perhaps explode if it would have done the wrong thing? ########## docs/user-manual/en/paging.md: ########## @@ -96,12 +96,20 @@ Property Name|Description|Default `max-size-messages`|The max number of messages the address could have before entering on page mode.| -1 (disabled) `page-size-bytes`|The size of each page file used on the paging system|10MB `address-full-policy`|This must be set to `PAGE` for paging to enable. If the value is `PAGE` then further messages will be paged to disk. If the value is `DROP` then further messages will be silently dropped. If the value is `FAIL` then the messages will be dropped and the client message producers will receive an exception. If the value is `BLOCK` then client message producers will block when they try and send further messages.|`PAGE` -`page-max-cache-size`|The system will keep up to `page-max-cache-size` page files in memory to optimize IO during paging navigation.|5 +`max-read-page-messages` | how many message can be read from paging into the Queue whenever more messages are needed. The system wtill stop reading if `max-read-page-bytes hits the limit first. +`max-read-page-bytes` | how much memory the messages read from paging can take on the Queue whenever more messages are needed. The system will stop reading if `max-read-page-messages` hits the limit first. +`page-max-cache-size`|Deprecated and not used: `max-read-page-messages` and `max-read-page-bytes` will replace this functionality. Review Comment: Not will replace, but have replaced. Elsewhere I tend to just remove options from the docs that dont do anything. Is a message logged if the old one is used? ########## docs/user-manual/en/paging.md: ########## @@ -96,12 +96,20 @@ Property Name|Description|Default `max-size-messages`|The max number of messages the address could have before entering on page mode.| -1 (disabled) `page-size-bytes`|The size of each page file used on the paging system|10MB `address-full-policy`|This must be set to `PAGE` for paging to enable. If the value is `PAGE` then further messages will be paged to disk. If the value is `DROP` then further messages will be silently dropped. If the value is `FAIL` then the messages will be dropped and the client message producers will receive an exception. If the value is `BLOCK` then client message producers will block when they try and send further messages.|`PAGE` -`page-max-cache-size`|The system will keep up to `page-max-cache-size` page files in memory to optimize IO during paging navigation.|5 +`max-read-page-messages` | how many message can be read from paging into the Queue whenever more messages are needed. The system wtill stop reading if `max-read-page-bytes hits the limit first. +`max-read-page-bytes` | how much memory the messages read from paging can take on the Queue whenever more messages are needed. The system will stop reading if `max-read-page-messages` hits the limit first. +`page-max-cache-size`|Deprecated and not used: `max-read-page-messages` and `max-read-page-bytes` will replace this functionality. ### max-size-bytes and max-size-messages simultaneous usage It is possible to define max-size-messages (as the maximum number of messages) and max-messages-size (as the max number of estimated memory used by the address) concurrently. The configured policy will start based on the first value to reach its mark. +#### Maximum read from page + +Similarly to `max-size-bytes` and `max-size-messages`, the same can happen with `max-read-page-bytes` and `max-read-page-messages` when reading messages from paging. Review Comment: The "Similarly to `max-size-bytes` and `max-size-messages` the same can happen" bit seems superfluous. It would be cleare to just describe the behaviour of these options rather than comparing to ultimately unrelated options. ########## artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderAccessor.java: ########## @@ -5,20 +5,23 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> Review Comment: This is wrong, its a comment not javadoc, should be left as is. ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageReadWriter.java: ########## @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.paging.impl; + +import java.nio.ByteBuffer; +import java.util.function.Consumer; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.paging.PagedMessage; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.activemq.artemis.utils.Env; +import org.jboss.logging.Logger; + +public class PageReadWriter { + + + private static Logger logger = Logger.getLogger(PageReadWriter.class); + + public static final int SIZE_RECORD = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_BYTE; + + private static final byte START_BYTE = (byte) '{'; + + private static final byte END_BYTE = (byte) '}'; + + //sizeOf(START_BYTE) + sizeOf(MESSAGE LENGTH) + sizeOf(END_BYTE) + private static final int HEADER_AND_TRAILER_SIZE = DataConstants.SIZE_INT + 2; + private static final int MINIMUM_MSG_PERSISTENT_SIZE = HEADER_AND_TRAILER_SIZE; + private static final int HEADER_SIZE = HEADER_AND_TRAILER_SIZE - 1; + private static final int MIN_CHUNK_SIZE = Env.osPageSize(); + + public interface SuspectFileCallback { + void onSuspect(String fileName, int position, int msgNumber); + } + + public interface PageRecordFilter { + boolean skip(ActiveMQBuffer buffer); + } + + public interface ReadCallback { + void readComple(int size); + } + + public static final PageRecordFilter ONLY_LARGE = (buffer) -> !PagedMessageImpl.isLargeMessage(buffer); + + public static final PageRecordFilter NO_SKIP = (buffer) -> false; + + public static final PageRecordFilter SKIP_ALL = (buffer) -> true; + + public static int writeMessage(PagedMessage message, SequentialFileFactory fileFactory, SequentialFile file) throws Exception { + final int messageEncodedSize = message.getEncodeSize(); + final int bufferSize = messageEncodedSize + SIZE_RECORD; + final ByteBuffer buffer = fileFactory.newBuffer(bufferSize); + ChannelBufferWrapper activeMQBuffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer)); + activeMQBuffer.clear(); + activeMQBuffer.writeByte(START_BYTE); + activeMQBuffer.writeInt(messageEncodedSize); + message.encode(activeMQBuffer); + activeMQBuffer.writeByte(END_BYTE); + assert (activeMQBuffer.readableBytes() == bufferSize) : "messageEncodedSize is different from expected"; + //buffer limit and position are the same + assert (buffer.remaining() == bufferSize) : "buffer position or limit are changed"; + file.writeDirect(buffer, false); + return bufferSize; + } + + + Review Comment: Its weird that there are all these newlines between methods, but a little below this, there is a 100 line method that is screaming for some newline grouping/seperation but has none. ########## docs/user-manual/en/paging.md: ########## @@ -96,12 +96,20 @@ Property Name|Description|Default `max-size-messages`|The max number of messages the address could have before entering on page mode.| -1 (disabled) `page-size-bytes`|The size of each page file used on the paging system|10MB `address-full-policy`|This must be set to `PAGE` for paging to enable. If the value is `PAGE` then further messages will be paged to disk. If the value is `DROP` then further messages will be silently dropped. If the value is `FAIL` then the messages will be dropped and the client message producers will receive an exception. If the value is `BLOCK` then client message producers will block when they try and send further messages.|`PAGE` -`page-max-cache-size`|The system will keep up to `page-max-cache-size` page files in memory to optimize IO during paging navigation.|5 +`max-read-page-messages` | how many message can be read from paging into the Queue whenever more messages are needed. The system wtill stop reading if `max-read-page-bytes hits the limit first. +`max-read-page-bytes` | how much memory the messages read from paging can take on the Queue whenever more messages are needed. The system will stop reading if `max-read-page-messages` hits the limit first. +`page-max-cache-size`|Deprecated and not used: `max-read-page-messages` and `max-read-page-bytes` will replace this functionality. ### max-size-bytes and max-size-messages simultaneous usage It is possible to define max-size-messages (as the maximum number of messages) and max-messages-size (as the max number of estimated memory used by the address) concurrently. The configured policy will start based on the first value to reach its mark. +#### Maximum read from page + +Similarly to `max-size-bytes` and `max-size-messages`, the same can happen with `max-read-page-bytes` and `max-read-page-messages` when reading messages from paging. + +<b>Warning</b>: When messages are read from paging into memory, when they are redelivered (for either a rollback or a closed consumers). Messages will stay in memory until they are delivered again. In these cases of redeliveries it would be expected to have more messages in memory than the configured maximum values. Review Comment: The first sentence ends prematurely, the second one is really part of the same sentence. ########## docs/user-manual/en/paging.md: ########## @@ -192,6 +200,12 @@ The pages are synced periodically and the sync period is configured through the same value of `journal-buffer-timeout`. When using ASYNCIO, the default should be `3333333`. +## Memory usage from Paged Messages. + +The system should keep at least one paged file in memory caching ahead reading messages. +Also every active subscription could keep one paged file in memory. So if your system has too many queues (some people would call this an horizontal topology). +So, if your system has too many queues it is recommended to minimize the page-size. Review Comment: third sentence ends prematurely, its really continued by the fourth. Issue Time Tracking ------------------- Worklog Id: (was: 779139) Time Spent: 1.5h (was: 1h 20m) > Add Option to read messages into paging based on sizing and eliminate caching > ----------------------------------------------------------------------------- > > Key: ARTEMIS-3850 > URL: https://issues.apache.org/jira/browse/ARTEMIS-3850 > Project: ActiveMQ Artemis > Issue Type: New Feature > Affects Versions: 2.22.0 > Reporter: Clebert Suconic > Assignee: Clebert Suconic > Priority: Major > Fix For: 2.24.0 > > Time Spent: 1.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.7#820007)