oleg-vlsk commented on code in PR #12824:
URL: https://github.com/apache/ignite/pull/12824#discussion_r2870803956


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java:
##########
@@ -327,7 +327,7 @@ private Map<PartitionKey, PartitionHashRecord> 
checkSnapshotFiles(
             closeAllComponents(snpCtx);
         }
 
-        return res;
+        return Collections.unmodifiableMap(res);

Review Comment:
   Do we need to test the usage of the unmodifiable collection here?



##########
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Check {@link SingleNodeMessage} serialization. */
+public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest 
{
+    /** Nodes count. */
+    public static final int NODES_CNT = 2;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
instanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        cfg.setDiscoverySpi(new TestDiscoverySpi()
+            
.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+        return cfg;
+    }
+
+    /** Test check that serialization raised only once. */
+    @Test
+    public void testSingleSerializedOnce() throws Exception {
+        startGridsMultiThreaded(NODES_CNT);
+        startClientGrid(NODES_CNT);
+
+        TestRecordingCommunicationSpi clnCommSpi = 
TestRecordingCommunicationSpi.spi(grid(NODES_CNT));
+
+        assertTrue(grid(NODES_CNT).configuration().isClientMode());
+
+        clnCommSpi.blockMessages((node, msg) -> msg instanceof 
SingleNodeMessage);
+
+        TestDiscoverySpi discoSpi = 
(TestDiscoverySpi)grid(NODES_CNT).context().discovery().getInjectedDiscoverySpi();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        discoSpi.messageLatch(latch);
+
+        Set<UUID> nodeIdsRes = new HashSet<>();
+
+        List<DistributedProcess<byte[], byte[]>> processes = new 
ArrayList<>(NODES_CNT + 1);
+
+        for (int i = 0; i < NODES_CNT; i++)
+            nodeIdsRes.add(grid(i).localNode().id());
+
+        for (int n = 0; n < NODES_CNT + 1; n++) {
+            DistributedProcess<byte[], byte[]> dp = new TestDistributedProcess(
+                grid(n).context(), (id, req) -> new InitMessage<>(id, 
TEST_PROCESS, req, true));
+
+            processes.add(dp);
+        }
+
+        int sendBuffSize = clnCommSpi.getSocketSendBuffer();
+
+        // it will be enough for buffer overflow cause some serialization 
overhead is present
+        byte[] arr = new byte[sendBuffSize];
+
+        byte[] serialized = U.toBytes(arr);
+
+        assertTrue(serialized.length > sendBuffSize);
+
+        processes.get(0).start(UUID.randomUUID(), arr);
+
+        clnCommSpi.waitForBlocked();
+
+        assertEquals(1, clnCommSpi.blockedMessages().size());
+
+        TestRecordingCommunicationSpi.BlockedMessageDescriptor blocked = 
clnCommSpi.blockedMessages().get(0);
+
+        SingleNodeMessage msgSpied = 
(SingleNodeMessage)spy(blocked.ioMessage().message());
+
+        setFieldValue(blocked.ioMessage(), "msg", msgSpied);
+
+        clnCommSpi.stopBlock();
+
+        latch.await(10, TimeUnit.SECONDS);
+
+        // Serialized only once.
+        verify(msgSpied, times(1)).toBytes(any());
+
+        // Write to buffer - several times cause buffer size is less than 
serialization representation.
+        verify(msgSpied, times(2)).writeTo(any(), any());
+    }
+
+    /** */
+    private static class TestDistributedProcess extends 
DistributedProcess<byte[], byte[]> {
+        /** */
+        public TestDistributedProcess(
+            GridKernalContext ctx,
+            BiFunction<UUID, byte[], ? extends InitMessage<byte[]>> 
initMsgFactory
+        ) {
+            super(
+                ctx,
+                TEST_PROCESS,
+                (req) -> new GridFinishedFuture<>(req),
+                (uuid, res, err) -> {},
+                initMsgFactory);
+        }
+    }
+
+    /** */
+    private static class TestDiscoverySpi extends TcpDiscoverySpi {
+        /** */
+        private CountDownLatch messageLatch;

Review Comment:
   'Msg' should be used according to Ignite abbreviation standards.



##########
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Check {@link SingleNodeMessage} serialization. */
+public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest 
{
+    /** Nodes count. */
+    public static final int NODES_CNT = 2;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
instanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        cfg.setDiscoverySpi(new TestDiscoverySpi()
+            
.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+        return cfg;
+    }
+
+    /** Test check that serialization raised only once. */
+    @Test
+    public void testSingleSerializedOnce() throws Exception {
+        startGridsMultiThreaded(NODES_CNT);
+        startClientGrid(NODES_CNT);
+
+        TestRecordingCommunicationSpi clnCommSpi = 
TestRecordingCommunicationSpi.spi(grid(NODES_CNT));
+
+        assertTrue(grid(NODES_CNT).configuration().isClientMode());
+
+        clnCommSpi.blockMessages((node, msg) -> msg instanceof 
SingleNodeMessage);
+
+        TestDiscoverySpi discoSpi = 
(TestDiscoverySpi)grid(NODES_CNT).context().discovery().getInjectedDiscoverySpi();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        discoSpi.messageLatch(latch);
+
+        Set<UUID> nodeIdsRes = new HashSet<>();

Review Comment:
   As an option to avoid the for-loop below:
   
   `Set<UUID> nodeIdsRes = 
grid(0).cluster().forServers().nodes().stream().map(ClusterNode::id).collect(Collectors.toSet());`



##########
modules/core/src/main/java/org/apache/ignite/internal/util/distributed/SingleNodeMessage.java:
##########
@@ -92,8 +95,14 @@ public SingleNodeMessage(UUID processId, 
DistributedProcessType type, R resp, Th
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeByteArray(U.toBytes(resp)))
+                if (respBytes == null) {

Review Comment:
   Unnecessary '{ }' according to Ignite codestyle.



##########
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Check {@link SingleNodeMessage} serialization. */
+public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest 
{
+    /** Nodes count. */
+    public static final int NODES_CNT = 2;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() {
+        stopAllGrids();

Review Comment:
   Don't we need `super.afterTest()` here?



##########
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Check {@link SingleNodeMessage} serialization. */
+public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest 
{
+    /** Nodes count. */
+    public static final int NODES_CNT = 2;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
instanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        cfg.setDiscoverySpi(new TestDiscoverySpi()
+            
.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+        return cfg;
+    }
+
+    /** Test check that serialization raised only once. */
+    @Test
+    public void testSingleSerializedOnce() throws Exception {
+        startGridsMultiThreaded(NODES_CNT);
+        startClientGrid(NODES_CNT);
+
+        TestRecordingCommunicationSpi clnCommSpi = 
TestRecordingCommunicationSpi.spi(grid(NODES_CNT));
+
+        assertTrue(grid(NODES_CNT).configuration().isClientMode());
+
+        clnCommSpi.blockMessages((node, msg) -> msg instanceof 
SingleNodeMessage);
+
+        TestDiscoverySpi discoSpi = 
(TestDiscoverySpi)grid(NODES_CNT).context().discovery().getInjectedDiscoverySpi();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        discoSpi.messageLatch(latch);
+
+        Set<UUID> nodeIdsRes = new HashSet<>();
+
+        List<DistributedProcess<byte[], byte[]>> processes = new 
ArrayList<>(NODES_CNT + 1);
+
+        for (int i = 0; i < NODES_CNT; i++)
+            nodeIdsRes.add(grid(i).localNode().id());
+
+        for (int n = 0; n < NODES_CNT + 1; n++) {
+            DistributedProcess<byte[], byte[]> dp = new TestDistributedProcess(
+                grid(n).context(), (id, req) -> new InitMessage<>(id, 
TEST_PROCESS, req, true));
+
+            processes.add(dp);
+        }
+
+        int sendBuffSize = clnCommSpi.getSocketSendBuffer();
+
+        // it will be enough for buffer overflow cause some serialization 
overhead is present
+        byte[] arr = new byte[sendBuffSize];
+
+        byte[] serialized = U.toBytes(arr);
+
+        assertTrue(serialized.length > sendBuffSize);
+
+        processes.get(0).start(UUID.randomUUID(), arr);
+
+        clnCommSpi.waitForBlocked();
+
+        assertEquals(1, clnCommSpi.blockedMessages().size());
+
+        TestRecordingCommunicationSpi.BlockedMessageDescriptor blocked = 
clnCommSpi.blockedMessages().get(0);
+
+        SingleNodeMessage msgSpied = 
(SingleNodeMessage)spy(blocked.ioMessage().message());
+
+        setFieldValue(blocked.ioMessage(), "msg", msgSpied);
+
+        clnCommSpi.stopBlock();
+
+        latch.await(10, TimeUnit.SECONDS);
+
+        // Serialized only once.
+        verify(msgSpied, times(1)).toBytes(any());
+
+        // Write to buffer - several times cause buffer size is less than 
serialization representation.
+        verify(msgSpied, times(2)).writeTo(any(), any());
+    }
+
+    /** */
+    private static class TestDistributedProcess extends 
DistributedProcess<byte[], byte[]> {
+        /** */
+        public TestDistributedProcess(
+            GridKernalContext ctx,
+            BiFunction<UUID, byte[], ? extends InitMessage<byte[]>> 
initMsgFactory
+        ) {
+            super(
+                ctx,
+                TEST_PROCESS,
+                (req) -> new GridFinishedFuture<>(req),
+                (uuid, res, err) -> {},
+                initMsgFactory);
+        }
+    }
+
+    /** */
+    private static class TestDiscoverySpi extends TcpDiscoverySpi {
+        /** */
+        private CountDownLatch messageLatch;
+
+        /** Message raized trigger. */
+        void messageLatch(CountDownLatch messageLatch) {

Review Comment:
   'Msg' should be used according to Ignite abbreviation standards.



##########
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Check {@link SingleNodeMessage} serialization. */
+public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest 
{
+    /** Nodes count. */
+    public static final int NODES_CNT = 2;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
instanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        cfg.setDiscoverySpi(new TestDiscoverySpi()
+            
.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+        return cfg;
+    }
+
+    /** Test check that serialization raised only once. */
+    @Test
+    public void testSingleSerializedOnce() throws Exception {
+        startGridsMultiThreaded(NODES_CNT);
+        startClientGrid(NODES_CNT);

Review Comment:
   We use `grid(NODES_CNT)` 3 times in the test, maybe it's worth introducing a 
local variable like this:
   
   `IgniteEx client = startClientGrid(NODES_CNT);`



##########
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Check {@link SingleNodeMessage} serialization. */
+public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest 
{
+    /** Nodes count. */
+    public static final int NODES_CNT = 2;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
instanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        cfg.setDiscoverySpi(new TestDiscoverySpi()
+            
.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+        return cfg;
+    }
+
+    /** Test check that serialization raised only once. */
+    @Test
+    public void testSingleSerializedOnce() throws Exception {
+        startGridsMultiThreaded(NODES_CNT);
+        startClientGrid(NODES_CNT);
+
+        TestRecordingCommunicationSpi clnCommSpi = 
TestRecordingCommunicationSpi.spi(grid(NODES_CNT));
+
+        assertTrue(grid(NODES_CNT).configuration().isClientMode());

Review Comment:
   I would move this line before TestRecordingCommunicationSpi initialization 
for better readability, unless there's a certain reason why this assertion is 
done after it?



##########
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Check {@link SingleNodeMessage} serialization. */
+public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest 
{
+    /** Nodes count. */
+    public static final int NODES_CNT = 2;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
instanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        cfg.setDiscoverySpi(new TestDiscoverySpi()
+            
.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+        return cfg;
+    }
+
+    /** Test check that serialization raised only once. */
+    @Test
+    public void testSingleSerializedOnce() throws Exception {
+        startGridsMultiThreaded(NODES_CNT);
+        startClientGrid(NODES_CNT);
+
+        TestRecordingCommunicationSpi clnCommSpi = 
TestRecordingCommunicationSpi.spi(grid(NODES_CNT));
+
+        assertTrue(grid(NODES_CNT).configuration().isClientMode());
+
+        clnCommSpi.blockMessages((node, msg) -> msg instanceof 
SingleNodeMessage);
+
+        TestDiscoverySpi discoSpi = 
(TestDiscoverySpi)grid(NODES_CNT).context().discovery().getInjectedDiscoverySpi();
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        discoSpi.messageLatch(latch);
+
+        Set<UUID> nodeIdsRes = new HashSet<>();
+
+        List<DistributedProcess<byte[], byte[]>> processes = new 
ArrayList<>(NODES_CNT + 1);
+
+        for (int i = 0; i < NODES_CNT; i++)
+            nodeIdsRes.add(grid(i).localNode().id());
+
+        for (int n = 0; n < NODES_CNT + 1; n++) {
+            DistributedProcess<byte[], byte[]> dp = new TestDistributedProcess(
+                grid(n).context(), (id, req) -> new InitMessage<>(id, 
TEST_PROCESS, req, true));
+
+            processes.add(dp);
+        }
+
+        int sendBuffSize = clnCommSpi.getSocketSendBuffer();
+
+        // it will be enough for buffer overflow cause some serialization 
overhead is present
+        byte[] arr = new byte[sendBuffSize];
+
+        byte[] serialized = U.toBytes(arr);
+
+        assertTrue(serialized.length > sendBuffSize);
+
+        processes.get(0).start(UUID.randomUUID(), arr);
+
+        clnCommSpi.waitForBlocked();
+
+        assertEquals(1, clnCommSpi.blockedMessages().size());
+
+        TestRecordingCommunicationSpi.BlockedMessageDescriptor blocked = 
clnCommSpi.blockedMessages().get(0);
+
+        SingleNodeMessage msgSpied = 
(SingleNodeMessage)spy(blocked.ioMessage().message());
+
+        setFieldValue(blocked.ioMessage(), "msg", msgSpied);
+
+        clnCommSpi.stopBlock();
+
+        latch.await(10, TimeUnit.SECONDS);
+
+        // Serialized only once.
+        verify(msgSpied, times(1)).toBytes(any());
+
+        // Write to buffer - several times cause buffer size is less than 
serialization representation.
+        verify(msgSpied, times(2)).writeTo(any(), any());
+    }
+
+    /** */
+    private static class TestDistributedProcess extends 
DistributedProcess<byte[], byte[]> {
+        /** */
+        public TestDistributedProcess(
+            GridKernalContext ctx,
+            BiFunction<UUID, byte[], ? extends InitMessage<byte[]>> 
initMsgFactory
+        ) {
+            super(
+                ctx,
+                TEST_PROCESS,
+                (req) -> new GridFinishedFuture<>(req),

Review Comment:
   Method reference may be used here instead of lambda.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to