[ 
https://issues.apache.org/jira/browse/ARTEMIS-3850?focusedWorklogId=779139&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-779139
 ]

ASF GitHub Bot logged work on ARTEMIS-3850:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Jun/22 14:52
            Start Date: 07/Jun/22 14:52
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #4101:
URL: https://github.com/apache/activemq-artemis/pull/4101#discussion_r891298940


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java:
##########
@@ -543,77 +283,110 @@ public boolean delete(final PagedMessage[] messages) 
throws Exception {
          logger.debugf("Deleting pageNr=%d on store %s", pageId, storeName);
       }
 
-      final List<Long> largeMessageIds;
-      if (messages != null && messages.length > 0) {
-         largeMessageIds = new ArrayList<>();
-         for (PagedMessage msg : messages) {
-            if ((msg.getMessage()).isLargeMessage()) {
-               // this will trigger large message delete: no need to do it
-               // for non-large messages!
-               msg.getMessage().usageDown();
-               largeMessageIds.add(msg.getMessage().getMessageID());
+      if (messages != null) {
+         try (LinkedListIterator<PagedMessage> iter = messages.iterator()) {
+            while (iter.hasNext()) {
+               PagedMessage msg = iter.next();
+               if ((msg.getMessage()).isLargeMessage()) {
+                  ((LargeServerMessage)(msg.getMessage())).deleteFile();
+                  msg.getMessage().usageDown();
+               }
             }
          }
-      } else {
-         largeMessageIds = Collections.emptyList();
       }
 
-      try {
-         if (!storageManager.waitOnOperations(5000)) {
-            
ActiveMQServerLogger.LOGGER.timedOutWaitingForLargeMessagesDeletion(largeMessageIds);
+      storageManager.afterCompleteOperations(new IOCallback() {
+         @Override
+         public void done() {
+            try {
+               if (suspiciousRecords) {
+                  ActiveMQServerLogger.LOGGER.pageInvalid(file.getFileName(), 
file.getFileName());
+                  file.renameTo(file.getFileName() + ".invalidPage");
+               } else {
+                  file.delete();
+               }
+               referenceCounter.reset();
+            } catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.pageDeleteError(e);
+            }
          }
-         if (suspiciousRecords) {
-            ActiveMQServerLogger.LOGGER.pageInvalid(file.getFileName(), 
file.getFileName());
-            file.renameTo(file.getFileName() + ".invalidPage");
-         } else {
-            file.delete();
+
+         @Override
+         public void onError(int errorCode, String errorMessage) {
+
          }
+      });
 
-         return true;
-      } catch (Exception e) {
-         ActiveMQServerLogger.LOGGER.pageDeleteError(e);
-         return false;
+      return true;
+   }
+
+   public int readNumberOfMessages() throws Exception {
+      boolean wasOpen = isOpen();
+
+      if (!wasOpen) {
+         if (!open(false)) {
+            return 0;
+         }
+      }
+
+      try {
+         int numberOfMessages = 
PageReadWriter.readFromSequentialFile(this.storageManager,
+                                                                      
this.storeName,
+                                                                      
this.fileFactory,
+                                                                      
this.file,
+                                                                      
this.pageId,
+                                                                      null,
+                                                                      
PageReadWriter.SKIP_ALL,
+                                                                      null,
+                                                                      null);
+         if (logger.isDebugEnabled()) {
+            logger.debug(">>> Reading numberOfMessages page " + this.pageId + 
", returning " + numberOfMessages);
+         }
+         return numberOfMessages;
+      } finally {
+         if (!wasOpen) {
+            close(false);
+         }
       }
    }
 
    public int getNumberOfMessages() {
-      return numberOfMessages.intValue();
+      return numberOfMessages;
    }
 
    public int getSize() {
-      return size.intValue();
+      return size;
    }
 
-   @Override
-   public String toString() {
-      return "Page::pageNr=" + this.pageId + ", file=" + this.file;
+   private void setSize(int size) {
+      this.size = size;
    }
 
    @Override
-   public int compareTo(Page otherPage) {
-      return otherPage.getPageId() - this.pageId;
+   public String toString() {
+      return "Page::seqCreation=" + seqInt + ", pageNr=" + this.pageId + ", 
file=" + this.file;
    }
 
    @Override
-   public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + pageId;
-      return result;
+   public int compareTo(Page o) {
+      return 0;

Review Comment:
   This seems odd. If its no longer comparable, perhaps remove the interface?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java:
##########
@@ -17,57 +17,80 @@
 package org.apache.activemq.artemis.core.paging.impl;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
-import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.artemis.utils.Env;
+import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.collections.EmptyList;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.jboss.logging.Logger;
 
 public final class Page implements Comparable<Page> {
 
+   private static final AtomicInteger factory = new AtomicInteger(0);
+
+   private final int seqInt = factory.incrementAndGet();
+
    private static final Logger logger = Logger.getLogger(Page.class);
 
-   public static final int SIZE_RECORD = DataConstants.SIZE_BYTE + 
DataConstants.SIZE_INT + DataConstants.SIZE_BYTE;
+   private final ReferenceCounterUtil referenceCounter = new 
ReferenceCounterUtil();
 
-   private static final byte START_BYTE = (byte) '{';
+   public void usageReset() {
+      referenceCounter.reset();
+   }
 
-   private static final byte END_BYTE = (byte) '}';
+   public int usageUp() {
+      return referenceCounter.increment();
+   }
+
+   public int usageDown() {
+      return referenceCounter.decrement();
+   }
 
 
-   private final int pageId;
+   /** This is an utility method to help you call usageDown while using a try 
(closeable) call.
+    *  */
+   public AutoCloseable refCloseable() {
+      return referenceCounter;
+   }
+
+   /** to be called when the page is supposed to be released */
+   public void releaseTask(Consumer<Page> releaseTask) {
+      referenceCounter.setTask(() -> releaseTask.accept(this));
+   }
+
+   private final long pageId;
 
    private boolean suspiciousRecords = false;
 
-   private final AtomicInteger numberOfMessages = new AtomicInteger(0);
+   private int numberOfMessages;
 
    private final SequentialFile file;
 
    private final SequentialFileFactory fileFactory;
 
+   private volatile LinkedList<PagedMessage> messages;
+
    /**
     * The page cache that will be filled with data as we write more data
     */
-   private volatile LivePageCache pageCache;
+   private volatile Consumer<PagedMessage> callback;

Review Comment:
   Comment above is inaccurate now



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java:
##########
@@ -628,6 +636,24 @@ public AddressSettings setMaxSizeBytes(final long 
maxSizeBytes) {
       return this;
    }
 
+   public int getMaxReadPageMessages() {
+      return maxReadPageMessages != null ? maxReadPageMessages : 
AddressSettings.DEFAULT_MAX_READ_PAGE_MESSAGES;
+   }
+
+   public AddressSettings setMaxReadPageMessages(final int 
maxReadPageMessages) {
+      this.maxReadPageMessages = maxReadPageMessages;
+      return this;
+   }
+
+   public int getMaxReadPageBytes() {
+      return maxReadPageBytes != null ? maxReadPageBytes : 2 * 
getPageSizeBytes();

Review Comment:
   Does the doc say this defaults to double the page size? Dont recall seeing 
it.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageCache.java:
##########
@@ -0,0 +1,70 @@
+/**

Review Comment:
   Should be a comment



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -207,8 +208,8 @@ private void checkIDSupplier(NodeStore<MessageReference> 
nodeStore) {
    // The quantity of pagedReferences on messageReferences priority list
    private final AtomicInteger pagedReferences = new AtomicInteger(0);
 
-   // The estimate of memory being consumed by this queue. Used to calculate 
instances of messages to depage
-   final AtomicInteger queueMemorySize = new AtomicInteger(0);
+
+   final SizeAwareMetric queueMemorySize = new SizeAwareMetric();

Review Comment:
   Ah so it did use a counter before.



##########
docs/user-manual/en/paging.md:
##########
@@ -96,12 +96,20 @@ Property Name|Description|Default
 `max-size-messages`|The max number of messages the address could have before 
entering on page mode.| -1 (disabled)
 `page-size-bytes`|The size of each page file used on the paging system|10MB
 `address-full-policy`|This must be set to `PAGE` for paging to enable. If the 
value is `PAGE` then further messages will be paged to disk. If the value is 
`DROP` then further messages will be silently dropped. If the value is `FAIL` 
then the messages will be dropped and the client message producers will receive 
an exception. If the value is `BLOCK` then client message producers will block 
when they try and send further messages.|`PAGE`
-`page-max-cache-size`|The system will keep up to `page-max-cache-size` page 
files in memory to optimize IO during paging navigation.|5
+`max-read-page-messages` | how many message can be read from paging into the 
Queue whenever more messages are needed. The system wtill stop reading if 
`max-read-page-bytes hits the limit first.
+`max-read-page-bytes` | how much memory the messages read from paging can take 
on the Queue whenever more messages are needed. The system will stop reading if 
`max-read-page-messages` hits the limit first.

Review Comment:
   Defaults would be good.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java:
##########
@@ -49,7 +49,7 @@ public int expectedEncodeSize() {
 
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
-      buffer.writeInt(pageNumber);
+      buffer.writeInt((int)pageNumber);

Review Comment:
   Ditto



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java:
##########
@@ -17,57 +17,80 @@
 package org.apache.activemq.artemis.core.paging.impl;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
-import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.artemis.utils.Env;
+import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.collections.EmptyList;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.jboss.logging.Logger;
 
 public final class Page implements Comparable<Page> {
 
+   private static final AtomicInteger factory = new AtomicInteger(0);
+
+   private final int seqInt = factory.incrementAndGet();
+
    private static final Logger logger = Logger.getLogger(Page.class);
 
-   public static final int SIZE_RECORD = DataConstants.SIZE_BYTE + 
DataConstants.SIZE_INT + DataConstants.SIZE_BYTE;
+   private final ReferenceCounterUtil referenceCounter = new 
ReferenceCounterUtil();
 
-   private static final byte START_BYTE = (byte) '{';
+   public void usageReset() {
+      referenceCounter.reset();
+   }
 
-   private static final byte END_BYTE = (byte) '}';
+   public int usageUp() {
+      return referenceCounter.increment();
+   }
+
+   public int usageDown() {
+      return referenceCounter.decrement();
+   }
 
 
-   private final int pageId;
+   /** This is an utility method to help you call usageDown while using a try 
(closeable) call.
+    *  */

Review Comment:
   Seems like it could be one line



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java:
##########
@@ -17,57 +17,80 @@
 package org.apache.activemq.artemis.core.paging.impl;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
-import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.artemis.utils.Env;
+import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.collections.EmptyList;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.jboss.logging.Logger;
 
 public final class Page implements Comparable<Page> {
 
+   private static final AtomicInteger factory = new AtomicInteger(0);
+
+   private final int seqInt = factory.incrementAndGet();
+

Review Comment:
   its nice to have the logger at the top and have statics at the top, these 
adds make it static, non-static, static...if you put them below the logger it 
would be nicer.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageReadWriter.java:
##########
@@ -0,0 +1,281 @@
+/**

Review Comment:
   Ditto



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java:
##########
@@ -463,43 +215,40 @@ public void writeDirect(PagedMessage message) throws 
Exception {
       if (!file.isOpen()) {
          throw ActiveMQMessageBundle.BUNDLE.cannotWriteToClosedFile(file);
       }
-      final int messageEncodedSize = message.getEncodeSize();
-      final int bufferSize = messageEncodedSize + Page.SIZE_RECORD;
-      final ByteBuffer buffer = fileFactory.newBuffer(bufferSize);
-      ChannelBufferWrapper activeMQBuffer = new 
ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer));
-      activeMQBuffer.clear();
-      activeMQBuffer.writeByte(Page.START_BYTE);
-      activeMQBuffer.writeInt(messageEncodedSize);
-      message.encode(activeMQBuffer);
-      activeMQBuffer.writeByte(Page.END_BYTE);
-      assert (activeMQBuffer.readableBytes() == bufferSize) : 
"messageEncodedSize is different from expected";
-      //buffer limit and position are the same
-      assert (buffer.remaining() == bufferSize) : "buffer position or limit 
are changed";
-      file.writeDirect(buffer, false);
-      if (pageCache != null) {
-         pageCache.addLiveMessage(message);
+      if (callback != null) {
+         callback.accept(message);
       }
-      //lighter than addAndGet when single writer
-      numberOfMessages.lazySet(numberOfMessages.get() + 1);
-      size.lazySet(size.get() + bufferSize);
+      addMessage(message);
+      this.size += PageReadWriter.writeMessage(message, fileFactory, file);
+      numberOfMessages++;
    }
 
    public void sync() throws Exception {
       file.sync();
    }
 
-   public void open(boolean createFile) throws Exception {
+   public boolean isOpen() {
+      return file != null && file.isOpen();
+   }
+
+
+   public boolean open(boolean createFile) throws Exception {
+      boolean isOpen = false;
       if (!file.isOpen() && (createFile || file.exists())) {
          file.open();
+         isOpen = true;
       }
       if (file.isOpen()) {
-         size.set((int) file.size());
+         isOpen = true;
+         size = (int) file.size();

Review Comment:
   Should probably validate the value before truncating it?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java:
##########
@@ -1444,20 +1276,39 @@ public List<MessageReference> 
getRelatedMessageReferences() {
 
    private class CursorIterator implements PageIterator {
 
-      private PagePositionAndFileOffset position = null;
+      // The cursorLogger is declared as static on the class, just to avoid a 
getLogger() call every time
+      private final Logger logger = cursorLogger;

Review Comment:
   Why assign the existing static variable to an instance variable, rather than 
just using or creating the static logger here?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageEventMessage.java:
##########
@@ -54,7 +54,7 @@ public int expectedEncodeSize() {
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeSimpleString(storeName);
-      buffer.writeInt(pageNumber);
+      buffer.writeInt((int)pageNumber);

Review Comment:
   seems like it should at least validate the number before truncating it...and 
then perhaps explode if it would have done the wrong thing?



##########
docs/user-manual/en/paging.md:
##########
@@ -96,12 +96,20 @@ Property Name|Description|Default
 `max-size-messages`|The max number of messages the address could have before 
entering on page mode.| -1 (disabled)
 `page-size-bytes`|The size of each page file used on the paging system|10MB
 `address-full-policy`|This must be set to `PAGE` for paging to enable. If the 
value is `PAGE` then further messages will be paged to disk. If the value is 
`DROP` then further messages will be silently dropped. If the value is `FAIL` 
then the messages will be dropped and the client message producers will receive 
an exception. If the value is `BLOCK` then client message producers will block 
when they try and send further messages.|`PAGE`
-`page-max-cache-size`|The system will keep up to `page-max-cache-size` page 
files in memory to optimize IO during paging navigation.|5
+`max-read-page-messages` | how many message can be read from paging into the 
Queue whenever more messages are needed. The system wtill stop reading if 
`max-read-page-bytes hits the limit first.
+`max-read-page-bytes` | how much memory the messages read from paging can take 
on the Queue whenever more messages are needed. The system will stop reading if 
`max-read-page-messages` hits the limit first.
+`page-max-cache-size`|Deprecated and not used: `max-read-page-messages` and 
`max-read-page-bytes` will replace this functionality.

Review Comment:
   Not will replace, but have replaced. Elsewhere I tend to just remove options 
from the docs that dont do anything. Is a message logged if the old one is used?



##########
docs/user-manual/en/paging.md:
##########
@@ -96,12 +96,20 @@ Property Name|Description|Default
 `max-size-messages`|The max number of messages the address could have before 
entering on page mode.| -1 (disabled)
 `page-size-bytes`|The size of each page file used on the paging system|10MB
 `address-full-policy`|This must be set to `PAGE` for paging to enable. If the 
value is `PAGE` then further messages will be paged to disk. If the value is 
`DROP` then further messages will be silently dropped. If the value is `FAIL` 
then the messages will be dropped and the client message producers will receive 
an exception. If the value is `BLOCK` then client message producers will block 
when they try and send further messages.|`PAGE`
-`page-max-cache-size`|The system will keep up to `page-max-cache-size` page 
files in memory to optimize IO during paging navigation.|5
+`max-read-page-messages` | how many message can be read from paging into the 
Queue whenever more messages are needed. The system wtill stop reading if 
`max-read-page-bytes hits the limit first.
+`max-read-page-bytes` | how much memory the messages read from paging can take 
on the Queue whenever more messages are needed. The system will stop reading if 
`max-read-page-messages` hits the limit first.
+`page-max-cache-size`|Deprecated and not used: `max-read-page-messages` and 
`max-read-page-bytes` will replace this functionality.
 
 ### max-size-bytes and max-size-messages simultaneous usage
 
 It is possible to define max-size-messages (as the maximum number of messages) 
and max-messages-size (as the max number of estimated memory used by the 
address) concurrently. The configured policy will start based on the first 
value to reach its mark.
 
+#### Maximum read from page
+
+Similarly to `max-size-bytes` and `max-size-messages`, the same can happen 
with `max-read-page-bytes` and `max-read-page-messages` when reading messages 
from paging.

Review Comment:
   The "Similarly to `max-size-bytes` and `max-size-messages` the same can 
happen" bit seems superfluous. It would be cleare to just describe the 
behaviour of these options rather than comparing to ultimately unrelated 
options.



##########
artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderAccessor.java:
##########
@@ -5,20 +5,23 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>

Review Comment:
   This is wrong, its a comment not javadoc, should be left as is.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageReadWriter.java:
##########
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.impl;
+
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.Env;
+import org.jboss.logging.Logger;
+
+public class PageReadWriter {
+
+
+   private static Logger logger = Logger.getLogger(PageReadWriter.class);
+
+   public static final int SIZE_RECORD = DataConstants.SIZE_BYTE + 
DataConstants.SIZE_INT + DataConstants.SIZE_BYTE;
+
+   private static final byte START_BYTE = (byte) '{';
+
+   private static final byte END_BYTE = (byte) '}';
+
+   //sizeOf(START_BYTE) + sizeOf(MESSAGE LENGTH) + sizeOf(END_BYTE)
+   private static final int HEADER_AND_TRAILER_SIZE = DataConstants.SIZE_INT + 
2;
+   private static final int MINIMUM_MSG_PERSISTENT_SIZE = 
HEADER_AND_TRAILER_SIZE;
+   private static final int HEADER_SIZE = HEADER_AND_TRAILER_SIZE - 1;
+   private static final int MIN_CHUNK_SIZE = Env.osPageSize();
+
+   public interface SuspectFileCallback {
+      void onSuspect(String fileName, int position, int msgNumber);
+   }
+
+   public interface PageRecordFilter {
+      boolean skip(ActiveMQBuffer buffer);
+   }
+
+   public interface ReadCallback {
+      void readComple(int size);
+   }
+
+   public static final PageRecordFilter ONLY_LARGE = (buffer) -> 
!PagedMessageImpl.isLargeMessage(buffer);
+
+   public static final PageRecordFilter NO_SKIP = (buffer) -> false;
+
+   public static final PageRecordFilter SKIP_ALL = (buffer) -> true;
+
+   public static int writeMessage(PagedMessage message, SequentialFileFactory 
fileFactory, SequentialFile file) throws Exception {
+      final int messageEncodedSize = message.getEncodeSize();
+      final int bufferSize = messageEncodedSize + SIZE_RECORD;
+      final ByteBuffer buffer = fileFactory.newBuffer(bufferSize);
+      ChannelBufferWrapper activeMQBuffer = new 
ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer));
+      activeMQBuffer.clear();
+      activeMQBuffer.writeByte(START_BYTE);
+      activeMQBuffer.writeInt(messageEncodedSize);
+      message.encode(activeMQBuffer);
+      activeMQBuffer.writeByte(END_BYTE);
+      assert (activeMQBuffer.readableBytes() == bufferSize) : 
"messageEncodedSize is different from expected";
+      //buffer limit and position are the same
+      assert (buffer.remaining() == bufferSize) : "buffer position or limit 
are changed";
+      file.writeDirect(buffer, false);
+      return bufferSize;
+   }
+
+
+

Review Comment:
   Its weird that there are all these newlines between methods, but a little 
below this, there is a 100 line method that is screaming for some newline 
grouping/seperation but has none.



##########
docs/user-manual/en/paging.md:
##########
@@ -96,12 +96,20 @@ Property Name|Description|Default
 `max-size-messages`|The max number of messages the address could have before 
entering on page mode.| -1 (disabled)
 `page-size-bytes`|The size of each page file used on the paging system|10MB
 `address-full-policy`|This must be set to `PAGE` for paging to enable. If the 
value is `PAGE` then further messages will be paged to disk. If the value is 
`DROP` then further messages will be silently dropped. If the value is `FAIL` 
then the messages will be dropped and the client message producers will receive 
an exception. If the value is `BLOCK` then client message producers will block 
when they try and send further messages.|`PAGE`
-`page-max-cache-size`|The system will keep up to `page-max-cache-size` page 
files in memory to optimize IO during paging navigation.|5
+`max-read-page-messages` | how many message can be read from paging into the 
Queue whenever more messages are needed. The system wtill stop reading if 
`max-read-page-bytes hits the limit first.
+`max-read-page-bytes` | how much memory the messages read from paging can take 
on the Queue whenever more messages are needed. The system will stop reading if 
`max-read-page-messages` hits the limit first.
+`page-max-cache-size`|Deprecated and not used: `max-read-page-messages` and 
`max-read-page-bytes` will replace this functionality.
 
 ### max-size-bytes and max-size-messages simultaneous usage
 
 It is possible to define max-size-messages (as the maximum number of messages) 
and max-messages-size (as the max number of estimated memory used by the 
address) concurrently. The configured policy will start based on the first 
value to reach its mark.
 
+#### Maximum read from page
+
+Similarly to `max-size-bytes` and `max-size-messages`, the same can happen 
with `max-read-page-bytes` and `max-read-page-messages` when reading messages 
from paging.
+
+<b>Warning</b>: When messages are read from paging into memory, when they are 
redelivered (for either a rollback or a closed consumers). Messages will stay 
in memory until they are delivered again. In these cases of redeliveries it 
would be expected to have more messages in memory than the configured maximum 
values.

Review Comment:
   The first sentence ends prematurely, the second one is really part of the 
same sentence.



##########
docs/user-manual/en/paging.md:
##########
@@ -192,6 +200,12 @@ The pages are synced periodically and the sync period is 
configured through
 the same value of `journal-buffer-timeout`. When using ASYNCIO, the default
 should be `3333333`.
 
+## Memory usage from Paged Messages.
+
+The system should keep at least one paged file in memory caching ahead reading 
messages. 
+Also every active subscription could keep one paged file in memory. So if your 
system has too many queues (some people would call this an horizontal topology).
+So, if your system has too many queues it is recommended to minimize the 
page-size.

Review Comment:
   third sentence ends prematurely, its really continued by the fourth.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 779139)
    Time Spent: 1.5h  (was: 1h 20m)

> Add Option to read messages into paging based on sizing and eliminate caching
> -----------------------------------------------------------------------------
>
>                 Key: ARTEMIS-3850
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3850
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>    Affects Versions: 2.22.0
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.24.0
>
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to