This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 66c69cffeaf IGNITE-25686 Optimize use ByteBufferCollector for
appendEntries (#6237)
66c69cffeaf is described below
commit 66c69cffeafbfd262d304f9906f67c890e54635b
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Jul 14 10:24:50 2025 +0300
IGNITE-25686 Optimize use ByteBufferCollector for appendEntries (#6237)
---
.../apache/ignite/raft/jraft/core/Replicator.java | 29 +++-
.../ignite/raft/jraft/option/NodeOptions.java | 24 +++
.../raft/jraft/option/ReplicatorOptions.java | 7 +
.../raft/jraft/util/ByteBufferCollector.java | 11 +-
.../raft/jraft/util/ByteBufferCollectorPool.java | 29 ++++
.../org/apache/ignite/raft/jraft/util/Utils.java | 3 +
...oncurrentLinkedLifoByteBufferCollectorPool.java | 88 ++++++++++
...rrentLinkedLifoByteBufferCollectorPoolTest.java | 191 +++++++++++++++++++++
8 files changed, 375 insertions(+), 7 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index 4e76fb09df3..631c4b75a25 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -67,13 +67,13 @@ import
org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
-import org.apache.ignite.raft.jraft.util.Recyclable;
import org.apache.ignite.raft.jraft.util.RecyclableByteBufferList;
import org.apache.ignite.raft.jraft.util.RecycleUtil;
import org.apache.ignite.raft.jraft.util.Requires;
import org.apache.ignite.raft.jraft.util.ThreadId;
import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.internal.ThrowUtil;
+import org.jetbrains.annotations.Nullable;
/**
* Replicator for replicating log entry from leader to followers.
@@ -1677,7 +1677,7 @@ public class Replicator implements ThreadId.OnError {
return false;
}
if (byteBufList.getCapacity() > 0) {
- dataBuf =
ByteBufferCollector.allocateByRecyclers(byteBufList.getCapacity());
+ dataBuf = allocateShared(byteBufList.getCapacity());
for (final ByteBuffer b : byteBufList) {
dataBuf.put(b);
}
@@ -1702,7 +1702,7 @@ public class Replicator implements ThreadId.OnError {
this.statInfo.firstLogIndex = request.prevLogIndex() + 1;
this.statInfo.lastLogIndex = request.prevLogIndex() +
Utils.size(request.entriesList());
- final Recyclable recyclable = dataBuf;
+ final ByteBufferCollector releasable = dataBuf;
final int v = this.version;
final long monotonicSendTimeMs = Utils.monotonicMs();
final int seq = getAndIncrementReqSeq();
@@ -1718,7 +1718,7 @@ public class Replicator implements ThreadId.OnError {
// TODO: recycle on send success, not response
received IGNITE-14832.
// Also, this closure can be executed when
rpcFuture was cancelled, but the request was not sent (meaning
// it's too early to recycle byte buffer)
- RecycleUtil.recycle(recyclable);
+ releaseShared(releasable);
}
onRpcReturned(Replicator.this.id,
RequestType.AppendEntries, status, request, getResponse(),
seq, v, monotonicSendTimeMs);
@@ -1726,7 +1726,7 @@ public class Replicator implements ThreadId.OnError {
});
}
catch (final Throwable t) {
- RecycleUtil.recycle(recyclable);
+ releaseShared(releasable);
ThrowUtil.throwException(t);
}
addInflight(RequestType.AppendEntries, nextSendingIndex,
Utils.size(request.entriesList()),
@@ -1944,4 +1944,23 @@ public class Replicator implements ThreadId.OnError {
this.id.unlock();
}
+ private ByteBufferCollector allocateShared(int size) {
+ ByteBufferCollector collector =
options.getAppendEntriesByteBufferCollectorPool().borrow();
+
+ if (collector == null || collector.capacity() < size) {
+ // Re-creation is used to avoid race when re-creating the internal
buffer.
+ // It has been empirically found that adding 20% to the requested
size reduces the number of allocations.
+ collector = ByteBufferCollector.allocate(Math.max(size, size +
(int) (size * 0.2)));
+ }
+
+ return collector;
+ }
+
+ private void releaseShared(@Nullable ByteBufferCollector c) {
+ if (c != null && c.capacity() <=
ByteBufferCollector.MAX_CAPACITY_TO_RECYCLE) {
+ c.clear();
+
+ options.getAppendEntriesByteBufferCollectorPool().release(c);
+ }
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index ab03d9b07e7..02070ffec56 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -37,11 +37,14 @@ import org.apache.ignite.raft.jraft.core.Scheduler;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollectorPool;
import org.apache.ignite.raft.jraft.util.Copiable;
import org.apache.ignite.raft.jraft.util.NoopTimeoutStrategy;
import org.apache.ignite.raft.jraft.util.StringUtils;
import org.apache.ignite.raft.jraft.util.TimeoutStrategy;
import org.apache.ignite.raft.jraft.util.Utils;
+import
org.apache.ignite.raft.jraft.util.concurrent.ConcurrentLinkedLifoByteBufferCollectorPool;
import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
import org.apache.ignite.raft.jraft.util.timer.Timer;
import org.jetbrains.annotations.Nullable;
@@ -301,6 +304,16 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
*/
private boolean isSystemGroup = false;
+ /**
+ * Shared pool of {@link ByteBufferCollector} for sending log entries for
replication.
+ *
+ * <p>Used to prevent a large number of {@link ByteBufferCollector} from
being accumulated across all threads that are involved in
+ * sending log entries, see {@link
ByteBufferCollector#allocateByRecyclers}.</p>
+ */
+ private ByteBufferCollectorPool appendEntriesByteBufferCollectorPool = new
ConcurrentLinkedLifoByteBufferCollectorPool(
+ Utils.MAX_COLLECTOR_SIZE_PER_SERVER
+ );
+
public NodeOptions() {
raftOptions.setRaftMessagesFactory(getRaftMessagesFactory());
}
@@ -779,6 +792,7 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
nodeOptions.setLogStripesCount(this.getLogStripesCount());
nodeOptions.setLogYieldStrategy(this.isLogYieldStrategy());
nodeOptions.setNodeManager(this.getNodeManager());
+
nodeOptions.setAppendEntriesByteBufferCollectorPool(appendEntriesByteBufferCollectorPool);
return nodeOptions;
}
@@ -835,4 +849,14 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
public void setExternallyEnforcedConfigIndex(@Nullable Long index) {
this.externallyEnforcedConfigIndex = index;
}
+
+ /** Returns shared pool of {@link ByteBufferCollector} for sending log
entries for replication. */
+ public ByteBufferCollectorPool getAppendEntriesByteBufferCollectorPool() {
+ return appendEntriesByteBufferCollectorPool;
+ }
+
+ /** Sets shared pool of {@link ByteBufferCollector} for sending log
entries for replication. */
+ public void
setAppendEntriesByteBufferCollectorPool(ByteBufferCollectorPool
appendEntriesByteBufferCollectorPool) {
+ this.appendEntriesByteBufferCollectorPool =
appendEntriesByteBufferCollectorPool;
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReplicatorOptions.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReplicatorOptions.java
index 3655daa2f86..c2bc73dec7b 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReplicatorOptions.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReplicatorOptions.java
@@ -26,6 +26,8 @@ import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.rpc.RaftClientService;
import org.apache.ignite.raft.jraft.storage.LogManager;
import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollectorPool;
import org.apache.ignite.raft.jraft.util.Copiable;
/**
@@ -221,4 +223,9 @@ public class ReplicatorOptions implements
Copiable<ReplicatorOptions> {
public ExecutorService getCommonExecutor() {
return getNode().getOptions().getCommonExecutor();
}
+
+ /** Returns shared pool of {@link ByteBufferCollector} for sending log
entries for replication. */
+ public ByteBufferCollectorPool getAppendEntriesByteBufferCollectorPool() {
+ return
getNode().getOptions().getAppendEntriesByteBufferCollectorPool();
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteBufferCollector.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteBufferCollector.java
index 3297329e286..d89333ad388 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteBufferCollector.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteBufferCollector.java
@@ -22,8 +22,7 @@ import java.nio.ByteBuffer;
* A byte buffer collector that will expand automatically.
*/
public final class ByteBufferCollector implements Recyclable {
-
- private static final int MAX_CAPACITY_TO_RECYCLE = 4 * 1024 * 1024; // 4M
+ public static final int MAX_CAPACITY_TO_RECYCLE = 4 * 1024 * 1024; // 4M
private ByteBuffer buffer;
@@ -134,6 +133,14 @@ public final class ByteBufferCollector implements
Recyclable {
return recyclers.recycle(this, handle);
}
+ public void clear() {
+ ByteBuffer buffer = this.buffer;
+
+ if (buffer != null) {
+ buffer.clear();
+ }
+ }
+
private transient final Recyclers.Handle handle;
// TODO asch fixme is it safe to have static recyclers ? IGNITE-14832
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteBufferCollectorPool.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteBufferCollectorPool.java
new file mode 100644
index 00000000000..f6e9039af1a
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteBufferCollectorPool.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util;
+
+import org.jetbrains.annotations.Nullable;
+
+/** {@link ByteBufferCollector} pool. */
+public interface ByteBufferCollectorPool {
+ /** Removes and returns the cached {@link ByteBufferCollector}, {@code
null} if the pool is empty. */
+ @Nullable ByteBufferCollector borrow();
+
+ /** Adds a {@link ByteBufferCollector} to the pool. */
+ void release(ByteBufferCollector c);
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
index 33ab5dd6731..f8d54ed65f0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
@@ -269,6 +269,9 @@ public final class Utils {
public static final int MAX_COLLECTOR_SIZE_PER_THREAD =
SystemPropertyUtil.getInt(
"jraft.max_collector_size_per_thread", 256);
+ /** Default max {@link ByteBufferCollector} size per server; it can be set
by "-Djraft.max_collector_size_per_server", default 256. */
+ public static final int MAX_COLLECTOR_SIZE_PER_SERVER =
SystemPropertyUtil.getInt("jraft.max_collector_size_per_server", 256);
+
/**
* Expand byte buffer for 1024 bytes.
*/
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/concurrent/ConcurrentLinkedLifoByteBufferCollectorPool.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/concurrent/ConcurrentLinkedLifoByteBufferCollectorPool.java
new file mode 100644
index 00000000000..b124954c60c
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/concurrent/ConcurrentLinkedLifoByteBufferCollectorPool.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util.concurrent;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollectorPool;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Thread-safe implementation of a pool based on a fixed-size LIFO linked list.
+ *
+ * <p>{@link ByteBufferCollector} are given out on a last-in, first-out
basis.</p>
+ */
+public class ConcurrentLinkedLifoByteBufferCollectorPool implements
ByteBufferCollectorPool {
+ private final int capacity;
+
+ private final AtomicReference<Node> stack = new AtomicReference<>();
+
+ /**
+ * Constructor.
+ *
+ * @param capacity Maximum capacity of the pool, expected to be greater
than zero.
+ */
+ public ConcurrentLinkedLifoByteBufferCollectorPool(int capacity) {
+ assert capacity > 0 : capacity;
+
+ this.capacity = capacity;
+ }
+
+ /** Removes and returns the last added {@link ByteBufferCollector}, {@code
null} if the pool is empty. */
+ @Override
+ public @Nullable ByteBufferCollector borrow() {
+ while (true) {
+ Node node = stack.get();
+
+ if (node == null) {
+ return null;
+ } else if (stack.compareAndSet(node, node.next)) {
+ return node.collector;
+ }
+ }
+ }
+
+ /** Adds a {@link ByteBufferCollector} to the pool if and only if it does
not exceed the capacity. */
+ @Override
+ public void release(ByteBufferCollector c) {
+ while (true) {
+ Node node = stack.get();
+ int newIndex = node == null ? 0 : node.index + 1;
+
+ if (newIndex == capacity) {
+ return;
+ } else if (stack.compareAndSet(node, new Node(c, newIndex, node)))
{
+ return;
+ }
+ }
+ }
+
+ private static final class Node {
+ private final ByteBufferCollector collector;
+
+ private final int index;
+
+ private final @Nullable Node next;
+
+ private Node(ByteBufferCollector collector, int index, @Nullable Node
next) {
+ this.collector = collector;
+ this.index = index;
+ this.next = next;
+ }
+ }
+}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/concurrent/ConcurrentLinkedLifoByteBufferCollectorPoolTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/concurrent/ConcurrentLinkedLifoByteBufferCollectorPoolTest.java
new file mode 100644
index 00000000000..08b45cdab17
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/util/concurrent/ConcurrentLinkedLifoByteBufferCollectorPoolTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.util.concurrent;
+
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+/** For {@link ConcurrentLinkedLifoByteBufferCollectorPool} testing. */
+public class ConcurrentLinkedLifoByteBufferCollectorPoolTest {
+ private static final int CAPACITY = 10;
+
+ private final ConcurrentLinkedLifoByteBufferCollectorPool collectorPool =
new ConcurrentLinkedLifoByteBufferCollectorPool(CAPACITY);
+
+ @Test
+ void borrowFromEmptyPool() {
+ assertNull(collectorPool.borrow());
+ }
+
+ @Test
+ void borrowSingleCollector() {
+ ByteBufferCollector collector = allocateCollector();
+
+ collectorPool.release(collector);
+
+ assertSame(collector, collectorPool.borrow());
+ assertNull(collectorPool.borrow());
+ }
+
+ @Test
+ void borrowSameCollector() {
+ ByteBufferCollector collector = allocateCollector();
+
+ collectorPool.release(collector);
+ collectorPool.release(collector);
+
+ assertSame(collector, collectorPool.borrow());
+ assertSame(collector, collectorPool.borrow());
+ assertNull(collectorPool.borrow());
+ }
+
+ @Test
+ void borrowSeveral() {
+ List<ByteBufferCollector> collectors = allocateCollectors(5);
+
+ collectors.forEach(collectorPool::release);
+
+ assertEquals(reverse(collectors), borrowAll());
+ }
+
+ @RepeatedTest(10)
+ void borrowConcurrent() {
+ ByteBufferCollector collector0 = allocateCollector();
+ ByteBufferCollector collector1 = allocateCollector();
+
+ collectorPool.release(collector0);
+ collectorPool.release(collector1);
+
+ Set<ByteBufferCollector> borrows = ConcurrentHashMap.newKeySet();
+
+ IgniteTestUtils.runRace(
+ () -> borrows.add(collectorPool.borrow()),
+ () -> borrows.add(collectorPool.borrow())
+ );
+
+ assertNull(collectorPool.borrow());
+
+ assertThat(borrows, hasSize(2));
+ assertThat(borrows, hasItems(collector0, collector1));
+ }
+
+ @Test
+ void releaseMoreThanCapacity() {
+ List<ByteBufferCollector> collectors = allocateCollectors(CAPACITY +
5);
+
+ collectors.forEach(collectorPool::release);
+
+ assertEquals(reverse(collectors.subList(0, CAPACITY)), borrowAll());
+ }
+
+ @RepeatedTest(10)
+ void releaseConcurrent() {
+ ByteBufferCollector collector0 = allocateCollector();
+ ByteBufferCollector collector1 = allocateCollector();
+
+ IgniteTestUtils.runRace(
+ () -> collectorPool.release(collector0),
+ () -> collectorPool.release(collector1)
+ );
+
+ List<ByteBufferCollector> borrowAll = borrowAll();
+ assertThat(borrowAll, hasSize(2));
+ assertThat(borrowAll, hasItems(collector0, collector1));
+ }
+
+ @RepeatedTest(10)
+ void borrowReleaseConcurrent() {
+ ByteBufferCollector collector0 = allocateCollector();
+ ByteBufferCollector collector1 = allocateCollector();
+ ByteBufferCollector collector2 = allocateCollector();
+ ByteBufferCollector collector3 = allocateCollector();
+
+ collectorPool.release(collector0);
+ collectorPool.release(collector1);
+
+ Set<ByteBufferCollector> borrows = ConcurrentHashMap.newKeySet();
+
+ IgniteTestUtils.runRace(
+ () -> collectorPool.release(collector2),
+ () -> collectorPool.release(collector3),
+ () -> borrows.add(collectorPool.borrow()),
+ () -> borrows.add(collectorPool.borrow())
+ );
+
+ List<ByteBufferCollector> remainingInCollectorPool = borrowAll();
+
+ assertThat(remainingInCollectorPool, hasSize(2));
+ assertThat(borrows, hasSize(2));
+
+ assertThat(remainingInCollectorPool,
not(hasItems(borrows.toArray(ByteBufferCollector[]::new))));
+ }
+
+ private List<ByteBufferCollector> borrowAll() {
+ var res = new ArrayList<ByteBufferCollector>();
+
+ while (true) {
+ ByteBufferCollector collector = collectorPool.borrow();
+
+ if (collector == null) {
+ break;
+ } else {
+ res.add(collector);
+ }
+ }
+
+ return res;
+ }
+
+ private static ByteBufferCollector allocateCollector() {
+ return ByteBufferCollector.allocate(32);
+ }
+
+ private static List<ByteBufferCollector> allocateCollectors(int size) {
+ return IntStream.range(0, size)
+ .mapToObj(i -> allocateCollector())
+ .collect(toList());
+ }
+
+ private static List<ByteBufferCollector> reverse(List<ByteBufferCollector>
l) {
+ if (l.isEmpty()) {
+ return l;
+ }
+
+ var res = new ArrayList<>(l);
+
+ Collections.reverse(res);
+
+ return res;
+ }
+}