[ 
https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906450&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906450
 ]

ASF GitHub Bot logged work on ARTEMIS-4651:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Feb/24 13:54
            Start Date: 22/Feb/24 13:54
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on code in PR #4827:
URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499277765


##########
artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.journal.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.IOCompletion;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JournalHashMap<K, V, C> implements Map<K, V> {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static class MapRecord<K, V> implements Entry<K, V> {
+      final long collectionID;
+      long id;
+      K key;
+      V value;
+
+      MapRecord(long collectionID, long id, K key, V value) {
+         this.collectionID = collectionID;
+         this.id = id;
+         this.key = key;
+         this.value = value;
+      }
+
+      @Override
+      public K getKey() {
+         return key;
+      }
+
+      @Override
+      public V getValue() {
+         return value;
+      }
+
+      @Override
+      public V setValue(V value) {
+         V oldValue = this.value;
+         this.value = value;
+         return oldValue;
+      }
+
+      @Override
+      public String toString() {
+         return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + 
", key=" + key + ", value=" + value + '}';
+      }
+   }
+
   /**
    * Creates a journal-backed hash map.
    *
    * @param collectionId        id shared by every journal record belonging to this map
    * @param journal             journal used to persist each put/remove
    * @param idGenerator         supplies a unique journal record id per stored entry
    * @param persister           encodes/decodes {@link MapRecord} instances for the journal
    * @param recordType          journal record-type marker used for this map's records
    * @param completionSupplier  optional; when it yields a non-null IOCompletion, adds are appended with sync=true and that callback
    * @param contextProvider     optional; lazily builds the user context from the collection id (see getContext())
    * @param ioExceptionListener notified when a journal append/delete fails
    */
   public JournalHashMap(long collectionId, Journal journal, LongSupplier idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener ioExceptionListener) {
      this.collectionId = collectionId;
      this.journal = journal;
      this.idGenerator = idGenerator;
      this.persister = persister;
      this.recordType = recordType;
      this.exceptionListener = ioExceptionListener;
      this.completionSupplier = completionSupplier;
      this.contextProvider = contextProvider;
   }
+
   // Lazily initialized user context associated with this collection (see getContext()).
   C context;

   // Builds the context from the collection id on first use; may be null.
   LongFunction<C> contextProvider;

   // Encodes/decodes MapRecord entries for journal storage.
   private final Persister<MapRecord<K, V>> persister;

   // Journal that persists every put/remove of this map.
   private final Journal journal;

   // Id shared by all journal records belonging to this map instance.
   private final long collectionId;

   // Journal record-type marker used when appending records.
   private final byte recordType;

   // Supplies a unique journal record id for each stored entry.
   private final LongSupplier idGenerator;

   // Optional; when it yields a non-null IOCompletion, adds are appended with sync=true.
   private final Supplier<IOCompletion> completionSupplier;

   // Notified when a journal append/delete fails.
   private final IOCriticalErrorListener exceptionListener;

   // In-memory view of the journaled entries, keyed by user key.
   private final Map<K, MapRecord<K, V>> map = new HashMap<>();
+
   /** Returns the id shared by every journal record belonging to this map. */
   public long getCollectionId() {
      return collectionId;
   }
+
   /** {@inheritDoc} */
   @Override
   public synchronized int size() {
      return map.size();
   }
+
   /**
    * Returns the user context for this collection, lazily created through the
    * contextProvider on first access (null when no provider was configured).
    * NOTE(review): unlike most accessors in this class this method is not
    * synchronized, so concurrent first calls could invoke contextProvider more
    * than once — confirm that is acceptable to callers.
    */
   public C getContext() {
      if (context == null && contextProvider != null) {
         context = contextProvider.apply(this.collectionId);
      }
      return context;
   }
+
   /** Sets the user context explicitly; returns {@code this} for chaining. */
   public JournalHashMap<K, V, C> setContext(C context) {
      this.context = context;
      return this;
   }
+
   /** {@inheritDoc} */
   @Override
   public synchronized boolean isEmpty() {
      return map.isEmpty();
   }
+
   /** {@inheritDoc} */
   @Override
   public synchronized boolean containsKey(Object key) {
      return map.containsKey(key);
   }
+
+   @Override
+   public synchronized boolean containsValue(Object value) {
+      for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) {
+         if (value.equals(entry.getValue().value)) {
+            return true;
+         }
+      }
+      return false;
+   }
+
+   @Override
+   public synchronized V get(Object key) {
+      MapRecord<K, V> reccord = map.get(key);
+      if (reccord == null) {
+         return null;
+      } else {
+         return reccord.value;
+      }
+   }
+
   /** Re-registers a record read back from the journal during startup.
    *  This is to be called from a single thread during reload, no need to be synchronized. */
   public void reload(MapRecord<K, V> reloadValue) {
      map.put(reloadValue.getKey(), reloadValue);
   }
