This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 66c69cffeaf IGNITE-25686 Optimize use ByteBufferCollector for 
appendEntries (#6237)
66c69cffeaf is described below

commit 66c69cffeafbfd262d304f9906f67c890e54635b
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Jul 14 10:24:50 2025 +0300

    IGNITE-25686 Optimize use ByteBufferCollector for appendEntries (#6237)
---
 .../apache/ignite/raft/jraft/core/Replicator.java  |  29 +++-
 .../ignite/raft/jraft/option/NodeOptions.java      |  24 +++
 .../raft/jraft/option/ReplicatorOptions.java       |   7 +
 .../raft/jraft/util/ByteBufferCollector.java       |  11 +-
 .../raft/jraft/util/ByteBufferCollectorPool.java   |  29 ++++
 .../org/apache/ignite/raft/jraft/util/Utils.java   |   3 +
 ...oncurrentLinkedLifoByteBufferCollectorPool.java |  88 ++++++++++
 ...rrentLinkedLifoByteBufferCollectorPoolTest.java | 191 +++++++++++++++++++++
 8 files changed, 375 insertions(+), 7 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index 4e76fb09df3..631c4b75a25 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -67,13 +67,13 @@ import 
org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
 import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
 import org.apache.ignite.raft.jraft.util.OnlyForTest;
-import org.apache.ignite.raft.jraft.util.Recyclable;
 import org.apache.ignite.raft.jraft.util.RecyclableByteBufferList;
 import org.apache.ignite.raft.jraft.util.RecycleUtil;
 import org.apache.ignite.raft.jraft.util.Requires;
 import org.apache.ignite.raft.jraft.util.ThreadId;
 import org.apache.ignite.raft.jraft.util.Utils;
 import org.apache.ignite.raft.jraft.util.internal.ThrowUtil;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Replicator for replicating log entry from leader to followers.
@@ -1677,7 +1677,7 @@ public class Replicator implements ThreadId.OnError {
                 return false;
             }
             if (byteBufList.getCapacity() > 0) {
-                dataBuf = 
ByteBufferCollector.allocateByRecyclers(byteBufList.getCapacity());
+                dataBuf = allocateShared(byteBufList.getCapacity());
                 for (final ByteBuffer b : byteBufList) {
                     dataBuf.put(b);
                 }
@@ -1702,7 +1702,7 @@ public class Replicator implements ThreadId.OnError {
         this.statInfo.firstLogIndex = request.prevLogIndex() + 1;
         this.statInfo.lastLogIndex = request.prevLogIndex() + 
Utils.size(request.entriesList());
 
-        final Recyclable recyclable = dataBuf;
+        final ByteBufferCollector releasable = dataBuf;
         final int v = this.version;
         final long monotonicSendTimeMs = Utils.monotonicMs();
         final int seq = getAndIncrementReqSeq();
@@ -1718,7 +1718,7 @@ public class Replicator implements ThreadId.OnError {
                             // TODO: recycle on send success, not response 
received IGNITE-14832.
                             // Also, this closure can be executed when 
rpcFuture was cancelled, but the request was not sent (meaning
                             // it's too early to recycle byte buffer)
-                            RecycleUtil.recycle(recyclable);
+                            releaseShared(releasable);
                         }
                         onRpcReturned(Replicator.this.id, 
RequestType.AppendEntries, status, request, getResponse(),
                             seq, v, monotonicSendTimeMs);
@@ -1726,7 +1726,7 @@ public class Replicator implements ThreadId.OnError {
                 });
         }
         catch (final Throwable t) {
-            RecycleUtil.recycle(recyclable);
+            releaseShared(releasable);
             ThrowUtil.throwException(t);
         }
         addInflight(RequestType.AppendEntries, nextSendingIndex, 
Utils.size(request.entriesList()),
@@ -1944,4 +1944,23 @@ public class Replicator implements ThreadId.OnError {
         this.id.unlock();
     }
 
+    private ByteBufferCollector allocateShared(int size) {
+        ByteBufferCollector collector = 
options.getAppendEntriesByteBufferCollectorPool().borrow();
+
+        if (collector == null || collector.capacity() < size) {
+            // Re-creation is used to avoid race when re-creating the internal 
buffer.
+            // It has been empirically found that adding 20% to the requested 
size reduces the number of allocations.
+            collector = ByteBufferCollector.allocate(Math.max(size, size + 
(int) (size * 0.2)));
+        }
+
+        return collector;
+    }
+
+    private void releaseShared(@Nullable ByteBufferCollector c) {
+        if (c != null && c.capacity() <= 
ByteBufferCollector.MAX_CAPACITY_TO_RECYCLE) {
+            c.clear();
+
+            options.getAppendEntriesByteBufferCollectorPool().release(c);
+        }
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index ab03d9b07e7..02070ffec56 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -37,11 +37,14 @@ import org.apache.ignite.raft.jraft.core.Scheduler;
 import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
 import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollectorPool;
 import org.apache.ignite.raft.jraft.util.Copiable;
 import org.apache.ignite.raft.jraft.util.NoopTimeoutStrategy;
 import org.apache.ignite.raft.jraft.util.StringUtils;
 import org.apache.ignite.raft.jraft.util.TimeoutStrategy;
 import org.apache.ignite.raft.jraft.util.Utils;
+import 
org.apache.ignite.raft.jraft.util.concurrent.ConcurrentLinkedLifoByteBufferCollectorPool;
 import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
 import org.apache.ignite.raft.jraft.util.timer.Timer;
 import org.jetbrains.annotations.Nullable;
@@ -301,6 +304,16 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
      */
     private boolean isSystemGroup = false;
 
+    /**
+     * Shared pool of {@link ByteBufferCollector} for sending log entries for 
replication.
+     *
+     * <p>Used to prevent a large number of {@link ByteBufferCollector} from 
being accumulated across all threads that are involved in
+     * sending log entries, see {@link 
ByteBufferCollector#allocateByRecyclers}.</p>
+     */
+    private ByteBufferCollectorPool appendEntriesByteBufferCollectorPool = new 
ConcurrentLinkedLifoByteBufferCollectorPool(
+            Utils.MAX_COLLECTOR_SIZE_PER_SERVER
+    );
+
     public NodeOptions() {
         raftOptions.setRaftMessagesFactory(getRaftMessagesFactory());
     }
@@ -779,6 +792,7 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
         nodeOptions.setLogStripesCount(this.getLogStripesCount());
         nodeOptions.setLogYieldStrategy(this.isLogYieldStrategy());
         nodeOptions.setNodeManager(this.getNodeManager());
+        
nodeOptions.setAppendEntriesByteBufferCollectorPool(appendEntriesByteBufferCollectorPool);
 
         return nodeOptions;
     }
@@ -835,4 +849,14 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
     public void setExternallyEnforcedConfigIndex(@Nullable Long index) {
         this.externallyEnforcedConfigIndex = index;
     }
+
+    /** Returns shared pool of {@link ByteBufferCollector} for sending log 
entries for replication. */
+    public ByteBufferCollectorPool getAppendEntriesByteBufferCollectorPool() {
+        return appendEntriesByteBufferCollectorPool;
+    }
+
+    /** Sets shared pool of {@link ByteBufferCollector} for sending log 
entries for replication. */
+    public void 
setAppendEntriesByteBufferCollectorPool(ByteBufferCollectorPool 
appendEntriesByteBufferCollectorPool) {
+        this.appendEntriesByteBufferCollectorPool = 
appendEntriesByteBufferCollectorPool;
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReplicatorOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReplicatorOptions.java
index 3655daa2f86..c2bc73dec7b 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReplicatorOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReplicatorOptions.java
@@ -26,6 +26,8 @@ import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.rpc.RaftClientService;
 import org.apache.ignite.raft.jraft.storage.LogManager;
 import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollectorPool;
 import org.apache.ignite.raft.jraft.util.Copiable;
 
 /**
@@ -221,4 +223,9 @@ public class ReplicatorOptions implements 
Copiable<ReplicatorOptions> {
     public ExecutorService getCommonExecutor() {
         return getNode().getOptions().getCommonExecutor();
     }
+
+    /** Returns shared pool of {@link ByteBufferCollector} for sending log 
entries for replication. */
+    public ByteBufferCollectorPool getAppendEntriesByteBufferCollectorPool() {
+        return 
getNode().getOptions().getAppendEntriesByteBufferCollectorPool();
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteBufferCollector.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteBufferCollector.java
index 3297329e286..d89333ad388 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteBufferCollector.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteBufferCollector.java
@@ -22,8 +22,7 @@ import java.nio.ByteBuffer;
  * A byte buffer collector that will expand automatically.
  */
 public final class ByteBufferCollector implements Recyclable {
-
-    private static final int MAX_CAPACITY_TO_RECYCLE = 4 * 1024 * 1024; // 4M
+    public static final int MAX_CAPACITY_TO_RECYCLE = 4 * 1024 * 1024; // 4M
 
     private ByteBuffer buffer;
 
@@ -134,6 +133,14 @@ public final class ByteBufferCollector implements 
Recyclable {
         return recyclers.recycle(this, handle);
     }
 
+    public void clear() {
+        ByteBuffer buffer = this.buffer;
+
+        if (buffer != null) {
+            buffer.clear();
+        }
+    }
+
     private transient final Recyclers.Handle handle;
 
     // TODO asch fixme is it safe to have static recyclers ? IGNITE-14832
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteBufferCollectorPool.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteBufferCollectorPool.java
new file mode 100644
index 00000000000..f6e9039af1a
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteBufferCollectorPool.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util;
+
+import org.jetbrains.annotations.Nullable;
+
+/** {@link ByteBufferCollector} pool. */
+public interface ByteBufferCollectorPool {
+    /** Removes and returns the cached {@link ByteBufferCollector}, {@code 
null} if the pool is empty. */
+    @Nullable ByteBufferCollector borrow();
+
+    /** Adds a {@link ByteBufferCollector} to the pool. */
+    void release(ByteBufferCollector c);
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
index 33ab5dd6731..f8d54ed65f0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
@@ -269,6 +269,9 @@ public final class Utils {
     public static final int MAX_COLLECTOR_SIZE_PER_THREAD = 
SystemPropertyUtil.getInt(
         "jraft.max_collector_size_per_thread", 256);
 
+    /** Default max {@link ByteBufferCollector} size per server, it can be set 
by "-Djraft.max_collector_size_per_server", default 256. */
+    public static final int MAX_COLLECTOR_SIZE_PER_SERVER = 
SystemPropertyUtil.getInt("jraft.max_collector_size_per_server", 256);
+
     /**
      * Expand byte buffer for 1024 bytes.
      */
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/concurrent/ConcurrentLinkedLifoByteBufferCollectorPool.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/concurrent/ConcurrentLinkedLifoByteBufferCollectorPool.java
new file mode 100644
index 00000000000..b124954c60c
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/concurrent/ConcurrentLinkedLifoByteBufferCollectorPool.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util.concurrent;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollectorPool;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Thread-safe implementation of a pool based on a fixed-size LIFO linked list.
+ *
+ * <p>{@link ByteBufferCollector} are given out on a last-in, first-out 
basis.</p>
+ */
+public class ConcurrentLinkedLifoByteBufferCollectorPool implements 
ByteBufferCollectorPool {
+    private final int capacity;
+
+    private final AtomicReference<Node> stack = new AtomicReference<>();
+
+    /**
+     * Constructor.
+     *
+     * @param capacity Maximum capacity of the pool, expected to be greater 
than zero.
+     */
+    public ConcurrentLinkedLifoByteBufferCollectorPool(int capacity) {
+        assert capacity > 0 : capacity;
+
+        this.capacity = capacity;
+    }
+
+    /** Removes and returns the last added {@link ByteBufferCollector}, {@code 
null} if the pool is empty. */
+    @Override
+    public @Nullable ByteBufferCollector borrow() {
+        while (true) {
+            Node node = stack.get();
+
+            if (node == null) {
+                return null;
+            } else if (stack.compareAndSet(node, node.next)) {
+                return node.collector;
+            }
+        }
+    }
+
+    /** Adds a {@link ByteBufferCollector} to the pool if and only if it does 
not exceed the capacity. */
+    @Override
+    public void release(ByteBufferCollector c) {
+        while (true) {
+            Node node = stack.get();
+            int newIndex = node == null ? 0 : node.index + 1;
+
+            if (newIndex == capacity) {
+                return;
+            } else if (stack.compareAndSet(node, new Node(c, newIndex, node))) 
{
+                return;
+            }
+        }
+    }
+
+    private static final class Node {
+        private final ByteBufferCollector collector;
+
+        private final int index;
+
+        private final @Nullable Node next;
+
+        private Node(ByteBufferCollector collector, int index, @Nullable Node 
next) {
+            this.collector = collector;
+            this.index = index;
+            this.next = next;
+        }
+    }
+}
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/concurrent/ConcurrentLinkedLifoByteBufferCollectorPoolTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/concurrent/ConcurrentLinkedLifoByteBufferCollectorPoolTest.java
new file mode 100644
index 00000000000..08b45cdab17
--- /dev/null
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/concurrent/ConcurrentLinkedLifoByteBufferCollectorPoolTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util.concurrent;
+
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+/** For {@link ConcurrentLinkedLifoByteBufferCollectorPool} testing. */
+public class ConcurrentLinkedLifoByteBufferCollectorPoolTest {
+    private static final int CAPACITY = 10;
+
+    private final ConcurrentLinkedLifoByteBufferCollectorPool collectorPool = 
new ConcurrentLinkedLifoByteBufferCollectorPool(CAPACITY);
+
+    @Test
+    void borrowFromEmptyPool() {
+        assertNull(collectorPool.borrow());
+    }
+
+    @Test
+    void borrowSingleCollector() {
+        ByteBufferCollector collector = allocateCollector();
+
+        collectorPool.release(collector);
+
+        assertSame(collector, collectorPool.borrow());
+        assertNull(collectorPool.borrow());
+    }
+
+    @Test
+    void borrowSameCollector() {
+        ByteBufferCollector collector = allocateCollector();
+
+        collectorPool.release(collector);
+        collectorPool.release(collector);
+
+        assertSame(collector, collectorPool.borrow());
+        assertSame(collector, collectorPool.borrow());
+        assertNull(collectorPool.borrow());
+    }
+
+    @Test
+    void borrowSeveral() {
+        List<ByteBufferCollector> collectors = allocateCollectors(5);
+
+        collectors.forEach(collectorPool::release);
+
+        assertEquals(reverse(collectors), borrowAll());
+    }
+
+    @RepeatedTest(10)
+    void borrowConcurrent() {
+        ByteBufferCollector collector0 = allocateCollector();
+        ByteBufferCollector collector1 = allocateCollector();
+
+        collectorPool.release(collector0);
+        collectorPool.release(collector1);
+
+        Set<ByteBufferCollector> borrows = ConcurrentHashMap.newKeySet();
+
+        IgniteTestUtils.runRace(
+                () -> borrows.add(collectorPool.borrow()),
+                () -> borrows.add(collectorPool.borrow())
+        );
+
+        assertNull(collectorPool.borrow());
+
+        assertThat(borrows, hasSize(2));
+        assertThat(borrows, hasItems(collector0, collector1));
+    }
+
+    @Test
+    void releaseMoreThanCapacity() {
+        List<ByteBufferCollector> collectors = allocateCollectors(CAPACITY + 
5);
+
+        collectors.forEach(collectorPool::release);
+
+        assertEquals(reverse(collectors.subList(0, CAPACITY)), borrowAll());
+    }
+
+    @RepeatedTest(10)
+    void releaseConcurrent() {
+        ByteBufferCollector collector0 = allocateCollector();
+        ByteBufferCollector collector1 = allocateCollector();
+
+        IgniteTestUtils.runRace(
+                () -> collectorPool.release(collector0),
+                () -> collectorPool.release(collector1)
+        );
+
+        List<ByteBufferCollector> borrowAll = borrowAll();
+        assertThat(borrowAll, hasSize(2));
+        assertThat(borrowAll, hasItems(collector0, collector1));
+    }
+
+    @RepeatedTest(10)
+    void borrowReleaseConcurrent() {
+        ByteBufferCollector collector0 = allocateCollector();
+        ByteBufferCollector collector1 = allocateCollector();
+        ByteBufferCollector collector2 = allocateCollector();
+        ByteBufferCollector collector3 = allocateCollector();
+        
+        collectorPool.release(collector0);
+        collectorPool.release(collector1);
+
+        Set<ByteBufferCollector> borrows = ConcurrentHashMap.newKeySet();
+        
+        IgniteTestUtils.runRace(
+                () -> collectorPool.release(collector2),
+                () -> collectorPool.release(collector3),
+                () -> borrows.add(collectorPool.borrow()),
+                () -> borrows.add(collectorPool.borrow())
+        );
+
+        List<ByteBufferCollector> remainingInCollectorPool = borrowAll();
+        
+        assertThat(remainingInCollectorPool, hasSize(2));
+        assertThat(borrows, hasSize(2));
+        
+        assertThat(remainingInCollectorPool, 
not(hasItems(borrows.toArray(ByteBufferCollector[]::new))));
+    }
+
+    private List<ByteBufferCollector> borrowAll() {
+        var res = new ArrayList<ByteBufferCollector>();
+
+        while (true) {
+            ByteBufferCollector collector = collectorPool.borrow();
+
+            if (collector == null) {
+                break;
+            } else {
+                res.add(collector);
+            }
+        }
+
+        return res;
+    }
+
+    private static ByteBufferCollector allocateCollector() {
+        return ByteBufferCollector.allocate(32);
+    }
+
+    private static List<ByteBufferCollector> allocateCollectors(int size) {
+        return IntStream.range(0, size)
+                .mapToObj(i -> allocateCollector())
+                .collect(toList());
+    }
+
+    private static List<ByteBufferCollector> reverse(List<ByteBufferCollector> 
l) {
+        if (l.isEmpty()) {
+            return l;
+        }
+
+        var res = new ArrayList<>(l);
+
+        Collections.reverse(res);
+
+        return res;
+    }
+}

Reply via email to