This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch essobedo/CAMEL-22176/blocking-compression-changes in repository https://gitbox.apache.org/repos/asf/camel.git
commit d3f20358f36b8ebb24a41907c51ecf4584a2176f Author: Nicolas Filotto <[email protected]> AuthorDate: Wed Jun 18 13:19:50 2025 +0200 CAMEL-22176: camel-core - keep compression changes blocking --- .../apache/camel/support/cache/SimpleLRUCache.java | 41 ++++++++++------------ 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java index 5c3e79be57f..71793e3e6bf 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java @@ -36,7 +36,7 @@ import java.util.function.Function; import java.util.stream.Collectors; /** - * {@code SimpleLRUCache} is a simple implementation of a cache of type Least Recently Used . The implementation doesn't + * {@code SimpleLRUCache} is a simple implementation of a Least Recently Used cache. The implementation doesn't * accept null values. Generally speaking, the parameters of all the public methods must have a value otherwise a * {@code NullPointerException} is thrown. * @@ -47,7 +47,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { static final float DEFAULT_LOAD_FACTOR = 0.75f; /** - * The minimum size of the queue of changes. + * The minimum size of the changes. */ static final int MINIMUM_QUEUE_SIZE = 128; /** @@ -55,7 +55,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { */ private final AtomicBoolean eviction = new AtomicBoolean(); /** - * The lock to prevent the addition of changes during the swap of queue of changes or the cache cleaning. + * The lock to prevent the addition of changes during the swap of changes or the cache cleaning. */ private final ReadWriteLock lock = new ReentrantReadWriteLock(); /** @@ -68,7 +68,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { private final AtomicReference<Deque<Entry<K, ValueHolder<V>>>> lastChanges = new AtomicReference<>(new ConcurrentLinkedDeque<>()); /** - * The total amount of changes recorded. + * The total number of changes recorded. */ private final AtomicInteger totalChanges = new AtomicInteger(); /** @@ -339,7 +339,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { } /** - * @return the size of the queue of changes. + * @return the size of the changes. */ int getQueueSize() { return totalChanges.get(); @@ -364,7 +364,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { } /** - * Indicates whether the size of the queue of changes exceeds the maximum allowed size which is the max value + * Indicates whether the size of the changes exceeds the maximum allowed size which is the max value * between {@link #MINIMUM_QUEUE_SIZE} and {@code 2 * maximumCacheSize}. * * @return {@code true} if the queue is full, {@code false} otherwise. @@ -386,28 +386,25 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { * Removes duplicates from the queue of changes if the queue is full. */ private void compressChangesIfNeeded() { - Deque<Entry<K, ValueHolder<V>>> newChanges; - Deque<Entry<K, ValueHolder<V>>> currentChanges; lock.writeLock().lock(); try { if (isQueueFull()) { - newChanges = new ConcurrentLinkedDeque<>(); - totalChanges.set(0); - currentChanges = lastChanges.getAndSet(newChanges); - } else { - return; + Deque<Entry<K, ValueHolder<V>>> newChanges = new ConcurrentLinkedDeque<>(); + Deque<Entry<K, ValueHolder<V>>> currentChanges = lastChanges.getAndSet(newChanges); + Set<K> keys = new HashSet<>(); + Entry<K, ValueHolder<V>> entry; + int changes = 0; + while ((entry = currentChanges.pollLast()) != null) { + if (keys.add(entry.getKey())) { + newChanges.addFirst(entry); + changes++; + } + } + totalChanges.set(changes); } } finally { lock.writeLock().unlock(); } - Set<K> keys = new HashSet<>(); - Entry<K, ValueHolder<V>> entry; - while ((entry = currentChanges.pollLast()) != null) { - if (keys.add(entry.getKey())) { - newChanges.addFirst(entry); - totalChanges.incrementAndGet(); - } - } } private ValueHolder<V> newValue(V value) { @@ -419,7 +416,7 @@ public class SimpleLRUCache<K, V> implements Map<K, V> { } /** - * The internal context of all write operations. + * The internal context of all writes operations. */ private static class OperationContext<K, V> implements AutoCloseable { /**