+
+   @Override
+   public synchronized V put(K key, V value) {
+      logger.debug("adding {} = {}", key, value);
+      long id = idGenerator.getAsLong();
+      MapRecord<K, V> record = new MapRecord(collectionId, id, key, value);
+      store(key, record);
+      MapRecord<K, V> oldRecord = map.put(key, record);
+
+      if (oldRecord != null) {
+         removed(oldRecord);
+         return oldRecord.value;
+      } else {
+         return null;
+      }
+
+   }
+
+   private synchronized void store(K key, MapRecord<K, V> record) {
+      try {
+         IOCompletion callback = null;
+         if (completionSupplier != null) {
+            callback = completionSupplier.get();
+         }
+
+         if (callback == null) {
+            journal.appendAddRecord(record.id, recordType, persister, record, 
false);
+         } else {
+            journal.appendAddRecord(record.id, recordType, persister, record, 
true, callback);
+         }
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   // callers must be synchronized
+   private void removed(MapRecord reccord) {
+      try {
+         journal.appendDeleteRecord(reccord.id, false);
+      } catch (Exception e) {
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   // callers must be synchronized
+   private void removed(MapRecord reccord, long txid) {
+      try {
+         journal.appendDeleteRecordTransactional(txid, reccord.id);
+      } catch (Exception e) {
+         exceptionListener.onIOException(e, e.getMessage(), null);
+      }
+   }
+
+   @Override
+   public synchronized V remove(Object key) {
+      MapRecord<K, V> record = map.remove(key);
+      this.removed(record);
+      return record.value;
+   }
+
+   /** This method will remove the element from the HashMap immediately 
however the record is still part of a transaction.
+    *  This is not playing with rollbacks. So a rollback on the transaction 
wouldn't place the elements back.
+    *  This is intended to make sure the operation would be atomic in case of 
a failure, while an appendRollback is not expected. */
+   public synchronized V remove(Object key, long transactionID) {
+      MapRecord<K, V> record = map.remove(key);
+      this.removed(record, transactionID);
+      return record.value;
+   }
+
+   @Override
+   public synchronized void putAll(Map<? extends K, ? extends V> m) {
+      m.forEach(this::put);
+   }
+
+   @Override
+   public synchronized void clear() {
+      map.values().forEach(v -> remove(v));
+      map.clear();
+   }
+
+   @Override
+   public synchronized Set<K> keySet() {
+      HashSet<K> keys = new HashSet(map.size());
+      map.values().forEach(v -> keys.add(v.key));
+      return keys;
+   }

Review Comment:
   I will add a comment.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 906450)
    Time Spent: 1.5h  (was: 1h 20m)

> Performance improvements on Mirror and Paging
> ---------------------------------------------
>
>                 Key: ARTEMIS-4651
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4651
>             Project: ActiveMQ Artemis
>          Issue Type: Improvement
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.33.0
>
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Before this change, sends were not paged at the SNF. They are now copied.
> I also added a different scheme for retrying messages in batches. A 
> collection with pending IDs is created and a few retries are performed at 
> different levels.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to