[ https://issues.apache.org/jira/browse/ARTEMIS-4651?focusedWorklogId=906450&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-906450 ]
ASF GitHub Bot logged work on ARTEMIS-4651: ------------------------------------------- Author: ASF GitHub Bot Created on: 22/Feb/24 13:54 Start Date: 22/Feb/24 13:54 Worklog Time Spent: 10m Work Description: clebertsuconic commented on code in PR #4827: URL: https://github.com/apache/activemq-artemis/pull/4827#discussion_r1499277765 ########## artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.core.journal.collections; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.LongFunction; +import java.util.function.LongSupplier; +import java.util.function.Supplier; + +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.journal.IOCompletion; +import org.apache.activemq.artemis.core.journal.Journal; +import org.apache.activemq.artemis.core.persistence.Persister; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JournalHashMap<K, V, C> implements Map<K, V> { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static class MapRecord<K, V> implements Entry<K, V> { + final long collectionID; + long id; + K key; + V value; + + MapRecord(long collectionID, long id, K key, V value) { + this.collectionID = collectionID; + this.id = id; + this.key = key; + this.value = value; + } + + @Override + public K getKey() { + return key; + } + + @Override + public V getValue() { + return value; + } + + @Override + public V setValue(V value) { + V oldValue = this.value; + this.value = value; + return oldValue; + } + + @Override + public String toString() { + return "MapRecord{" + "collectionID=" + collectionID + ", id=" + id + ", key=" + key + ", value=" + value + '}'; + } + } + + public JournalHashMap(long collectionId, Journal journal, LongSupplier idGenerator, Persister<MapRecord<K, V>> persister, byte recordType, Supplier<IOCompletion> completionSupplier, LongFunction<C> contextProvider, IOCriticalErrorListener ioExceptionListener) { + this.collectionId = collectionId; + this.journal = journal; + this.idGenerator = idGenerator; + this.persister = 
persister; + this.recordType = recordType; + this.exceptionListener = ioExceptionListener; + this.completionSupplier = completionSupplier; + this.contextProvider = contextProvider; + } + + C context; + + LongFunction<C> contextProvider; + + private final Persister<MapRecord<K, V>> persister; + + private final Journal journal; + + private final long collectionId; + + private final byte recordType; + + private final LongSupplier idGenerator; + + private final Supplier<IOCompletion> completionSupplier; + + private final IOCriticalErrorListener exceptionListener; + + private final Map<K, MapRecord<K, V>> map = new HashMap<>(); + + public long getCollectionId() { + return collectionId; + } + + @Override + public synchronized int size() { + return map.size(); + } + + public C getContext() { + if (context == null && contextProvider != null) { + context = contextProvider.apply(this.collectionId); + } + return context; + } + + public JournalHashMap<K, V, C> setContext(C context) { + this.context = context; + return this; + } + + @Override + public synchronized boolean isEmpty() { + return map.isEmpty(); + } + + @Override + public synchronized boolean containsKey(Object key) { + return map.containsKey(key); + } + + @Override + public synchronized boolean containsValue(Object value) { + for (Entry<K, MapRecord<K, V>> entry : map.entrySet()) { + if (value.equals(entry.getValue().value)) { + return true; + } + } + return false; + } + + @Override + public synchronized V get(Object key) { + MapRecord<K, V> reccord = map.get(key); + if (reccord == null) { + return null; + } else { + return reccord.value; + } + } + + /** This is to be called from a single thread during reload, no need to be synchronized */ + public void reload(MapRecord<K, V> reloadValue) { + map.put(reloadValue.getKey(), reloadValue); + } + + @Override + public synchronized V put(K key, V value) { + logger.debug("adding {} = {}", key, value); + long id = idGenerator.getAsLong(); + MapRecord<K, V> record = new 
MapRecord(collectionId, id, key, value); + store(key, record); + MapRecord<K, V> oldRecord = map.put(key, record); + + if (oldRecord != null) { + removed(oldRecord); + return oldRecord.value; + } else { + return null; + } + + } + + private synchronized void store(K key, MapRecord<K, V> record) { + try { + IOCompletion callback = null; + if (completionSupplier != null) { + callback = completionSupplier.get(); + } + + if (callback == null) { + journal.appendAddRecord(record.id, recordType, persister, record, false); + } else { + journal.appendAddRecord(record.id, recordType, persister, record, true, callback); + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + exceptionListener.onIOException(e, e.getMessage(), null); + } + } + + // callers must be synchronized + private void removed(MapRecord reccord) { + try { + journal.appendDeleteRecord(reccord.id, false); + } catch (Exception e) { + exceptionListener.onIOException(e, e.getMessage(), null); + } + } + + // callers must be synchronized + private void removed(MapRecord reccord, long txid) { + try { + journal.appendDeleteRecordTransactional(txid, reccord.id); + } catch (Exception e) { + exceptionListener.onIOException(e, e.getMessage(), null); + } + } + + @Override + public synchronized V remove(Object key) { + MapRecord<K, V> record = map.remove(key); + this.removed(record); + return record.value; + } + + /** This method will remove the element from the HashMap immediately however the record is still part of a transaction. + * This is not playing with rollbacks. So a rollback on the transaction wouldn't place the elements back. + * This is intended to make sure the operation would be atomic in case of a failure, while an appendRollback is not expected. */ + public synchronized V remove(Object key, long transactionID) { + MapRecord<K, V> record = map.remove(key); + this.removed(record, transactionID); + return record.value; + } + + @Override + public synchronized void putAll(Map<? extends K, ? 
extends V> m) { + m.forEach(this::put); + } + + @Override + public synchronized void clear() { + map.values().forEach(v -> remove(v)); + map.clear(); + } + + @Override + public synchronized Set<K> keySet() { + HashSet<K> keys = new HashSet(map.size()); + map.values().forEach(v -> keys.add(v.key)); + return keys; + } Review Comment: I will add a comment. Issue Time Tracking ------------------- Worklog Id: (was: 906450) Time Spent: 1.5h (was: 1h 20m) > Performance improvements on Mirror and Paging > --------------------------------------------- > > Key: ARTEMIS-4651 > URL: https://issues.apache.org/jira/browse/ARTEMIS-4651 > Project: ActiveMQ Artemis > Issue Type: Improvement > Reporter: Clebert Suconic > Assignee: Clebert Suconic > Priority: Major > Fix For: 2.33.0 > > Time Spent: 1.5h > Remaining Estimate: 0h > > Before this change, sends were not paged at the SNF. They are now copied. > I also added a different scheme for retrying messages in batches. A > collection with pending IDs is created and a few retries are performed at > different levels. -- This message was sent by Atlassian Jira (v8.20.10#820010)