This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 7620cb89ca IGNITE-21063 Optimize memory usage in optimized marshaller (#2950) 7620cb89ca is described below commit 7620cb89ca88601dd817366dcc6d18c3ee848feb Author: Ivan Bessonov <bessonov...@gmail.com> AuthorDate: Wed Dec 13 16:38:29 2023 +0300 IGNITE-21063 Optimize memory usage in optimized marshaller (#2950) --- .../ignite/internal/index/ItBuildIndexTest.java | 3 +- .../internal/raft/util/DefaultByteBuffersPool.java | 76 ++++++++++++++++++++++ ...edMarshaller.java => EmptyByteBuffersPool.java} | 27 ++------ .../internal/raft/util/OptimizedMarshaller.java | 75 ++++++++++++++++++--- .../raft/util/ThreadLocalOptimizedMarshaller.java | 6 +- .../ignite/internal/raft/RaftGroupServiceTest.java | 3 +- .../raft/util/DefaultByteBuffersPoolTest.java | 66 +++++++++++++++++++ .../raft/util/EmptyByteBuffersPoolTest.java | 34 ++++++++++ .../raftsnapshot/ItTableRaftSnapshotsTest.java | 3 +- .../schema/PartitionCommandsMarshallerImpl.java | 8 +-- .../ThreadLocalPartitionCommandsMarshaller.java | 7 +- .../CheckCatalogVersionOnActionRequestTest.java | 3 +- .../PartitionCommandsMarshallerImplTest.java | 5 +- 13 files changed, 273 insertions(+), 43 deletions(-) diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java index 94e8ba7ecc..291cf607bd 100644 --- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java +++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.index; import static java.util.stream.Collectors.joining; +import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL; import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan; import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; @@ -261,7 +262,7 @@ public class ItBuildIndexTest extends BaseSqlIntegrationTest { ) { IgniteImpl node = CLUSTER.node(0); MessageSerializationRegistry serializationRegistry = node.raftManager().service().serializationRegistry(); - var commandsMarshaller = new PartitionCommandsMarshallerImpl(serializationRegistry); + var commandsMarshaller = new PartitionCommandsMarshallerImpl(serializationRegistry, NO_POOL); return (nodeConsistentId, networkMessage) -> { if (networkMessage instanceof WriteActionRequest) { diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/DefaultByteBuffersPool.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/DefaultByteBuffersPool.java new file mode 100644 index 0000000000..631816ece6 --- /dev/null +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/DefaultByteBuffersPool.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.raft.util; + +import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.DEFAULT_BUFFER_SIZE; +import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.MAX_CACHED_BUFFER_BYTES; + +import java.nio.ByteBuffer; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.raft.util.OptimizedMarshaller.ByteBuffersPool; +import org.jetbrains.annotations.Nullable; + +/** + * Basic pool implementation with limited capacity. + */ +public class DefaultByteBuffersPool implements ByteBuffersPool { + /** Max pool size. */ + private final int capacity; + + /** Queue with cached buffers. */ + private final Queue<ByteBuffer> queue; + + /** Pool size. */ + private final AtomicInteger size = new AtomicInteger(); + + /** + * Constructor. + * + * @param capacity Max pool size. + */ + public DefaultByteBuffersPool(int capacity) { + this.capacity = capacity; + + queue = new ConcurrentLinkedQueue<>(); + } + + @Override + public @Nullable ByteBuffer borrow() { + ByteBuffer buffer = queue.poll(); + + if (buffer != null) { + return buffer; + } + + if (size.get() < capacity && size.getAndIncrement() < capacity) { + return ByteBuffer.allocate(DEFAULT_BUFFER_SIZE).order(OptimizedMarshaller.ORDER); + } + + return null; + } + + @Override + public void release(ByteBuffer buffer) { + assert buffer.position() == 0; + assert buffer.capacity() <= MAX_CACHED_BUFFER_BYTES; + + queue.add(buffer); + } +} diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/EmptyByteBuffersPool.java similarity index 50% copy from modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java copy to modules/raft/src/main/java/org/apache/ignite/internal/raft/util/EmptyByteBuffersPool.java index 
e929125473..4f4e251190 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/EmptyByteBuffersPool.java @@ -18,32 +18,19 @@ package org.apache.ignite.internal.raft.util; import java.nio.ByteBuffer; -import org.apache.ignite.internal.raft.Marshaller; -import org.apache.ignite.network.serialization.MessageSerializationRegistry; +import org.apache.ignite.internal.raft.util.OptimizedMarshaller.ByteBuffersPool; +import org.jetbrains.annotations.Nullable; /** - * Thread-safe variant of {@link OptimizedMarshaller}. + * Cache that's always empty. */ -public class ThreadLocalOptimizedMarshaller implements Marshaller { - /** Thread-local optimized marshaller holder. Not static, because it depends on serialization registry. */ - private final ThreadLocal<Marshaller> marshaller; - - /** - * Constructor. - * - * @param serializationRegistry Serialization registry. - */ - public ThreadLocalOptimizedMarshaller(MessageSerializationRegistry serializationRegistry) { - marshaller = ThreadLocal.withInitial(() -> new OptimizedMarshaller(serializationRegistry)); - } - +public class EmptyByteBuffersPool implements ByteBuffersPool { @Override - public byte[] marshall(Object o) { - return marshaller.get().marshall(o); + public @Nullable ByteBuffer borrow() { + return null; } @Override - public <T> T unmarshall(ByteBuffer bytes) { - return marshaller.get().unmarshall(bytes); + public void release(ByteBuffer buffer) { } } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java index cc4609830e..f555679de8 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java @@ -25,26 +25,54 @@ import 
org.apache.ignite.internal.network.direct.DirectMessageWriter; import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStream; import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1; import org.apache.ignite.internal.raft.Marshaller; +import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.network.NetworkMessage; import org.apache.ignite.network.serialization.MessageReader; import org.apache.ignite.network.serialization.MessageSerializationRegistry; import org.apache.ignite.network.serialization.MessageWriter; +import org.jetbrains.annotations.Nullable; /** * Marshaller implementation that uses a {@link DirectByteBufferStream} variant to serialize/deserialize data. */ public class OptimizedMarshaller implements Marshaller { - /** Protocol version. */ - private static final byte PROTO_VER = 1; + /** + * Byte buffer pool for {@link OptimizedMarshaller}. Helps re-using old buffers, saving some time on allocations. + */ + public interface ByteBuffersPool { + /** + * Removes one buffer from cache and returns it, if possible. Returns {@code null} otherwise. + */ + @Nullable ByteBuffer borrow(); + + /** + * Adds a buffer back to the pool. Should only be called if previous {@link #borrow()} call returned a non-null buffer. + * + * @param buffer The buffer to add back to the pool. Its capacity must not be higher than + * {@link OptimizedMarshaller#MAX_CACHED_BUFFER_BYTES} for the sake of controlling the amount of RAM. If the capacity is higher, + * the behavior is undefined. + */ + void release(ByteBuffer buffer); + } /** Default buffer size. */ - private static final int DEFAULT_BUFFER_SIZE = 1024; + public static final int DEFAULT_BUFFER_SIZE = 1024; + /** Maximal size of the buffer that can be stored in the pool. */ + public static final int MAX_CACHED_BUFFER_BYTES = 256 * 1024; + /** Default "no pool" instance for always-empty pool.
*/ + public static final ByteBuffersPool NO_POOL = new EmptyByteBuffersPool(); + + /** Protocol version. */ + private static final byte PROTO_VER = 1; /** Byte buffer order. */ - private static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN; + public static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN; + + /** Empty array-based byte buffer. Not read-only. */ + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap(ArrayUtils.BYTE_EMPTY_ARRAY); - /** Buffer to write data. */ - protected ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE).order(ORDER); + /** Pool of byte buffers. */ + private final ByteBuffersPool pool; /** Direct byte-buffer stream instance. */ protected final OptimizedStream stream; @@ -59,10 +87,10 @@ public class OptimizedMarshaller implements Marshaller { * Constructor. * * @param serializationRegistry Serialization registry. + * @param pool Pool of byte buffers. */ - public OptimizedMarshaller(MessageSerializationRegistry serializationRegistry) { - assert buffer.position() == 0; - + public OptimizedMarshaller(MessageSerializationRegistry serializationRegistry, ByteBuffersPool pool) { + this.pool = pool; stream = new OptimizedStream(serializationRegistry); messageWriter = new DirectMessageWriter(serializationRegistry, PROTO_VER) { @@ -88,8 +116,14 @@ public class OptimizedMarshaller implements Marshaller { public byte[] marshall(Object o) { assert o instanceof NetworkMessage; + ByteBuffer poolBuffer = pool.borrow(); + + ByteBuffer buffer = poolBuffer == null ? 
ByteBuffer.allocate(DEFAULT_BUFFER_SIZE).order(ORDER) : poolBuffer; + NetworkMessage message = (NetworkMessage) o; + beforeWriteMessage(o, buffer); + while (true) { stream.setBuffer(buffer); @@ -100,15 +134,36 @@ public class OptimizedMarshaller implements Marshaller { } buffer = expandBuffer(buffer); + + if (buffer.capacity() <= MAX_CACHED_BUFFER_BYTES && poolBuffer != null) { + poolBuffer = buffer; + } else if (poolBuffer != null) { + poolBuffer.position(0); + pool.release(poolBuffer); + + poolBuffer = null; + } } + // Prevent holding the reference for too long. + stream.setBuffer(EMPTY_BUFFER); + byte[] result = Arrays.copyOf(buffer.array(), buffer.position()); - buffer.position(0); + if (poolBuffer != null) { + poolBuffer.position(0); + pool.release(poolBuffer); + } return result; } + /** + * Invoked on empty buffer, before writing any data to it. + */ + protected void beforeWriteMessage(Object o, ByteBuffer buffer) { + } + @SuppressWarnings("unchecked") @Override public <T> T unmarshall(ByteBuffer bytes) { diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java index e929125473..fe2b0b60eb 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.raft.util; import java.nio.ByteBuffer; import org.apache.ignite.internal.raft.Marshaller; +import org.apache.ignite.internal.raft.util.OptimizedMarshaller.ByteBuffersPool; import org.apache.ignite.network.serialization.MessageSerializationRegistry; /** @@ -28,13 +29,16 @@ public class ThreadLocalOptimizedMarshaller implements Marshaller { /** Thread-local optimized marshaller holder. Not static, because it depends on serialization registry. 
*/ private final ThreadLocal<Marshaller> marshaller; + /** Shared pool of byte buffers for all thread-local instances. */ + private final ByteBuffersPool pool = new DefaultByteBuffersPool(Runtime.getRuntime().availableProcessors()); + /** * Constructor. * * @param serializationRegistry Serialization registry. */ public ThreadLocalOptimizedMarshaller(MessageSerializationRegistry serializationRegistry) { - marshaller = ThreadLocal.withInitial(() -> new OptimizedMarshaller(serializationRegistry)); + marshaller = ThreadLocal.withInitial(() -> new OptimizedMarshaller(serializationRegistry, pool)); } @Override diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java index 735b4a7d4f..ae819a3708 100644 --- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.raft; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.stream.Collectors.toUnmodifiableList; +import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; @@ -630,7 +631,7 @@ public class RaftGroupServiceTest extends BaseIgniteAbstractTest { argThat(new ArgumentMatcher<WriteActionRequest>() { @Override public boolean matches(WriteActionRequest arg) { - Object command = new OptimizedMarshaller(cluster.serializationRegistry()).unmarshall(arg.command()); + Object command = new 
OptimizedMarshaller(cluster.serializationRegistry(), NO_POOL).unmarshall(arg.command()); return command instanceof TestWriteCommand; } diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/DefaultByteBuffersPoolTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/DefaultByteBuffersPoolTest.java new file mode 100644 index 0000000000..ac41d0b771 --- /dev/null +++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/DefaultByteBuffersPoolTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; + +import java.nio.ByteBuffer; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link DefaultByteBuffersPool}. 
+ */ +class DefaultByteBuffersPoolTest { + @Test + public void testPoolOfOne() { + var pool = new DefaultByteBuffersPool(1); + + ByteBuffer buffer = pool.borrow(); + + assertNotNull(buffer); + + assertEquals(0, buffer.position()); + assertEquals(OptimizedMarshaller.DEFAULT_BUFFER_SIZE, buffer.capacity()); + + assertNull(pool.borrow()); + + ByteBuffer newBuffer = ByteBuffer.allocate(OptimizedMarshaller.MAX_CACHED_BUFFER_BYTES / 2); + + pool.release(newBuffer); + + assertSame(newBuffer, pool.borrow()); + + assertNull(pool.borrow()); + } + + @Test + public void testPoolOfN() { + int capacity = 10; + + var pool = new DefaultByteBuffersPool(capacity); + + for (int i = 0; i < capacity; i++) { + assertNotNull(pool.borrow()); + } + + assertNull(pool.borrow()); + } +} diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/EmptyByteBuffersPoolTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/EmptyByteBuffersPoolTest.java new file mode 100644 index 0000000000..53f4e2cfb2 --- /dev/null +++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/EmptyByteBuffersPoolTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.raft.util; + +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.junit.jupiter.api.Test; + +/** + * Test for {@link EmptyByteBuffersPool}. + */ +class EmptyByteBuffersPoolTest { + @Test + public void testBorrow() { + var pool = new EmptyByteBuffersPool(); + + assertNull(pool.borrow()); + } +} diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java index ed6954c01c..91b9b799f3 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.raftsnapshot; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.SessionUtils.executeUpdate; +import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL; import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn; @@ -589,7 +590,7 @@ class ItTableRaftSnapshotsTest extends IgniteIntegrationTest { return false; } - var commandsMarshaller = new PartitionCommandsMarshallerImpl(serializationRegistry); + var commandsMarshaller = new PartitionCommandsMarshallerImpl(serializationRegistry, NO_POOL); return commandsMarshaller.unmarshall(request.command()) instanceof SafeTimeSyncCommand; } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java index 2e3dd47947..8451ab6169 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java @@ -27,20 +27,18 @@ import org.apache.ignite.network.serialization.MessageSerializationRegistry; * Default {@link PartitionCommandsMarshaller} implementation. */ public class PartitionCommandsMarshallerImpl extends OptimizedMarshaller implements PartitionCommandsMarshaller { - public PartitionCommandsMarshallerImpl(MessageSerializationRegistry serializationRegistry) { - super(serializationRegistry); + public PartitionCommandsMarshallerImpl(MessageSerializationRegistry serializationRegistry, ByteBuffersPool cache) { + super(serializationRegistry, cache); } @Override - public byte[] marshall(Object o) { + protected void beforeWriteMessage(Object o, ByteBuffer buffer) { int requiredCatalogVersion = o instanceof CatalogVersionAware ? 
((CatalogVersionAware) o).requiredCatalogVersion() : NO_VERSION_REQUIRED; stream.setBuffer(buffer); stream.writeInt(requiredCatalogVersion); - - return super.marshall(o); } @Override diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ThreadLocalPartitionCommandsMarshaller.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ThreadLocalPartitionCommandsMarshaller.java index d3050bd3d2..42a9abc244 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ThreadLocalPartitionCommandsMarshaller.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ThreadLocalPartitionCommandsMarshaller.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.table.distributed.schema; import java.nio.ByteBuffer; +import org.apache.ignite.internal.raft.util.DefaultByteBuffersPool; +import org.apache.ignite.internal.raft.util.OptimizedMarshaller.ByteBuffersPool; import org.apache.ignite.network.serialization.MessageSerializationRegistry; /** @@ -27,13 +29,16 @@ public class ThreadLocalPartitionCommandsMarshaller implements PartitionCommands /** Thread-local optimized marshaller holder. Not static, because it depends on serialization registry. */ private final ThreadLocal<PartitionCommandsMarshaller> marshaller; + /** Shared pool of byte buffers for all thread-local instances. */ + private final ByteBuffersPool pool = new DefaultByteBuffersPool(Runtime.getRuntime().availableProcessors()); + /** * Constructor. * * @param serializationRegistry Serialization registry. 
*/ public ThreadLocalPartitionCommandsMarshaller(MessageSerializationRegistry serializationRegistry) { - marshaller = ThreadLocal.withInitial(() -> new PartitionCommandsMarshallerImpl(serializationRegistry)); + marshaller = ThreadLocal.withInitial(() -> new PartitionCommandsMarshallerImpl(serializationRegistry, pool)); } @Override diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java index b6c3c8e073..e337671563 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table.distributed.schema; +import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL; import static org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; @@ -95,7 +96,7 @@ class CheckCatalogVersionOnActionRequestTest extends BaseIgniteAbstractTest { lenient().when(node.getNodeState()).thenReturn(State.STATE_LEADER); lenient().when(node.getLeaderId()).thenReturn(leaderId); - commandsMarshaller = new PartitionCommandsMarshallerImpl(defaultSerializationRegistry()); + commandsMarshaller = new PartitionCommandsMarshallerImpl(defaultSerializationRegistry(), NO_POOL); } @Test diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java index 5f31c1a1f9..cfc4691141 100644 --- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table.distributed.schema; +import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -61,9 +62,9 @@ class PartitionCommandsMarshallerImplTest { ); } - private final OptimizedMarshaller standardMarshaller = new OptimizedMarshaller(registry); + private final OptimizedMarshaller standardMarshaller = new OptimizedMarshaller(registry, NO_POOL); - private final PartitionCommandsMarshallerImpl partitionCommandsMarshaller = new PartitionCommandsMarshallerImpl(registry); + private final PartitionCommandsMarshallerImpl partitionCommandsMarshaller = new PartitionCommandsMarshallerImpl(registry, NO_POOL); @Test void marshalPrependsWithZeroForNotCatalogLevelAware() {