[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906982&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906982
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 26/Feb/24 16:01
Start Date: 26/Feb/24 16:01
Worklog Time Spent: 10m 
  Work Description: clebertsuconic merged PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827




Issue Time Tracking
---

Worklog Id: (was: 906982)
Time Spent: 6h  (was: 5h 50m)

> Performance improvements on Mirror and Paging
> -
>
> Key: ARTEMIS-4651
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4651
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.33.0
>
> Time Spent: 6h
> Remaining Estimate: 0h
>
> Before this change, sends were not paged at the SNF (store-and-forward) queue. They are now copied.
> I also added a different scheme for retrying messages in batches. A
> collection of pending IDs is created and a few retries are performed at
> different levels.
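
As a rough illustration of the batch retry idea in the description above, the sketch below keeps a collection of pending IDs and retries them in passes, giving up on an ID after a fixed number of attempts. It is not the Artemis implementation: the class `PendingAckRetrySketch`, its methods, and the single retry level are all assumptions made for this example.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongPredicate;

// Hypothetical sketch only; none of these names come from the Artemis code base.
final class PendingAckRetrySketch {

   // Pending message IDs mapped to the number of failed attempts so far.
   private final Map<Long, Integer> attemptsById = new LinkedHashMap<>();
   private final int maxAttempts;

   PendingAckRetrySketch(int maxAttempts) {
      this.maxAttempts = maxAttempts;
   }

   void addPending(long messageId) {
      attemptsById.putIfAbsent(messageId, 0);
   }

   int pendingCount() {
      return attemptsById.size();
   }

   /**
    * Runs one retry pass over every pending ID.
    * IDs for which tryAck succeeds are removed; IDs that have failed
    * maxAttempts times are dropped. Returns how many IDs were dropped.
    */
   int retryBatch(LongPredicate tryAck) {
      int givenUp = 0;
      Iterator<Map.Entry<Long, Integer>> it = attemptsById.entrySet().iterator();
      while (it.hasNext()) {
         Map.Entry<Long, Integer> entry = it.next();
         if (tryAck.test(entry.getKey())) {
            it.remove();
         } else {
            int attempts = entry.getValue() + 1;
            if (attempts >= maxAttempts) {
               it.remove();
               givenUp++;
            } else {
               entry.setValue(attempts);
            }
         }
      }
      return givenUp;
   }
}
```

A caller would add an ID whenever an ack cannot be applied immediately and then invoke `retryBatch` periodically; how the real change schedules its retries "at different levels" is not shown in this thread.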



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906941&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906941
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 26/Feb/24 12:43
Start Date: 26/Feb/24 12:43
Worklog Time Spent: 10m 
  Work Description: gemmellr commented on PR #4827:
URL: 
https://github.com/apache/activemq-artemis/pull/4827#issuecomment-1964061636

   I've spent a while longer looking at this again and still find it fairly 
impenetrable, so I'm giving up on reviewing it any further.




Issue Time Tracking
---

Worklog Id: (was: 906941)
Time Spent: 5h 50m  (was: 5h 40m)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906524&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906524
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 22:13
Start Date: 22/Feb/24 22:13
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on PR #4827:
URL: 
https://github.com/apache/activemq-artemis/pull/4827#issuecomment-1960404186

   I will actually wait a few more days before merging this, but I think it's 
ready now. A lot of testing has been done here.




Issue Time Tracking
---

Worklog Id: (was: 906524)
Time Spent: 5h 40m  (was: 5.5h)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906522&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906522
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 22:08
Start Date: 22/Feb/24 22:08
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r142844


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##
@@ -132,6 +138,15 @@ public AMQPMirrorControllerSource(ProtonProtocolManager protonProtocolManager, Q
       this.acks = replicaConfig.isMessageAcknowledgements();
       this.brokerConnection = brokerConnection;
       this.sync = replicaConfig.isSync();
+      this.pagedRouteContext = new PagedRouteContext(snfQueue);
+
+      if (sync) {
+         logger.info("Mirror is configured to sync, so pageStore={} being enforced to BLOCK, and not page", snfQueue.getName());

Review Comment:
   oops... almost merged this :)





Issue Time Tracking
---

Worklog Id: (was: 906522)
Time Spent: 5.5h  (was: 5h 20m)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906521&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906521
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 22:07
Start Date: 22/Feb/24 22:07
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on PR #4827:
URL: 
https://github.com/apache/activemq-artemis/pull/4827#issuecomment-1960397285

   @gemmellr I will merge this now. Ping me if you find anything else later, 
please.




Issue Time Tracking
---

Worklog Id: (was: 906521)
Time Spent: 5h 20m  (was: 5h 10m)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906508&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906508
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 20:49
Start Date: 22/Feb/24 20:49
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499917788


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##
@@ -281,6 +300,15 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
          return;
       }
 
+      if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext, this::copyMessageForPaging)) {
+         return;
+      }

Review Comment:
   I'm adding an option to enforce a certain page policy. When sync = true, I'm 
enforcing BLOCK, and when sync = false, I'm enforcing PAGE. (DROP and FAIL are 
never allowed on mirroring.)
   
   With sync we must not page; otherwise the CompletionContext is lost through 
paging.
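
The policy rule in this comment can be sketched as a small decision function. This is only an illustration under assumed names: `MirrorPagePolicySketch`, `FullPolicy` and `enforcedPolicy` are hypothetical and are not the types or methods used in the PR.

```java
// Hypothetical sketch only; names are illustrative, not the Artemis API.
final class MirrorPagePolicySketch {

   /** Stand-in for the address-full policies discussed in the review. */
   enum FullPolicy { PAGE, BLOCK, DROP, FAIL }

   /**
    * Returns the policy enforced on the SNF queue's page store.
    * A sync mirror must BLOCK, because paging would lose the completion
    * context. An async mirror uses PAGE. The configured value is ignored,
    * so DROP and FAIL can never take effect on mirroring.
    */
   static FullPolicy enforcedPolicy(boolean sync, FullPolicy configured) {
      return sync ? FullPolicy.BLOCK : FullPolicy.PAGE;
   }

   public static void main(String[] args) {
      System.out.println(enforcedPolicy(true, FullPolicy.DROP));
      System.out.println(enforcedPolicy(false, FullPolicy.FAIL));
   }
}
```

Ignoring the configured value outright is the simplest reading of "enforcing"; the actual change may well log or validate the configured policy as well.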





Issue Time Tracking
---

Worklog Id: (was: 906508)
Time Spent: 5h 10m  (was: 5h)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906492&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906492
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 16:41
Start Date: 22/Feb/24 16:41
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499550612


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##
@@ -281,6 +300,15 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
          return;
       }
 
+      if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext, this::copyMessageForPaging)) {
+         return;
+      }
+
+      if (message.isPaged()) {
+         // if the source was paged, we copy the message
+         message = copyMessageForPaging(message);
+      }

Review Comment:
   I had to change the ID providers to return the ID from the broker property 
if it is set, so in that case the ID ends up in the right place.
   
   
   I did a lot of testing to make sure I got this right. A lot of work went into 
just that part, the acks coming from a different queue. It wasn't easy to 
balance it out.  :)
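
The "return the ID from the broker property if it is set" behaviour might look roughly like the sketch below. The property key, the `Map` used to hold broker properties, and the class name are all assumptions for illustration; the PR's real ID providers are not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only; the key and types are not taken from Artemis.
final class MirrorIdSketch {

   // Assumed property name, used only for this illustration.
   static final String INTERNAL_ID_KEY = "sketch.internalId";

   /**
    * Prefers the ID carried in the broker properties (for example on a
    * copied message) and falls back to the message's own local ID.
    */
   static long resolveId(Map<String, Object> brokerProperties, long localId) {
      Object carried = brokerProperties.get(INTERNAL_ID_KEY);
      return (carried instanceof Long) ? (Long) carried : localId;
   }

   public static void main(String[] args) {
      Map<String, Object> props = new HashMap<>();
      System.out.println(resolveId(props, 7L));
      props.put(INTERNAL_ID_KEY, 42L);
      System.out.println(resolveId(props, 7L));
   }
}
```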





Issue Time Tracking
---

Worklog Id: (was: 906492)
Time Spent: 5h  (was: 4h 50m)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906490&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906490
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 16:20
Start Date: 22/Feb/24 16:20
Worklog Time Spent: 10m 
  Work Description: gemmellr commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499518163


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##
@@ -281,6 +300,15 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
          return;
       }
 
+      if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext, this::copyMessageForPaging)) {
+         return;
+      }
+
+      if (message.isPaged()) {
+         // if the source was paged, we copy the message
+         message = copyMessageForPaging(message);
+      }

Review Comment:
   OK. I think the two ifs should cover that briefly in the comments; it's not 
obvious now, so it definitely won't be later.
   
   Also, thinking... Does any of this ID change affect acking later, or what 
happens when the destination broker is itself mirroring? E.g. I see it's 
setting an 'extra property'. Are those persisted? Or passed on if the message 
isn't re-encoded? If the broker restarts before the message gets sent, does 
that change what the representation of this copied paged message actually is?





Issue Time Tracking
---

Worklog Id: (was: 906490)
Time Spent: 4h 50m  (was: 4h 40m)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906489&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906489
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 15:55
Start Date: 22/Feb/24 15:55
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499474846


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##
@@ -281,6 +300,15 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
          return;
       }
 
+      if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext, this::copyMessageForPaging)) {
+         return;
+      }

Review Comment:
   If a user misconfigured it to throw, then yes, wrong things can happen.
   
   
   I could log a warning.





Issue Time Tracking
---

Worklog Id: (was: 906489)
Time Spent: 4h 40m  (was: 4.5h)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906488&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906488
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 15:46
Start Date: 22/Feb/24 15:46
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499459754


##
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {
+  final long collectionID;
+  long id;
+  K key;
+  V value;
+
+  MapRecord(long collectionID, long id, K key, V value) {
+ this.collectionID = collectionID;
+ this.id = id;
+ this.key = key;
+ this.value = value;
+  }
+
+  @Override
+  public K getKey() {
+ return key;
+  }
+
+  @Override
+  public V getValue() {
+ return value;
+  }
+
+  @Override
+  public V setValue(V value) {
+ V oldValue = this.value;
+ this.value = value;
+ return oldValue;
+  }
+
+  @Override
+  public String toString() {
+ return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + ", key=" + key + ", value=" + value + '}';
+  }
+   }
+
+   public JournalHashMap(long collectionId, Journal journal, LongSupplier idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener ioExceptionListener) {
+  this.collectionId = collectionId;
+  this.journal = journal;
+  this.idGenerator = idGenerator;
+  this.persister = persister;
+  this.recordType = recordType;
+  this.exceptionListener = ioExceptionListener;
+  this.completionSupplier = completionSupplier;
+  this.contextProvider = contextProvider;
+   }
+
+   C context;
+
+   LongFunction<C> contextProvider;
+
+   private final Persister<MapRecord<K, V>> persister;
+
+   private final Journal journal;
+
+   private final long collectionId;
+
+   private final byte recordType;
+
+   private final LongSupplier idGenerator;
+
+   private final Supplier<IOCompletion> completionSupplier;
+
+   private final IOCriticalErrorListener exceptionListener;
+
+   private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+   public long getCollectionId() {
+  return collectionId;
+   }
+
+   @Override
+   public synchronized int size() {
+  return map.size();
+   }
+
+   public C getContext() {
+  if (context == null && contextProvider != null) {
+ context = contextProvider.apply(this.collectionId);
+  }
+  return context;
+   }
+
+   public JournalHashMap<K, V, C> setContext(C context) {
+  this.context = context;
+  return this;
+   }
+
+   @Override
+   public synchronized boolean isEmpty() {
+  return map.isEmpty();
+   }
+
+   @Override
+   public synchronized boolean containsKey(Object key) {
+  return map.containsKey(key);
+   }
+
+   @Override
+   public synchronized boolean 

[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906487&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906487
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 15:41
Start Date: 22/Feb/24 15:41
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499453054


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##
@@ -281,6 +300,15 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
          return;
       }
 
+      if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext, this::copyMessageForPaging)) {
+         return;
+      }
+
+      if (message.isPaged()) {
+         // if the source was paged, we copy the message
+         message = copyMessageForPaging(message);
+      }

Review Comment:
   Also, when the message is not paged, I can add a reference to it in two 
different queues. However, if it's paged, it gets messy, so I have to switch 
the message here *only* if either is paged.
   
   
   Notice that when the target is paged, the copy happens in a different place 
(inside the page method, where I pass a method reference).
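
The branching described here (copy inside `page` when the target is paging, copy locally when the source was paged, otherwise share the reference) can be modelled with a self-contained sketch. `Msg`, `SnfStore` and `selectForSnf` are hypothetical stand-ins, not the Artemis `Message` or `PagingStore` types, and the real `page` method takes more arguments than shown.

```java
import java.util.function.UnaryOperator;

// Hypothetical model of the two ifs in sendMessage; not Artemis code.
final class MirrorSendSketch {

   static final class Msg {
      final String body;
      final boolean paged;

      Msg(String body, boolean paged) {
         this.body = body;
         this.paged = paged;
      }
   }

   /** Stand-in for the SNF queue's paging store. */
   static final class SnfStore {
      final boolean paging;
      Msg lastPaged;

      SnfStore(boolean paging) {
         this.paging = paging;
      }

      // When the store is paging, it copies the message itself via the
      // supplied function and reports that the message was handled.
      boolean page(Msg message, UnaryOperator<Msg> copier) {
         if (!paging) {
            return false;
         }
         lastPaged = copier.apply(message);
         return true;
      }
   }

   static Msg copyForPaging(Msg source) {
      return new Msg(source.body, false);
   }

   /** Returns the instance to route to the SNF queue, or null if it was paged. */
   static Msg selectForSnf(SnfStore store, Msg message) {
      if (store.page(message, MirrorSendSketch::copyForPaging)) {
         return null; // target is paging: the copy was made inside page()
      }
      if (message.paged) {
         return copyForPaging(message); // source was paged: use a private copy
      }
      return message; // neither side paged: the same reference can be shared
   }
}
```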





Issue Time Tracking
---

Worklog Id: (was: 906487)
Time Spent: 4h 20m  (was: 4h 10m)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906486&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906486
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 15:39
Start Date: 22/Feb/24 15:39
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499449021


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##
@@ -281,6 +300,15 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
          return;
       }
 
+      if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext, this::copyMessageForPaging)) {
+         return;
+      }
+
+      if (message.isPaged()) {
+         // if the source was paged, we copy the message
+         message = copyMessageForPaging(message);
+      }

Review Comment:
   i - large messages
   ii - the acks will now happen in different sources. The message will be 
paged in the source queue and not paged in the SNF, with the same ID, so the 
message must be copied.
   





Issue Time Tracking
---

Worklog Id: (was: 906486)
Time Spent: 4h 10m  (was: 4h)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906485&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906485
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 15:38
Start Date: 22/Feb/24 15:38
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499447542


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##
@@ -424,6 +453,10 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
 
   MirrorController controllerInUse = getControllerInUse();
 
+  if (controllerInUse != null && !controllerInUse.isAllowACK()) {

Review Comment:
   Ohh... I see now. When you said "this comment" I thought you were referring 
to this:
   
   
https://github.com/apache/activemq-artemis/blob/bc7e4639e0a4665c106f336fb837c33d2cbcd361/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java#L427
   
   
   That log was already there before, and I see it now. I am changing it.
   
   
   I also made an improvement to the code there.





Issue Time Tracking
---

Worklog Id: (was: 906485)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906482&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906482
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 15:15
Start Date: 22/Feb/24 15:15
Worklog Time Spent: 10m 
  Work Description: gemmellr commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499410286


##
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {
+  final long collectionID;
+  long id;
+  K key;
+  V value;
+
+  MapRecord(long collectionID, long id, K key, V value) {
+ this.collectionID = collectionID;
+ this.id = id;
+ this.key = key;
+ this.value = value;
+  }
+
+  @Override
+  public K getKey() {
+ return key;
+  }
+
+  @Override
+  public V getValue() {
+ return value;
+  }
+
+  @Override
+  public V setValue(V value) {
+ V oldValue = this.value;
+ this.value = value;
+ return oldValue;
+  }
+
+  @Override
+  public String toString() {
+ return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + ", key=" + key + ", value=" + value + '}';
+  }
+   }
+
+   public JournalHashMap(long collectionId, Journal journal, LongSupplier idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener ioExceptionListener) {
+  this.collectionId = collectionId;
+  this.journal = journal;
+  this.idGenerator = idGenerator;
+  this.persister = persister;
+  this.recordType = recordType;
+  this.exceptionListener = ioExceptionListener;
+  this.completionSupplier = completionSupplier;
+  this.contextProvider = contextProvider;
+   }
+
+   C context;
+
+   LongFunction<C> contextProvider;
+
+   private final Persister<MapRecord<K, V>> persister;
+
+   private final Journal journal;
+
+   private final long collectionId;
+
+   private final byte recordType;
+
+   private final LongSupplier idGenerator;
+
+   private final Supplier<IOCompletion> completionSupplier;
+
+   private final IOCriticalErrorListener exceptionListener;
+
+   private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+   public long getCollectionId() {
+  return collectionId;
+   }
+
+   @Override
+   public synchronized int size() {
+  return map.size();
+   }
+
+   public C getContext() {
+  if (context == null && contextProvider != null) {
+ context = contextProvider.apply(this.collectionId);
+  }
+  return context;
+   }
+
+   public JournalHashMap<K, V, C> setContext(C context) {
+  this.context = context;
+  return this;
+   }
+
+   @Override
+   public synchronized boolean isEmpty() {
+  return map.isEmpty();
+   }
+
+   @Override
+   public synchronized boolean containsKey(Object key) {
+  return map.containsKey(key);
+   }
+
+   @Override
+   public synchronized boolean containsValue(Object 

[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906481&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906481
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 15:12
Start Date: 22/Feb/24 15:12
Worklog Time Spent: 10m 
  Work Description: gemmellr commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499405901


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java:
##
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
+
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
+import 
org.apache.activemq.artemis.core.journal.collections.JournalHashMapProvider;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AckManager implements ActiveMQComponent {
+
+   // we first retry on the queue a few times
+   public static final short MAX_QUEUE_ATTEMPT = 
Short.parseShort(System.getProperty(AckRetry.class.getName() + 
".MAX_QUEUE_ATTEMPT", "5"));

Review Comment:
   > I will rename it back to MIN_QUEUE_ATTEMPTS (will include the S)
   
   Maybe drop the MIN or MAX since it's not really either? Just QUEUE_ATTEMPTS?
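
As an illustration of the suggested rename only, the constant could be read as shown here. This is a sketch, not the PR's code: the class name `QueueAttemptsSketch`, the helper `readShort`, and the property key `example.QUEUE_ATTEMPTS` are hypothetical, and falling back to the default on a malformed value is an added assumption (the quoted diff calls `Short.parseShort` directly, which would throw on bad input).

```java
final class QueueAttemptsSketch {

    // Hypothetical helper: reads a short-valued system property and falls back
    // to defaultValue when the property is unset or is not a valid short.
    static short readShort(String propertyName, short defaultValue) {
        String raw = System.getProperty(propertyName);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Short.parseShort(raw.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    // Neutral name, as suggested in the review: neither a minimum nor a maximum.
    // The property key is a placeholder, not the key used by the PR.
    static final short QUEUE_ATTEMPTS = readShort("example.QUEUE_ATTEMPTS", (short) 5);
}
```

With no system property set, `QUEUE_ATTEMPTS` keeps the default of 5 that the quoted diff also uses.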





Issue Time Tracking
---

Worklog Id: (was: 906481)
Time Spent: 3h 40m  (was: 3.5h)


[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906478&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906478
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 15:10
Start Date: 22/Feb/24 15:10
Worklog Time Spent: 10m 
  Work Description: gemmellr commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499402585


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java:
##
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
+
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
+import 
org.apache.activemq.artemis.core.journal.collections.JournalHashMapProvider;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AckManager implements ActiveMQComponent {
+
+   // we first retry on the queue a few times
+   public static final short MAX_QUEUE_ATTEMPT = 
Short.parseShort(System.getProperty(AckRetry.class.getName() + 
".MAX_QUEUE_ATTEMPT", "5"));

Review Comment:
   > > > Do we retry on the queue after trying paging?
   > 
   > next time paging is retried, I will retry all the pending ACKs on the 
queue again, since I'm already there anyways.
   > 
   
   What happens if we try the queue, but it isn't there so we try paging, but it 
isn't there either, then we try the queue again but it isn't there... we stop 
trying at that point, right?
   
   (e.g. supposing it was actually in paging when we tried the queue... then was 
in the queue while we tried in paging... then vanishes due to e.g. consumption or 
expiration or such like)
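
The ordering being asked about can be sketched as follows. This is a simplified model under stated assumptions, not the AckManager implementation: the class `AckRetrySketch`, the `Outcome` enum, the `pageAttempts` parameter and the two lookup predicates are hypothetical. Only the order "queue first, then paging, then the queue again while already there" is taken from the replies quoted above.

```java
import java.util.function.LongPredicate;

final class AckRetrySketch {

    enum Outcome { ACKED_ON_QUEUE, ACKED_ON_PAGING, GAVE_UP }

    // Hypothetical model of the retry order discussed in the review.
    // ackOnQueue / ackOnPaging return true when the message was found and acked there.
    static Outcome retry(long messageId,
                         int queueAttempts,
                         int pageAttempts,
                         LongPredicate ackOnQueue,
                         LongPredicate ackOnPaging) {
        // Phase 1: a few attempts against the in-memory queue.
        for (int i = 0; i < queueAttempts; i++) {
            if (ackOnQueue.test(messageId)) {
                return Outcome.ACKED_ON_QUEUE;
            }
        }
        // Phase 2: scan paging; after each paging attempt, retry the queue again,
        // since the message may have been depaged in the meantime.
        for (int i = 0; i < pageAttempts; i++) {
            if (ackOnPaging.test(messageId)) {
                return Outcome.ACKED_ON_PAGING;
            }
            if (ackOnQueue.test(messageId)) {
                return Outcome.ACKED_ON_QUEUE;
            }
        }
        // Not found anywhere (e.g. consumed or expired): stop retrying.
        return Outcome.GAVE_UP;
    }
}
```

In this model, a message that was in paging during the queue attempts and was depaged before the paging scan is still caught by the queue retry in phase 2. A message that has vanished entirely ends in `GAVE_UP` once both attempt budgets are exhausted.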
   
   





Issue Time Tracking
---

Worklog Id: (was: 906478)
Time Spent: 3.5h  (was: 3h 20m)


[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906477&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906477
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 15:08
Start Date: 22/Feb/24 15:08
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499399307


##
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {
+  final long collectionID;
+  long id;
+  K key;
+  V value;
+
+  MapRecord(long collectionID, long id, K key, V value) {
+ this.collectionID = collectionID;
+ this.id = id;
+ this.key = key;
+ this.value = value;
+  }
+
+  @Override
+  public K getKey() {
+ return key;
+  }
+
+  @Override
+  public V getValue() {
+ return value;
+  }
+
+  @Override
+  public V setValue(V value) {
+ V oldValue = this.value;
+ this.value = value;
+ return oldValue;
+  }
+
+  @Override
+  public String toString() {
+ return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + 
", key=" + key + ", value=" + value + '}';
+  }
+   }
+
+   public JournalHashMap(long collectionId, Journal journal, LongSupplier 
idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, 
Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, 
IOCriticalErrorListener ioExceptionListener) {
+  this.collectionId = collectionId;
+  this.journal = journal;
+  this.idGenerator = idGenerator;
+  this.persister = persister;
+  this.recordType = recordType;
+  this.exceptionListener = ioExceptionListener;
+  this.completionSupplier = completionSupplier;
+  this.contextProvider = contextProvider;
+   }
+
+   C context;
+
+   LongFunction<C> contextProvider;
+
+   private final Persister<MapRecord<K, V>> persister;
+
+   private final Journal journal;
+
+   private final long collectionId;
+
+   private final byte recordType;
+
+   private final LongSupplier idGenerator;
+
+   private final Supplier<IOCompletion> completionSupplier;
+
+   private final IOCriticalErrorListener exceptionListener;
+
+   private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+   public long getCollectionId() {
+  return collectionId;
+   }
+
+   @Override
+   public synchronized int size() {
+  return map.size();
+   }
+
+   public C getContext() {
+  if (context == null && contextProvider != null) {
+ context = contextProvider.apply(this.collectionId);
+  }
+  return context;
+   }
+
+   public JournalHashMap<K, V, C> setContext(C context) {
+  this.context = context;
+  return this;
+   }
+
+   @Override
+   public synchronized boolean isEmpty() {
+  return map.isEmpty();
+   }
+
+   @Override
+   public synchronized boolean containsKey(Object key) {
+  return map.containsKey(key);
+   }
+
+   @Override
+   public synchronized boolean 

[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906473&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906473
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 14:57
Start Date: 22/Feb/24 14:57
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499382695


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java:
##
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
+
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
+import 
org.apache.activemq.artemis.core.journal.collections.JournalHashMapProvider;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AckManager implements ActiveMQComponent {
+
+   // we first retry on the queue a few times
+   public static final short MAX_QUEUE_ATTEMPT = 
Short.parseShort(System.getProperty(AckRetry.class.getName() + 
".MAX_QUEUE_ATTEMPT", "5"));

Review Comment:
   I will rename it back to MIN_QUEUE_ATTEMPTS (will include the S)





Issue Time Tracking
---

Worklog Id: (was: 906473)
Time Spent: 3h 10m  (was: 3h)


[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906472&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906472
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 14:57
Start Date: 22/Feb/24 14:57
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499381939


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java:
##
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
+
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
+import 
org.apache.activemq.artemis.core.journal.collections.JournalHashMapProvider;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AckManager implements ActiveMQComponent {
+
+   // we first retry on the queue a few times
+   public static final short MAX_QUEUE_ATTEMPT = 
Short.parseShort(System.getProperty(AckRetry.class.getName() + 
".MAX_QUEUE_ATTEMPT", "5"));

Review Comment:
   >> Do we retry on the queue after trying paging?
   
   next time paging is retried, I will retry all the pending ACKs on the queue 
again, since I'm already there anyways.
   
   I couldn't find a better name though other than MAX_QUEUE_ATTEMPT.
   
   I actually used to name this MIN_QUEUE_ATTEMPT at some point.





Issue Time Tracking
---

Worklog Id: (was: 906472)
Time Spent: 3h  (was: 2h 50m)


[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906469&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906469
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 14:54
Start Date: 22/Feb/24 14:54
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499377931


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##
@@ -617,4 +650,55 @@ public static void routeMirrorCommand(ActiveMQServer 
server, Message message, Tr
   server.getPostOffice().route(message, ctx, false);
}
 
+   class PagedRouteContext implements RouteContextList {
+
+  private final List<Queue> listRepresentation;
+  private final List<Queue> emptyList = Collections.emptyList();
+
+  PagedRouteContext() {
+ listRepresentation = new ArrayList<>(1);
+ listRepresentation.add(snfQueue);

Review Comment:
   I thought it was simple enough, but I will make a change here.





Issue Time Tracking
---

Worklog Id: (was: 906469)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906467&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906467
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 14:47
Start Date: 22/Feb/24 14:47
Worklog Time Spent: 10m 
  Work Description: gemmellr commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499365864


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##
@@ -424,6 +453,10 @@ public void preAcknowledge(final Transaction tx, final 
MessageReference ref, fin
 
   MirrorController controllerInUse = getControllerInUse();
 
+  if (controllerInUse != null && !controllerInUse.isAllowACK()) {

Review Comment:
   I don't really understand the reply. I was saying it seems odd for this 
"preAcknowledge" method to log about "postACKInternalMessage" just above 
this, when that is a different method (and, on the face of it, suggests a 
different pre vs. post timing).





Issue Time Tracking
---

Worklog Id: (was: 906467)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906464&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906464
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 14:41
Start Date: 22/Feb/24 14:41
Worklog Time Spent: 10m 
  Work Description: gemmellr commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499356875


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##
@@ -281,6 +300,15 @@ public void sendMessage(Transaction tx, Message message, 
RoutingContext context)
 return;
  }
 
+ if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext, 
this::copyMessageForPaging)) {
+return;
+ }

Review Comment:
   OK, but if it is paging, it seems like this call can throw, or delete the 
message, and do other things. Is that expected?





Issue Time Tracking
---

Worklog Id: (was: 906464)
Time Spent: 2.5h  (was: 2h 20m)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906462&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906462
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 14:33
Start Date: 22/Feb/24 14:33
Worklog Time Spent: 10m 
  Work Description: gemmellr commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499340484


##
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord implements Entry {
+  final long collectionID;
+  long id;
+  K key;
+  V value;
+
+  MapRecord(long collectionID, long id, K key, V value) {
+ this.collectionID = collectionID;
+ this.id = id;
+ this.key = key;
+ this.value = value;
+  }
+
+  @Override
+  public K getKey() {
+ return key;
+  }
+
+  @Override
+  public V getValue() {
+ return value;
+  }
+
+  @Override
+  public V setValue(V value) {
+ V oldValue = this.value;
+ this.value = value;
+ return oldValue;
+  }
+
+  @Override
+  public String toString() {
+ return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + ", key=" + key + ", value=" + value + '}';
+  }
+   }
+
+   public JournalHashMap(long collectionId, Journal journal, LongSupplier idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener ioExceptionListener) {
+  this.collectionId = collectionId;
+  this.journal = journal;
+  this.idGenerator = idGenerator;
+  this.persister = persister;
+  this.recordType = recordType;
+  this.exceptionListener = ioExceptionListener;
+  this.completionSupplier = completionSupplier;
+  this.contextProvider = contextProvider;
+   }
+
+   C context;
+
+   LongFunction<C> contextProvider;
+
+   private final Persister<MapRecord<K, V>> persister;
+
+   private final Journal journal;
+
+   private final long collectionId;
+
+   private final byte recordType;
+
+   private final LongSupplier idGenerator;
+
+   private final Supplier<IOCompletion> completionSupplier;
+
+   private final IOCriticalErrorListener exceptionListener;
+
+   private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+   public long getCollectionId() {
+  return collectionId;
+   }
+
+   @Override
+   public synchronized int size() {
+  return map.size();
+   }
+
+   public C getContext() {
+  if (context == null && contextProvider != null) {
+ context = contextProvider.apply(this.collectionId);
+  }
+  return context;
+   }
+
+   public JournalHashMap<K, V, C> setContext(C context) {
+  this.context = context;
+  return this;
+   }
+
+   @Override
+   public synchronized boolean isEmpty() {
+  return map.isEmpty();
+   }
+
+   @Override
+   public synchronized boolean containsKey(Object key) {
+  return map.containsKey(key);
+   }
+
+   @Override
+   public synchronized boolean containsValue(Object 

[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906461=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906461
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 14:32
Start Date: 22/Feb/24 14:32
Worklog Time Spent: 10m 
  Work Description: gemmellr commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499339024


##
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {

Review Comment:
   The value here is the MapRecord... it can have them called for collisions.





Issue Time Tracking
---

Worklog Id: (was: 906461)
Time Spent: 2h 10m  (was: 2h)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906459=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906459
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 14:21
Start Date: 22/Feb/24 14:21
Worklog Time Spent: 10m 
  Work Description: gemmellr commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499323366


##
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/AbstractHashMapPersister.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public abstract class AbstractHashMapPersister<K, V> implements Persister<JournalHashMap.MapRecord<K, V>> {
+
+   @Override
+   public byte getID() {
+  return 0;
+   }
+
+   @Override
+   public final int getEncodeSize(JournalHashMap.MapRecord<K, V> record) {
+  return DataConstants.SIZE_BYTE + // FILLER, could be used for versioning in the future
+ DataConstants.SIZE_LONG + // recordID
+ DataConstants.SIZE_LONG + // collectionID
+ getKeySize(record.key) +
+ getValueSize(record.value);
+   }
+
+   protected abstract int getKeySize(K key);
+
+   protected abstract void encodeKey(ActiveMQBuffer buffer, K key);
+
+   protected abstract K decodeKey(ActiveMQBuffer buffer);
+
+   protected abstract int getValueSize(V value);
+
+   protected abstract void encodeValue(ActiveMQBuffer buffer, V value);
+
+   protected abstract V decodeValue(ActiveMQBuffer buffer, K key);
+
+   @Override
+   public final void encode(ActiveMQBuffer buffer, JournalHashMap.MapRecord<K, V> record) {
+  buffer.writeByte((byte)0); // filler - could be used for versioning in the future.
+  buffer.writeLong(record.id);
+  buffer.writeLong(record.collectionID);
+  encodeKey(buffer, record.key);
+  encodeValue(buffer, record.value);
+   }
+
+   @Override
+   public final JournalHashMap.MapRecord<K, V> decode(ActiveMQBuffer buffer,
+JournalHashMap.MapRecord<K, V> record,
+CoreMessageObjectPools pool) {
+  buffer.readByte(); // filler - not used currently - just in case we ever need to version this wiring
+  long id = buffer.readLong();
+  long collectionID = buffer.readLong();
+  K key = decodeKey(buffer);
+  V value = decodeValue(buffer, key);
+
+  JournalHashMap.MapRecord<K, V> mapRecord = new JournalHashMap.MapRecord<>(collectionID, id, key, value);

Review Comment:
   Ok, well that maybe explains why the persister does what it does... but not 
why the object representation does something slightly different. It would be 
more readable if they were ordered the same.
   
   Placing the version second seems a bit weird. What if we want to change the 
ID type? Didn't you just mention the idea of doing that?
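
The concern about where the version byte sits can be illustrated with a minimal sketch. It uses plain `java.nio.ByteBuffer` rather than `ActiveMQBuffer`, and every name in it (`VersionedRecordCodec`, `VERSION_INT_ID`, `VERSION_LONG_ID`) is hypothetical and not taken from the PR. It only shows why a version-first layout would let a decoder learn the ID width before reading the ID.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: names and layout are assumptions, not the Artemis persister.
final class VersionedRecordCodec {

   static final byte VERSION_INT_ID = 0;  // imagined layout with a 4-byte ID
   static final byte VERSION_LONG_ID = 1; // imagined layout with an 8-byte ID

   // Writes the version first, then the ID, then the collection ID.
   static ByteBuffer encode(byte version, long id, long collectionID) {
      ByteBuffer buffer = ByteBuffer.allocate(Byte.BYTES + Long.BYTES + Long.BYTES);
      buffer.put(version);
      if (version == VERSION_INT_ID) {
         buffer.putInt(Math.toIntExact(id));
      } else {
         buffer.putLong(id);
      }
      buffer.putLong(collectionID);
      buffer.flip();
      return buffer;
   }

   // Reads the version before anything else, so the ID width is known up front.
   // Returns {id, collectionID}.
   static long[] decode(ByteBuffer buffer) {
      byte version = buffer.get();
      long id = (version == VERSION_INT_ID) ? buffer.getInt() : buffer.getLong();
      long collectionID = buffer.getLong();
      return new long[] {id, collectionID};
   }

   public static void main(String[] args) {
      long[] decoded = decode(encode(VERSION_LONG_ID, 42L, 7L));
      System.out.println("id=" + decoded[0] + ", collectionID=" + decoded[1]);
   }
}
```

If the version came after the ID instead, a decoder would have to assume the ID width before it could reach the version, which is the scenario the question above is probing.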





Issue Time Tracking
---

Worklog Id: (was: 906459)
Time Spent: 2h  (was: 1h 50m)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906454=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906454
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 14:06
Start Date: 22/Feb/24 14:06
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499296963


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##
@@ -424,6 +453,10 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin
 
   MirrorController controllerInUse = getControllerInUse();
 
+  if (controllerInUse != null && !controllerInUse.isAllowACK()) {

Review Comment:
   That comment stays where it is. I'm adding an extra comment here.
   
   This is to avoid re-acking the retry.
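
One plausible reading of this guard, sketched under loud assumptions: acks performed as part of a retry are applied with an "allow ACK" flag switched off, so the pre-acknowledge hook does not send them again. `AckMirrorGuard`, `runWithoutMirroring` and `mirroredAcks` are hypothetical names, and this is not the code of `AMQPMirrorControllerSource`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: an invented stand-in for a controller with an allow-ACK flag.
final class AckMirrorGuard {

   private boolean allowAck = true;
   final List<Long> mirroredAcks = new ArrayList<>();

   boolean isAllowAck() {
      return allowAck;
   }

   // Runs an action (for example a retried ack) with ack mirroring switched off,
   // restoring the previous flag value afterwards.
   void runWithoutMirroring(Runnable action) {
      boolean previous = allowAck;
      allowAck = false;
      try {
         action.run();
      } finally {
         allowAck = previous;
      }
   }

   // Called before an ack is applied; records the ack only when mirroring is allowed.
   void preAcknowledge(long messageID) {
      if (!isAllowAck()) {
         return; // the ack comes from a retry, so it is not sent again
      }
      mirroredAcks.add(messageID);
   }

   public static void main(String[] args) {
      AckMirrorGuard guard = new AckMirrorGuard();
      guard.preAcknowledge(1L);
      guard.runWithoutMirroring(() -> guard.preAcknowledge(2L));
      System.out.println(guard.mirroredAcks);
   }
}
```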





Issue Time Tracking
---

Worklog Id: (was: 906454)
Time Spent: 1h 50m  (was: 1h 40m)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906451=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906451
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 13:55
Start Date: 22/Feb/24 13:55
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499280271


##
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMapProvider.java:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.persistence.Persister;
+
+public class JournalHashMapProvider<K, V, C> {
+
+   final Journal journal;
+   final Persister<JournalHashMap.MapRecord<K, V>> persister;
+   final LongObjectHashMap<JournalHashMap<K, V, C>> journalMaps = new LongObjectHashMap<>();
+   final LongSupplier idSupplier;
+   final byte recordType;
+   final IOCriticalErrorListener ioExceptionListener;
+   final Supplier<IOCompletion> ioCompletionSupplier;
+   final LongFunction<C> contextProvider;
+
+   public JournalHashMapProvider(LongSupplier idSupplier, Journal journal, AbstractHashMapPersister<K, V> persister, byte recordType, Supplier<IOCompletion> ioCompletionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener ioExceptionListener) {
+  this.idSupplier = idSupplier;
+  this.persister = persister;
+  this.journal = journal;
+  this.recordType = recordType;
+  this.ioExceptionListener = ioExceptionListener;
+  this.contextProvider = contextProvider;
+  this.ioCompletionSupplier = ioCompletionSupplier;
+   }
+
+   public List<JournalHashMap<K, V, C>> getMaps() {
+  ArrayList<JournalHashMap<K, V, C>> maps = new ArrayList<>();
+  journalMaps.values().forEach(maps::add);
+  return maps;
+   }
+
+   public void clear() {
+  journalMaps.clear();
+   }
+
+   public void reload(RecordInfo recordInfo) {
+  JournalHashMap.MapRecord<K, V> mapRecord = persister.decode(recordInfo.wrapData(), null, null);
+  getMap(mapRecord.collectionID, null).reload(mapRecord);
+   }
+
+   public Iterator<JournalHashMap<K, V, C>> iterMaps() {
+  return journalMaps.values().iterator();
+   }
+
+   public JournalHashMap<K, V, C> getMap(long collectionID, C context) {
+  JournalHashMap<K, V, C> journalHashMap = journalMaps.get(collectionID);
+  if (journalHashMap == null) {
+ journalHashMap = new JournalHashMap<>(collectionID, journal, idSupplier, persister, recordType, ioCompletionSupplier, contextProvider, ioExceptionListener).setContext(context);
+ journalMaps.put(collectionID, journalHashMap);
+  }
+  return journalHashMap;
+   }

Review Comment:
   It is from a single thread, but I will add the synchronized.
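
A minimal sketch of what adding `synchronized` to a get-or-create lookup buys, assuming a simplified registry keyed by a `long` ID. `SynchronizedRegistry` and its factory are hypothetical stand-ins, not the provider from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

// Hypothetical sketch: a simplified stand-in for a provider's get-or-create lookup.
final class SynchronizedRegistry<T> {

   private final Map<Long, T> entries = new HashMap<>();
   private final LongFunction<T> factory;

   SynchronizedRegistry(LongFunction<T> factory) {
      this.factory = factory;
   }

   // synchronized makes the check-then-create step atomic, so two threads asking
   // for the same ID cannot each create their own instance.
   synchronized T get(long id) {
      T existing = entries.get(id);
      if (existing == null) {
         existing = factory.apply(id);
         entries.put(id, existing);
      }
      return existing;
   }

   synchronized int size() {
      return entries.size();
   }

   public static void main(String[] args) {
      SynchronizedRegistry<StringBuilder> registry =
         new SynchronizedRegistry<>(id -> new StringBuilder("map-" + id));
      // Both lookups return the same instance.
      System.out.println(registry.get(1L) == registry.get(1L));
   }
}
```

With a single calling thread the lock is uncontended, so the cost is small; it mainly protects against a future caller on another thread.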





Issue Time Tracking
---

Worklog Id: (was: 906451)
Time Spent: 1h 40m  (was: 1.5h)


[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906450=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906450
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 13:54
Start Date: 22/Feb/24 13:54
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499277765


##
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {
+  final long collectionID;
+  long id;
+  K key;
+  V value;
+
+  MapRecord(long collectionID, long id, K key, V value) {
+ this.collectionID = collectionID;
+ this.id = id;
+ this.key = key;
+ this.value = value;
+  }
+
+  @Override
+  public K getKey() {
+ return key;
+  }
+
+  @Override
+  public V getValue() {
+ return value;
+  }
+
+  @Override
+  public V setValue(V value) {
+ V oldValue = this.value;
+ this.value = value;
+ return oldValue;
+  }
+
+  @Override
+  public String toString() {
+ return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + ", key=" + key + ", value=" + value + '}';
+  }
+   }
+
+   public JournalHashMap(long collectionId, Journal journal, LongSupplier idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener ioExceptionListener) {
+  this.collectionId = collectionId;
+  this.journal = journal;
+  this.idGenerator = idGenerator;
+  this.persister = persister;
+  this.recordType = recordType;
+  this.exceptionListener = ioExceptionListener;
+  this.completionSupplier = completionSupplier;
+  this.contextProvider = contextProvider;
+   }
+
+   C context;
+
+   LongFunction<C> contextProvider;
+
+   private final Persister<MapRecord<K, V>> persister;
+
+   private final Journal journal;
+
+   private final long collectionId;
+
+   private final byte recordType;
+
+   private final LongSupplier idGenerator;
+
+   private final Supplier<IOCompletion> completionSupplier;
+
+   private final IOCriticalErrorListener exceptionListener;
+
+   private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+   public long getCollectionId() {
+  return collectionId;
+   }
+
+   @Override
+   public synchronized int size() {
+  return map.size();
+   }
+
+   public C getContext() {
+  if (context == null && contextProvider != null) {
+ context = contextProvider.apply(this.collectionId);
+  }
+  return context;
+   }
+
+   public JournalHashMap<K, V, C> setContext(C context) {
+  this.context = context;
+  return this;
+   }
+
+   @Override
+   public synchronized boolean isEmpty() {
+  return map.isEmpty();
+   }
+
+   @Override
+   public synchronized boolean containsKey(Object key) {
+  return map.containsKey(key);
+   }
+
+   @Override
+   public synchronized boolean 

[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906448=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906448
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 13:46
Start Date: 22/Feb/24 13:46
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499266530


##
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/AbstractHashMapPersister.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public abstract class AbstractHashMapPersister<K, V> implements Persister<JournalHashMap.MapRecord<K, V>> {
+
+   @Override
+   public byte getID() {
+  return 0;
+   }
+
+   @Override
+   public final int getEncodeSize(JournalHashMap.MapRecord<K, V> record) {
+  return DataConstants.SIZE_BYTE + // FILLER, could be used for versioning in the future
+ DataConstants.SIZE_LONG + // recordID
+ DataConstants.SIZE_LONG + // collectionID
+ getKeySize(record.key) +
+ getValueSize(record.value);
+   }
+
+   protected abstract int getKeySize(K key);
+
+   protected abstract void encodeKey(ActiveMQBuffer buffer, K key);
+
+   protected abstract K decodeKey(ActiveMQBuffer buffer);
+
+   protected abstract int getValueSize(V value);
+
+   protected abstract void encodeValue(ActiveMQBuffer buffer, V value);
+
+   protected abstract V decodeValue(ActiveMQBuffer buffer, K key);
+
+   @Override
+   public final void encode(ActiveMQBuffer buffer, JournalHashMap.MapRecord<K, V> record) {
+  buffer.writeByte((byte)0); // filler - could be used for versioning in the future.
+  buffer.writeLong(record.id);
+  buffer.writeLong(record.collectionID);
+  encodeKey(buffer, record.key);
+  encodeValue(buffer, record.value);
+   }
+
+   @Override
+   public final JournalHashMap.MapRecord<K, V> decode(ActiveMQBuffer buffer,
+JournalHashMap.MapRecord<K, V> record,
+CoreMessageObjectPools pool) {
+  buffer.readByte(); // filler - not used currently - just in case we ever need to version this wiring
+  long id = buffer.readLong();
+  long collectionID = buffer.readLong();
+  K key = decodeKey(buffer);
+  V value = decodeValue(buffer, key);
+
+  JournalHashMap.MapRecord<K, V> mapRecord = new JournalHashMap.MapRecord<>(collectionID, id, key, value);

Review Comment:
   All the other persisters have the first few bytes as the recordID.
   
   https://github.com/apache/activemq-artemis/blob/f790911c44c4fc830d577debb7163163679670c4/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java#L106
   
   I will actually place the version below the ID to be consistent with the 
others.
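
The layout described here, with the record ID first and the version byte right after it, could look roughly like the sketch below. It is an illustration only: it uses `java.nio.ByteBuffer` instead of `ActiveMQBuffer`, fixes key and value to `long` for brevity, and `IdFirstLayout` is a hypothetical name rather than the persister in the PR.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch using java.nio.ByteBuffer instead of ActiveMQBuffer.
final class IdFirstLayout {

   static final byte VERSION = 0;

   // Mirrors the write order in encode, field for field.
   static int encodeSize() {
      return Long.BYTES    // recordID
         + Byte.BYTES      // version
         + Long.BYTES      // collectionID
         + Long.BYTES      // key
         + Long.BYTES;     // value
   }

   static ByteBuffer encode(long recordID, long collectionID, long key, long value) {
      ByteBuffer buffer = ByteBuffer.allocate(encodeSize());
      buffer.putLong(recordID);
      buffer.put(VERSION);
      buffer.putLong(collectionID);
      buffer.putLong(key);
      buffer.putLong(value);
      buffer.flip();
      return buffer;
   }

   // Reads fields in the same order they were written.
   // Returns {recordID, version, collectionID, key, value}.
   static long[] decode(ByteBuffer buffer) {
      long recordID = buffer.getLong();
      byte version = buffer.get();
      long collectionID = buffer.getLong();
      long key = buffer.getLong();
      long value = buffer.getLong();
      return new long[] {recordID, version, collectionID, key, value};
   }

   public static void main(String[] args) {
      ByteBuffer encoded = encode(100L, 7L, 1L, 2L);
      System.out.println("bytes=" + encoded.remaining());
      System.out.println("recordID=" + decode(encoded)[0]);
   }
}
```

Keeping the size calculation, the writer and the reader in one field order is what makes a layout like this easy to check by eye.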





Issue Time Tracking
---

Worklog Id: (was: 906448)
Time Spent: 1h 20m  (was: 1h 10m)


[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906444=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906444
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 13:23
Start Date: 22/Feb/24 13:23
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499236202


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java:
##
@@ -43,6 +43,8 @@ public class AmqpSupport {
public static final int AMQP_CREDITS_DEFAULT = 1000;
public static final int AMQP_LOW_CREDITS_DEFAULT = 300;
 
+   public static final int AMQP_MIRROR_ACK_RETRY_INTERVAL = 10_000;
+

Review Comment:
   Nope, I will remove it. Thanks.





Issue Time Tracking
---

Worklog Id: (was: 906444)
Time Spent: 1h 10m  (was: 1h)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906442=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906442
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 13:22
Start Date: 22/Feb/24 13:22
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499234573


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##
@@ -281,6 +300,15 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context)
 return;
  }
 
+ if (snfQueue.getPagingStore().page(message, tx, pagedRouteContext, this::copyMessageForPaging)) {
+return;
+ }

Review Comment:
   Before, the message would just pile up in memory, ignore paging and be 
out of order.
   
   If not paging, this is a no-op. If it paged, I must not route.
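
The control flow described above can be sketched as follows. This is a toy model under stated assumptions: `PagingGate`, its `paging` flag and its two lists are hypothetical, and the real `PagingStore.page(...)` call in the diff takes more arguments than shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: PagingGate and its fields are invented for illustration.
final class PagingGate {

   final List<String> paged = new ArrayList<>();
   final List<String> routed = new ArrayList<>();
   boolean paging;

   // Returns true when the message was written to a page.
   // When the store is not paging this does nothing and returns false.
   boolean page(String message) {
      if (!paging) {
         return false;
      }
      paged.add(message);
      return true;
   }

   void send(String message) {
      if (page(message)) {
         return; // the message went to a page, so it must not be routed as well
      }
      routed.add(message);
   }

   public static void main(String[] args) {
      PagingGate gate = new PagingGate();
      gate.send("a");      // not paging: routed
      gate.paging = true;
      gate.send("b");      // paging: paged only
      System.out.println("routed=" + gate.routed + ", paged=" + gate.paged);
   }
}
```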





Issue Time Tracking
---

Worklog Id: (was: 906442)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906440=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906440
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 13:20
Start Date: 22/Feb/24 13:20
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499231125


##
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {
+      final long collectionID;
+      long id;
+      K key;
+      V value;
+
+      MapRecord(long collectionID, long id, K key, V value) {
+         this.collectionID = collectionID;
+         this.id = id;
+         this.key = key;
+         this.value = value;
+      }
+
+      @Override
+      public K getKey() {
+         return key;
+      }
+
+      @Override
+      public V getValue() {
+         return value;
+      }
+
+      @Override
+      public V setValue(V value) {
+         V oldValue = this.value;
+         this.value = value;
+         return oldValue;
+      }
+
+      @Override
+      public String toString() {
+         return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + ", key=" + key + ", value=" + value + '}';
+      }
+   }
+
+   public JournalHashMap(long collectionId, Journal journal, LongSupplier idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener ioExceptionListener) {
+      this.collectionId = collectionId;
+      this.journal = journal;
+      this.idGenerator = idGenerator;
+      this.persister = persister;
+      this.recordType = recordType;
+      this.exceptionListener = ioExceptionListener;
+      this.completionSupplier = completionSupplier;
+      this.contextProvider = contextProvider;
+   }
+
+   C context;
+
+   LongFunction<C> contextProvider;
+
+   private final Persister<MapRecord<K, V>> persister;
+
+   private final Journal journal;
+
+   private final long collectionId;
+
+   private final byte recordType;
+
+   private final LongSupplier idGenerator;
+
+   private final Supplier<IOCompletion> completionSupplier;
+
+   private final IOCriticalErrorListener exceptionListener;
+
+   private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
+   public long getCollectionId() {
+      return collectionId;
+   }
+
+   @Override
+   public synchronized int size() {
+      return map.size();
+   }
+
+   public C getContext() {
+      if (context == null && contextProvider != null) {
+         context = contextProvider.apply(this.collectionId);
+      }
+      return context;
+   }
+
+   public JournalHashMap<K, V, C> setContext(C context) {
+      this.context = context;
+      return this;
+   }
+
+   @Override
+   public synchronized boolean isEmpty() {
+      return map.isEmpty();
+   }
+
+   @Override
+   public synchronized boolean containsKey(Object key) {
+      return map.containsKey(key);
+   }
+
+   @Override
+   public synchronized boolean 

[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906433&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906433
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 13:12
Start Date: 22/Feb/24 13:12
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499220752


##
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {

Review Comment:
   only the value is being inserted. It will be dead code, right?





Issue Time Tracking
---

Worklog Id: (was: 906433)
Time Spent: 40m  (was: 0.5h)

> Performance improvements on Mirror and Paging
> -
>
> Key: ARTEMIS-4651
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4651
> Project: ActiveMQ Artemis
>  Issue Type: Improvement
>Reporter: Clebert Suconic
>Assignee: Clebert Suconic
>Priority: Major
> Fix For: 2.33.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Before this change, sends were not paged at the SNF. They are now copied.
> I also added a different scheme for retrying messages in batches. A 
> collection with pending IDs is created and a few retries are performed at 
> different levels.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906431&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906431
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 22/Feb/24 12:50
Start Date: 22/Feb/24 12:50
Worklog Time Spent: 10m 
  Work Description: gemmellr commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499051782


##
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/AbstractHashMapPersister.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public abstract class AbstractHashMapPersister<K, V> implements Persister<JournalHashMap.MapRecord<K, V>> {
+
+   @Override
+   public byte getID() {
+      return 0;
+   }
+
+   @Override
+   public final int getEncodeSize(JournalHashMap.MapRecord<K, V> record) {
+      return DataConstants.SIZE_BYTE + // FILLER, could be used for versioning in the future
+         DataConstants.SIZE_LONG + // recordID
+         DataConstants.SIZE_LONG + // collectionID
+         getKeySize(record.key) +
+         getValueSize(record.value);
+   }
+
+   protected abstract int getKeySize(K key);
+
+   protected abstract void encodeKey(ActiveMQBuffer buffer, K key);
+
+   protected abstract K decodeKey(ActiveMQBuffer buffer);
+
+   protected abstract int getValueSize(V value);
+
+   protected abstract void encodeValue(ActiveMQBuffer buffer, V value);
+
+   protected abstract V decodeValue(ActiveMQBuffer buffer, K key);
+
+   @Override
+   public final void encode(ActiveMQBuffer buffer, JournalHashMap.MapRecord<K, V> record) {
+      buffer.writeByte((byte)0); // filler - could be used for versioning in the future.
+      buffer.writeLong(record.id);
+      buffer.writeLong(record.collectionID);
+      encodeKey(buffer, record.key);
+      encodeValue(buffer, record.value);
+   }
+
+   @Override
+   public final JournalHashMap.MapRecord<K, V> decode(ActiveMQBuffer buffer,
+                                                      JournalHashMap.MapRecord<K, V> record,
+                                                      CoreMessageObjectPools pool) {
+      buffer.readByte(); // filler - not used currently - just in case we ever need to version this wiring
+      long id = buffer.readLong();
+      long collectionID = buffer.readLong();
+      K key = decodeKey(buffer);
+      V value = decodeValue(buffer, key);
+
+      JournalHashMap.MapRecord<K, V> mapRecord = new JournalHashMap.MapRecord<>(collectionID, id, key, value);

Review Comment:
   
   
   Is there a reason the persister favours the 'id' as the first arg, but the object representation favours the 'collectionId' as the first?
   
   It would be nice if they were consistent, unless there is a reason they shouldn't be.
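
   The ordering question above can be made concrete with a small, self-contained sketch. It is not the Artemis code: `RecordSketch` and `RecordCodecSketch` are hypothetical names, `java.nio.ByteBuffer` stands in for `ActiveMQBuffer`, and the String key and long value are assumed purely for illustration. It writes the fields in the order used by the diff (filler byte, id, collectionID) and reads them back into a constructor that takes collectionID first.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for JournalHashMap.MapRecord; not the Artemis class.
final class RecordSketch {
   final long collectionID;
   final long id;
   final String key;
   final long value;

   // Same argument order as the object representation in the diff: collectionID first.
   RecordSketch(long collectionID, long id, String key, long value) {
      this.collectionID = collectionID;
      this.id = id;
      this.key = key;
      this.value = value;
   }
}

// Hypothetical codec using ByteBuffer in place of ActiveMQBuffer.
final class RecordCodecSketch {

   static ByteBuffer encode(RecordSketch record) {
      byte[] keyBytes = record.key.getBytes(StandardCharsets.UTF_8);
      ByteBuffer buffer = ByteBuffer.allocate(
         1 + Long.BYTES + Long.BYTES + Integer.BYTES + keyBytes.length + Long.BYTES);
      buffer.put((byte) 0);                // filler, reserved for versioning
      buffer.putLong(record.id);           // wire order: id first
      buffer.putLong(record.collectionID); // then collectionID
      buffer.putInt(keyBytes.length);      // assumed key encoding: length-prefixed UTF-8
      buffer.put(keyBytes);
      buffer.putLong(record.value);        // assumed value encoding: a single long
      buffer.flip();
      return buffer;
   }

   static RecordSketch decode(ByteBuffer buffer) {
      buffer.get();                        // skip the filler byte
      long id = buffer.getLong();          // must be read in the order it was written
      long collectionID = buffer.getLong();
      byte[] keyBytes = new byte[buffer.getInt()];
      buffer.get(keyBytes);
      long value = buffer.getLong();
      // The constructor order differs from the wire order, so the arguments are swapped here.
      return new RecordSketch(collectionID, id, new String(keyBytes, StandardCharsets.UTF_8), value);
   }

   public static void main(String[] args) {
      RecordSketch out = decode(encode(new RecordSketch(7L, 42L, "queue-a", 99L)));
      System.out.println(out.collectionID + " " + out.id + " " + out.key + " " + out.value);
   }
}
```

   In this sketch the round trip is correct because encode and decode agree with each other, so the differing order is a readability concern rather than a correctness one. The swap in `decode` is still the kind of spot where two adjacent `long` arguments could be transposed without a compiler error.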



##
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/AbstractHashMapPersister.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 

[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906254&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906254
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 21/Feb/24 13:31
Start Date: 21/Feb/24 13:31
Worklog Time Spent: 10m 
  Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1497556085


##
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java:
##
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
+
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
+import org.apache.activemq.artemis.core.journal.collections.JournalHashMapProvider;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AckManager implements ActiveMQComponent {
+
+   // we first retry on the queue a few times
+   public static final short MIN_QUEUE_ATTEMPT = 4;

Review Comment:
   I will rename this to MAX_QUEUE_ATTEMPT.
   
   And if I'm able to add a configuration element for the AckManager, perhaps I could make these two attributes configurable.
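
   A rough sketch of how two such retry limits might be made configurable follows. Everything in it is an assumption for illustration: `AckRetryPolicySketch`, the `Level` enum and the page-level limit are hypothetical names, and only the queue limit of 4 comes from the constant in the diff. It is not the AckManager implementation.

```java
// Hypothetical sketch; none of these names are Artemis APIs.
final class AckRetryPolicySketch {

   // The levels at which a retry may be attempted, in escalation order (assumed).
   enum Level { QUEUE, PAGE, GIVE_UP }

   private final int maxQueueAttempts;
   private final int maxPageAttempts;

   AckRetryPolicySketch(int maxQueueAttempts, int maxPageAttempts) {
      if (maxQueueAttempts < 0 || maxPageAttempts < 0) {
         throw new IllegalArgumentException("attempt limits must not be negative");
      }
      this.maxQueueAttempts = maxQueueAttempts;
      this.maxPageAttempts = maxPageAttempts;
   }

   // Decides where the next retry should go, given how many attempts were already made at each level.
   Level nextLevel(int queueAttempts, int pageAttempts) {
      if (queueAttempts < maxQueueAttempts) {
         return Level.QUEUE;   // still within the queue-level budget
      }
      if (pageAttempts < maxPageAttempts) {
         return Level.PAGE;    // queue budget exhausted, escalate to the next level
      }
      return Level.GIVE_UP;    // both budgets exhausted
   }

   public static void main(String[] args) {
      // 4 matches the constant in the diff; 2 is an arbitrary placeholder.
      AckRetryPolicySketch policy = new AckRetryPolicySketch(4, 2);
      System.out.println(policy.nextLevel(0, 0));
      System.out.println(policy.nextLevel(4, 0));
      System.out.println(policy.nextLevel(4, 2));
   }
}
```

   Passing the limits through a constructor rather than `public static final` constants is one way the values could later be fed from broker configuration. The limits here act as upper bounds on attempts, which is the reading the rename to MAX_QUEUE_ATTEMPT suggests.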





Issue Time Tracking
---

Worklog Id: (was: 906254)
Time Spent: 20m  (was: 10m)


[jira] [Work logged] (ARTEMIS-4651) Performance improvements on Mirror and Paging

2024-02-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=905949&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-905949
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
---

Author: ASF GitHub Bot
Created on: 20/Feb/24 13:55
Start Date: 20/Feb/24 13:55
Worklog Time Spent: 10m 
  Work Description: clebertsuconic opened a new pull request, #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827

   As part of this PR, I am also including a small PWD command that I used to test this manually.
   
   
   I would have many terminal windows open without knowing which terminal was connected to which server, so PWD helped me a lot.
   
   




Issue Time Tracking
---

Worklog Id: (was: 905949)
Remaining Estimate: 0h
Time Spent: 10m
