This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7620cb89ca IGNITE-21063 Optimize memory usage in optimized marshaller 
(#2950)
7620cb89ca is described below

commit 7620cb89ca88601dd817366dcc6d18c3ee848feb
Author: Ivan Bessonov <bessonov...@gmail.com>
AuthorDate: Wed Dec 13 16:38:29 2023 +0300

    IGNITE-21063 Optimize memory usage in optimized marshaller (#2950)
---
 .../ignite/internal/index/ItBuildIndexTest.java    |  3 +-
 .../internal/raft/util/DefaultByteBuffersPool.java | 76 ++++++++++++++++++++++
 ...edMarshaller.java => EmptyByteBuffersPool.java} | 27 ++------
 .../internal/raft/util/OptimizedMarshaller.java    | 75 ++++++++++++++++++---
 .../raft/util/ThreadLocalOptimizedMarshaller.java  |  6 +-
 .../ignite/internal/raft/RaftGroupServiceTest.java |  3 +-
 .../raft/util/DefaultByteBuffersPoolTest.java      | 66 +++++++++++++++++++
 .../raft/util/EmptyByteBuffersPoolTest.java        | 34 ++++++++++
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     |  3 +-
 .../schema/PartitionCommandsMarshallerImpl.java    |  8 +--
 .../ThreadLocalPartitionCommandsMarshaller.java    |  7 +-
 .../CheckCatalogVersionOnActionRequestTest.java    |  3 +-
 .../PartitionCommandsMarshallerImplTest.java       |  5 +-
 13 files changed, 273 insertions(+), 43 deletions(-)

diff --git 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
index 94e8ba7ecc..291cf607bd 100644
--- 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
+++ 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.index;
 
 import static java.util.stream.Collectors.joining;
+import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL;
 import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -261,7 +262,7 @@ public class ItBuildIndexTest extends 
BaseSqlIntegrationTest {
     ) {
         IgniteImpl node = CLUSTER.node(0);
         MessageSerializationRegistry serializationRegistry = 
node.raftManager().service().serializationRegistry();
-        var commandsMarshaller = new 
PartitionCommandsMarshallerImpl(serializationRegistry);
+        var commandsMarshaller = new 
PartitionCommandsMarshallerImpl(serializationRegistry, NO_POOL);
 
         return (nodeConsistentId, networkMessage) -> {
             if (networkMessage instanceof WriteActionRequest) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/DefaultByteBuffersPool.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/DefaultByteBuffersPool.java
new file mode 100644
index 0000000000..631816ece6
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/DefaultByteBuffersPool.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.util;
+
+import static 
org.apache.ignite.internal.raft.util.OptimizedMarshaller.DEFAULT_BUFFER_SIZE;
+import static 
org.apache.ignite.internal.raft.util.OptimizedMarshaller.MAX_CACHED_BUFFER_BYTES;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import 
org.apache.ignite.internal.raft.util.OptimizedMarshaller.ByteBuffersPool;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Basic pool implementation with limited capacity.
+ */
+public class DefaultByteBuffersPool implements ByteBuffersPool {
+    /** Max pool size. */
+    private final int capacity;
+
+    /** Queue with cached buffers. */
+    private final Queue<ByteBuffer> queue;
+
+    /** Pool size. */
+    private final AtomicInteger size = new AtomicInteger();
+
+    /**
+     * Constructor.
+     *
+     * @param capacity Max pool size.
+     */
+    public DefaultByteBuffersPool(int capacity) {
+        this.capacity = capacity;
+
+        queue = new ConcurrentLinkedQueue<>();
+    }
+
+    @Override
+    public @Nullable ByteBuffer borrow() {
+        ByteBuffer buffer = queue.poll();
+
+        if (buffer != null) {
+            return buffer;
+        }
+
+        if (size.get() < capacity && size.getAndIncrement() < capacity) {
+            return 
ByteBuffer.allocate(DEFAULT_BUFFER_SIZE).order(OptimizedMarshaller.ORDER);
+        }
+
+        return null;
+    }
+
+    @Override
+    public void release(ByteBuffer buffer) {
+        assert buffer.position() == 0;
+        assert buffer.capacity() <= MAX_CACHED_BUFFER_BYTES;
+
+        queue.add(buffer);
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/EmptyByteBuffersPool.java
similarity index 50%
copy from 
modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
copy to 
modules/raft/src/main/java/org/apache/ignite/internal/raft/util/EmptyByteBuffersPool.java
index e929125473..4f4e251190 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/EmptyByteBuffersPool.java
@@ -18,32 +18,19 @@
 package org.apache.ignite.internal.raft.util;
 
 import java.nio.ByteBuffer;
-import org.apache.ignite.internal.raft.Marshaller;
-import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import 
org.apache.ignite.internal.raft.util.OptimizedMarshaller.ByteBuffersPool;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Thread-safe variant of {@link OptimizedMarshaller}.
+ * Cache that's always empty.
  */
-public class ThreadLocalOptimizedMarshaller implements Marshaller {
-    /** Thread-local optimized marshaller holder. Not static, because it 
depends on serialization registry. */
-    private final ThreadLocal<Marshaller> marshaller;
-
-    /**
-     * Constructor.
-     *
-     * @param serializationRegistry Serialization registry.
-     */
-    public ThreadLocalOptimizedMarshaller(MessageSerializationRegistry 
serializationRegistry) {
-        marshaller = ThreadLocal.withInitial(() -> new 
OptimizedMarshaller(serializationRegistry));
-    }
-
+public class EmptyByteBuffersPool implements ByteBuffersPool {
     @Override
-    public byte[] marshall(Object o) {
-        return marshaller.get().marshall(o);
+    public @Nullable ByteBuffer borrow() {
+        return null;
     }
 
     @Override
-    public <T> T unmarshall(ByteBuffer bytes) {
-        return marshaller.get().unmarshall(bytes);
+    public void release(ByteBuffer buffer) {
     }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
index cc4609830e..f555679de8 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
@@ -25,26 +25,54 @@ import 
org.apache.ignite.internal.network.direct.DirectMessageWriter;
 import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStream;
 import 
org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1;
 import org.apache.ignite.internal.raft.Marshaller;
+import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.serialization.MessageReader;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.network.serialization.MessageWriter;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Marshaller implementation that uses a {@link DirectByteBufferStream} 
variant to serialize/deserialize data.
  */
 public class OptimizedMarshaller implements Marshaller {
-    /** Protocol version. */
-    private static final byte PROTO_VER = 1;
+    /**
+     * Byte buffer pool for {@link OptimizedMarshaller}. Helps re-using old 
buffers, saving some time on allocations.
+     */
+    public interface ByteBuffersPool {
+        /**
+         * Removes one buffer from cache and returns it, if possible. Returns 
{@code null} otherwise.
+         */
+        @Nullable ByteBuffer borrow();
+
+        /**
+         * Adds a buffer back to the pool. Should only be called if previous 
{@link #borrow()} call returned a non-null buffer.
+         *
+         * @param buffer The buffer to add back to the pool. Its capacity must 
not be higher than
+         *      {@link OptimizedMarshaller#MAX_CACHED_BUFFER_BYTES} or the 
sake of controlling the amount of RAM. If the capacity is higher,
+         *      the behavior is undefined.
+         */
+        void release(ByteBuffer buffer);
+    }
 
     /** Default buffer size. */
-    private static final int DEFAULT_BUFFER_SIZE = 1024;
+    public static final int DEFAULT_BUFFER_SIZE = 1024;
+    /** Maximal size of the buffer that can be stored in the pool. */
+    public static final int MAX_CACHED_BUFFER_BYTES = 256 * 1024;
+    /** Default "no pool" instance for always-empty pool. */
+    public static final ByteBuffersPool NO_POOL = new EmptyByteBuffersPool();
+
+    /** Protocol version. */
+    private static final byte PROTO_VER = 1;
 
     /** Byte buffer order. */
-    private static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
+    public static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
+
+    /** Empty array-based byte buffer. Not read-only. */
+    private static final ByteBuffer EMPTY_BUFFER = 
ByteBuffer.wrap(ArrayUtils.BYTE_EMPTY_ARRAY);
 
-    /** Buffer to write data. */
-    protected ByteBuffer buffer = 
ByteBuffer.allocate(DEFAULT_BUFFER_SIZE).order(ORDER);
+    /** Pool of byte buffers. */
+    private final ByteBuffersPool pool;
 
     /** Direct byte-buffer stream instance. */
     protected final OptimizedStream stream;
@@ -59,10 +87,10 @@ public class OptimizedMarshaller implements Marshaller {
      * Constructor.
      *
      * @param serializationRegistry Serialization registry.
+     * @param pool Pool of byte buffers.
      */
-    public OptimizedMarshaller(MessageSerializationRegistry 
serializationRegistry) {
-        assert buffer.position() == 0;
-
+    public OptimizedMarshaller(MessageSerializationRegistry 
serializationRegistry, ByteBuffersPool pool) {
+        this.pool = pool;
         stream = new OptimizedStream(serializationRegistry);
 
         messageWriter = new DirectMessageWriter(serializationRegistry, 
PROTO_VER) {
@@ -88,8 +116,14 @@ public class OptimizedMarshaller implements Marshaller {
     public byte[] marshall(Object o) {
         assert o instanceof NetworkMessage;
 
+        ByteBuffer poolBuffer = pool.borrow();
+
+        ByteBuffer buffer = poolBuffer == null ? 
ByteBuffer.allocate(DEFAULT_BUFFER_SIZE).order(ORDER) : poolBuffer;
+
         NetworkMessage message = (NetworkMessage) o;
 
+        beforeWriteMessage(o, buffer);
+
         while (true) {
             stream.setBuffer(buffer);
 
@@ -100,15 +134,36 @@ public class OptimizedMarshaller implements Marshaller {
             }
 
             buffer = expandBuffer(buffer);
+
+            if (buffer.capacity() <= MAX_CACHED_BUFFER_BYTES && poolBuffer != 
null) {
+                poolBuffer = buffer;
+            } else if (poolBuffer != null) {
+                poolBuffer.position(0);
+                pool.release(poolBuffer);
+
+                poolBuffer = null;
+            }
         }
 
+        // Prevent holding the reference for too long.
+        stream.setBuffer(EMPTY_BUFFER);
+
         byte[] result = Arrays.copyOf(buffer.array(), buffer.position());
 
-        buffer.position(0);
+        if (poolBuffer != null) {
+            poolBuffer.position(0);
+            pool.release(poolBuffer);
+        }
 
         return result;
     }
 
+    /**
+     * Invoked on empty buffer, before writing any data to it.
+     */
+    protected void beforeWriteMessage(Object o, ByteBuffer buffer) {
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public <T> T unmarshall(ByteBuffer bytes) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
index e929125473..fe2b0b60eb 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.raft.util;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.raft.Marshaller;
+import 
org.apache.ignite.internal.raft.util.OptimizedMarshaller.ByteBuffersPool;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 
 /**
@@ -28,13 +29,16 @@ public class ThreadLocalOptimizedMarshaller implements 
Marshaller {
     /** Thread-local optimized marshaller holder. Not static, because it 
depends on serialization registry. */
     private final ThreadLocal<Marshaller> marshaller;
 
+    /** Shared pool of byte buffers for all thread-local instances. */
+    private final ByteBuffersPool pool = new 
DefaultByteBuffersPool(Runtime.getRuntime().availableProcessors());
+
     /**
      * Constructor.
      *
      * @param serializationRegistry Serialization registry.
      */
     public ThreadLocalOptimizedMarshaller(MessageSerializationRegistry 
serializationRegistry) {
-        marshaller = ThreadLocal.withInitial(() -> new 
OptimizedMarshaller(serializationRegistry));
+        marshaller = ThreadLocal.withInitial(() -> new 
OptimizedMarshaller(serializationRegistry, pool));
     }
 
     @Override
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
index 735b4a7d4f..ae819a3708 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/RaftGroupServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.raft;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -630,7 +631,7 @@ public class RaftGroupServiceTest extends 
BaseIgniteAbstractTest {
                 argThat(new ArgumentMatcher<WriteActionRequest>() {
                     @Override
                     public boolean matches(WriteActionRequest arg) {
-                        Object command = new 
OptimizedMarshaller(cluster.serializationRegistry()).unmarshall(arg.command());
+                        Object command = new 
OptimizedMarshaller(cluster.serializationRegistry(), 
NO_POOL).unmarshall(arg.command());
 
                         return command instanceof TestWriteCommand;
                     }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/DefaultByteBuffersPoolTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/DefaultByteBuffersPoolTest.java
new file mode 100644
index 0000000000..ac41d0b771
--- /dev/null
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/DefaultByteBuffersPoolTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+import java.nio.ByteBuffer;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link DefaultByteBuffersPool}.
+ */
+class DefaultByteBuffersPoolTest {
+    @Test
+    public void testPoolOfOne() {
+        var pool = new DefaultByteBuffersPool(1);
+
+        ByteBuffer buffer = pool.borrow();
+
+        assertNotNull(buffer);
+
+        assertEquals(0, buffer.position());
+        assertEquals(OptimizedMarshaller.DEFAULT_BUFFER_SIZE, 
buffer.capacity());
+
+        assertNull(pool.borrow());
+
+        ByteBuffer newBuffer = 
ByteBuffer.allocate(OptimizedMarshaller.MAX_CACHED_BUFFER_BYTES / 2);
+
+        pool.release(newBuffer);
+
+        assertSame(newBuffer, pool.borrow());
+
+        assertNull(pool.borrow());
+    }
+
+    @Test
+    public void testPoolOfN() {
+        int capacity = 10;
+
+        var pool = new DefaultByteBuffersPool(capacity);
+
+        for (int i = 0; i < capacity; i++) {
+            assertNotNull(pool.borrow());
+        }
+
+        assertNull(pool.borrow());
+    }
+}
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/EmptyByteBuffersPoolTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/EmptyByteBuffersPoolTest.java
new file mode 100644
index 0000000000..53f4e2cfb2
--- /dev/null
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/EmptyByteBuffersPoolTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.util;
+
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for {@link EmptyByteBuffersPool}.
+ */
+class EmptyByteBuffersPoolTest {
+    @Test
+    public void testBorrow() {
+        var pool = new EmptyByteBuffersPool();
+
+        assertNull(pool.borrow());
+    }
+}
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index ed6954c01c..91b9b799f3 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.raftsnapshot;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
@@ -589,7 +590,7 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
             return false;
         }
 
-        var commandsMarshaller = new 
PartitionCommandsMarshallerImpl(serializationRegistry);
+        var commandsMarshaller = new 
PartitionCommandsMarshallerImpl(serializationRegistry, NO_POOL);
         return commandsMarshaller.unmarshall(request.command()) instanceof 
SafeTimeSyncCommand;
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
index 2e3dd47947..8451ab6169 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImpl.java
@@ -27,20 +27,18 @@ import 
org.apache.ignite.network.serialization.MessageSerializationRegistry;
  * Default {@link PartitionCommandsMarshaller} implementation.
  */
 public class PartitionCommandsMarshallerImpl extends OptimizedMarshaller 
implements PartitionCommandsMarshaller {
-    public PartitionCommandsMarshallerImpl(MessageSerializationRegistry 
serializationRegistry) {
-        super(serializationRegistry);
+    public PartitionCommandsMarshallerImpl(MessageSerializationRegistry 
serializationRegistry, ByteBuffersPool cache) {
+        super(serializationRegistry, cache);
     }
 
     @Override
-    public byte[] marshall(Object o) {
+    protected void beforeWriteMessage(Object o, ByteBuffer buffer) {
         int requiredCatalogVersion = o instanceof CatalogVersionAware
                 ? ((CatalogVersionAware) o).requiredCatalogVersion()
                 : NO_VERSION_REQUIRED;
 
         stream.setBuffer(buffer);
         stream.writeInt(requiredCatalogVersion);
-
-        return super.marshall(o);
     }
 
     @Override
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ThreadLocalPartitionCommandsMarshaller.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ThreadLocalPartitionCommandsMarshaller.java
index d3050bd3d2..42a9abc244 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ThreadLocalPartitionCommandsMarshaller.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ThreadLocalPartitionCommandsMarshaller.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.table.distributed.schema;
 
 import java.nio.ByteBuffer;
+import org.apache.ignite.internal.raft.util.DefaultByteBuffersPool;
+import 
org.apache.ignite.internal.raft.util.OptimizedMarshaller.ByteBuffersPool;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 
 /**
@@ -27,13 +29,16 @@ public class ThreadLocalPartitionCommandsMarshaller 
implements PartitionCommands
     /** Thread-local optimized marshaller holder. Not static, because it 
depends on serialization registry. */
     private final ThreadLocal<PartitionCommandsMarshaller> marshaller;
 
+    /** Shared pool of byte buffers for all thread-local instances. */
+    private final ByteBuffersPool pool = new 
DefaultByteBuffersPool(Runtime.getRuntime().availableProcessors());
+
     /**
      * Constructor.
      *
      * @param serializationRegistry Serialization registry.
      */
     public ThreadLocalPartitionCommandsMarshaller(MessageSerializationRegistry 
serializationRegistry) {
-        marshaller = ThreadLocal.withInitial(() -> new 
PartitionCommandsMarshallerImpl(serializationRegistry));
+        marshaller = ThreadLocal.withInitial(() -> new 
PartitionCommandsMarshallerImpl(serializationRegistry, pool));
     }
 
     @Override
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
index b6c3c8e073..e337671563 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.schema;
 
+import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL;
 import static 
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
@@ -95,7 +96,7 @@ class CheckCatalogVersionOnActionRequestTest extends 
BaseIgniteAbstractTest {
         lenient().when(node.getNodeState()).thenReturn(State.STATE_LEADER);
         lenient().when(node.getLeaderId()).thenReturn(leaderId);
 
-        commandsMarshaller = new 
PartitionCommandsMarshallerImpl(defaultSerializationRegistry());
+        commandsMarshaller = new 
PartitionCommandsMarshallerImpl(defaultSerializationRegistry(), NO_POOL);
     }
 
     @Test
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java
index 5f31c1a1f9..cfc4691141 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.schema;
 
+import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -61,9 +62,9 @@ class PartitionCommandsMarshallerImplTest {
         );
     }
 
-    private final OptimizedMarshaller standardMarshaller = new 
OptimizedMarshaller(registry);
+    private final OptimizedMarshaller standardMarshaller = new 
OptimizedMarshaller(registry, NO_POOL);
 
-    private final PartitionCommandsMarshallerImpl partitionCommandsMarshaller 
= new PartitionCommandsMarshallerImpl(registry);
+    private final PartitionCommandsMarshallerImpl partitionCommandsMarshaller 
= new PartitionCommandsMarshallerImpl(registry, NO_POOL);
 
     @Test
     void marshalPrependsWithZeroForNotCatalogLevelAware() {

Reply via email to