oleg-vlsk commented on code in PR #12824:
URL: https://github.com/apache/ignite/pull/12824#discussion_r2870803956
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java:
##########
@@ -327,7 +327,7 @@ private Map<PartitionKey, PartitionHashRecord>
checkSnapshotFiles(
closeAllComponents(snpCtx);
}
- return res;
+ return Collections.unmodifiableMap(res);
Review Comment:
Do we need to test the usage of an unmodifiable collection here?
##########
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Check {@link SingleNodeMessage} serialization. */
+public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest
{
+ /** Nodes count. */
+ public static final int NODES_CNT = 2;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
instanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ cfg.setDiscoverySpi(new TestDiscoverySpi()
+
.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+ return cfg;
+ }
+
+ /** Test check that serialization raised only once. */
+ @Test
+ public void testSingleSerializedOnce() throws Exception {
+ startGridsMultiThreaded(NODES_CNT);
+ startClientGrid(NODES_CNT);
+
+ TestRecordingCommunicationSpi clnCommSpi =
TestRecordingCommunicationSpi.spi(grid(NODES_CNT));
+
+ assertTrue(grid(NODES_CNT).configuration().isClientMode());
+
+ clnCommSpi.blockMessages((node, msg) -> msg instanceof
SingleNodeMessage);
+
+ TestDiscoverySpi discoSpi =
(TestDiscoverySpi)grid(NODES_CNT).context().discovery().getInjectedDiscoverySpi();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ discoSpi.messageLatch(latch);
+
+ Set<UUID> nodeIdsRes = new HashSet<>();
+
+ List<DistributedProcess<byte[], byte[]>> processes = new
ArrayList<>(NODES_CNT + 1);
+
+ for (int i = 0; i < NODES_CNT; i++)
+ nodeIdsRes.add(grid(i).localNode().id());
+
+ for (int n = 0; n < NODES_CNT + 1; n++) {
+ DistributedProcess<byte[], byte[]> dp = new TestDistributedProcess(
+ grid(n).context(), (id, req) -> new InitMessage<>(id,
TEST_PROCESS, req, true));
+
+ processes.add(dp);
+ }
+
+ int sendBuffSize = clnCommSpi.getSocketSendBuffer();
+
+ // it will be enough for buffer overflow cause some serialization
overhead is present
+ byte[] arr = new byte[sendBuffSize];
+
+ byte[] serialized = U.toBytes(arr);
+
+ assertTrue(serialized.length > sendBuffSize);
+
+ processes.get(0).start(UUID.randomUUID(), arr);
+
+ clnCommSpi.waitForBlocked();
+
+ assertEquals(1, clnCommSpi.blockedMessages().size());
+
+ TestRecordingCommunicationSpi.BlockedMessageDescriptor blocked =
clnCommSpi.blockedMessages().get(0);
+
+ SingleNodeMessage msgSpied =
(SingleNodeMessage)spy(blocked.ioMessage().message());
+
+ setFieldValue(blocked.ioMessage(), "msg", msgSpied);
+
+ clnCommSpi.stopBlock();
+
+ latch.await(10, TimeUnit.SECONDS);
+
+ // Serialized only once.
+ verify(msgSpied, times(1)).toBytes(any());
+
+ // Write to buffer - several times cause buffer size is less than
serialization representation.
+ verify(msgSpied, times(2)).writeTo(any(), any());
+ }
+
+ /** */
+ private static class TestDistributedProcess extends
DistributedProcess<byte[], byte[]> {
+ /** */
+ public TestDistributedProcess(
+ GridKernalContext ctx,
+ BiFunction<UUID, byte[], ? extends InitMessage<byte[]>>
initMsgFactory
+ ) {
+ super(
+ ctx,
+ TEST_PROCESS,
+ (req) -> new GridFinishedFuture<>(req),
+ (uuid, res, err) -> {},
+ initMsgFactory);
+ }
+ }
+
+ /** */
+ private static class TestDiscoverySpi extends TcpDiscoverySpi {
+ /** */
+ private CountDownLatch messageLatch;
Review Comment:
'Msg' should be used instead of 'message' in this field name (e.g. `msgLatch`), according to Ignite abbreviation standards.
##########
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Check {@link SingleNodeMessage} serialization. */
+public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest
{
+ /** Nodes count. */
+ public static final int NODES_CNT = 2;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
instanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ cfg.setDiscoverySpi(new TestDiscoverySpi()
+
.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+ return cfg;
+ }
+
+ /** Test check that serialization raised only once. */
+ @Test
+ public void testSingleSerializedOnce() throws Exception {
+ startGridsMultiThreaded(NODES_CNT);
+ startClientGrid(NODES_CNT);
+
+ TestRecordingCommunicationSpi clnCommSpi =
TestRecordingCommunicationSpi.spi(grid(NODES_CNT));
+
+ assertTrue(grid(NODES_CNT).configuration().isClientMode());
+
+ clnCommSpi.blockMessages((node, msg) -> msg instanceof
SingleNodeMessage);
+
+ TestDiscoverySpi discoSpi =
(TestDiscoverySpi)grid(NODES_CNT).context().discovery().getInjectedDiscoverySpi();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ discoSpi.messageLatch(latch);
+
+ Set<UUID> nodeIdsRes = new HashSet<>();
Review Comment:
As an option to avoid the for-loop below:
`Set<UUID> nodeIdsRes =
grid(0).cluster().forServers().nodes().stream().map(ClusterNode::id).collect(Collectors.toSet());`
##########
modules/core/src/main/java/org/apache/ignite/internal/util/distributed/SingleNodeMessage.java:
##########
@@ -92,8 +95,14 @@ public SingleNodeMessage(UUID processId,
DistributedProcessType type, R resp, Th
writer.incrementState();
case 2:
- if (!writer.writeByteArray(U.toBytes(resp)))
+ if (respBytes == null) {
Review Comment:
Unnecessary '{ }' according to Ignite codestyle.
##########
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Check {@link SingleNodeMessage} serialization. */
+public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest
{
+ /** Nodes count. */
+ public static final int NODES_CNT = 2;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() {
+ stopAllGrids();
Review Comment:
Don't we need `super.afterTest()` here?
##########
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Check {@link SingleNodeMessage} serialization. */
+public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest
{
+ /** Nodes count. */
+ public static final int NODES_CNT = 2;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
instanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ cfg.setDiscoverySpi(new TestDiscoverySpi()
+
.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+ return cfg;
+ }
+
+ /** Test check that serialization raised only once. */
+ @Test
+ public void testSingleSerializedOnce() throws Exception {
+ startGridsMultiThreaded(NODES_CNT);
+ startClientGrid(NODES_CNT);
+
+ TestRecordingCommunicationSpi clnCommSpi =
TestRecordingCommunicationSpi.spi(grid(NODES_CNT));
+
+ assertTrue(grid(NODES_CNT).configuration().isClientMode());
+
+ clnCommSpi.blockMessages((node, msg) -> msg instanceof
SingleNodeMessage);
+
+ TestDiscoverySpi discoSpi =
(TestDiscoverySpi)grid(NODES_CNT).context().discovery().getInjectedDiscoverySpi();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ discoSpi.messageLatch(latch);
+
+ Set<UUID> nodeIdsRes = new HashSet<>();
+
+ List<DistributedProcess<byte[], byte[]>> processes = new
ArrayList<>(NODES_CNT + 1);
+
+ for (int i = 0; i < NODES_CNT; i++)
+ nodeIdsRes.add(grid(i).localNode().id());
+
+ for (int n = 0; n < NODES_CNT + 1; n++) {
+ DistributedProcess<byte[], byte[]> dp = new TestDistributedProcess(
+ grid(n).context(), (id, req) -> new InitMessage<>(id,
TEST_PROCESS, req, true));
+
+ processes.add(dp);
+ }
+
+ int sendBuffSize = clnCommSpi.getSocketSendBuffer();
+
+ // it will be enough for buffer overflow cause some serialization
overhead is present
+ byte[] arr = new byte[sendBuffSize];
+
+ byte[] serialized = U.toBytes(arr);
+
+ assertTrue(serialized.length > sendBuffSize);
+
+ processes.get(0).start(UUID.randomUUID(), arr);
+
+ clnCommSpi.waitForBlocked();
+
+ assertEquals(1, clnCommSpi.blockedMessages().size());
+
+ TestRecordingCommunicationSpi.BlockedMessageDescriptor blocked =
clnCommSpi.blockedMessages().get(0);
+
+ SingleNodeMessage msgSpied =
(SingleNodeMessage)spy(blocked.ioMessage().message());
+
+ setFieldValue(blocked.ioMessage(), "msg", msgSpied);
+
+ clnCommSpi.stopBlock();
+
+ latch.await(10, TimeUnit.SECONDS);
+
+ // Serialized only once.
+ verify(msgSpied, times(1)).toBytes(any());
+
+ // Write to buffer - several times cause buffer size is less than
serialization representation.
+ verify(msgSpied, times(2)).writeTo(any(), any());
+ }
+
+ /** */
+ private static class TestDistributedProcess extends
DistributedProcess<byte[], byte[]> {
+ /** */
+ public TestDistributedProcess(
+ GridKernalContext ctx,
+ BiFunction<UUID, byte[], ? extends InitMessage<byte[]>>
initMsgFactory
+ ) {
+ super(
+ ctx,
+ TEST_PROCESS,
+ (req) -> new GridFinishedFuture<>(req),
+ (uuid, res, err) -> {},
+ initMsgFactory);
+ }
+ }
+
+ /** */
+ private static class TestDiscoverySpi extends TcpDiscoverySpi {
+ /** */
+ private CountDownLatch messageLatch;
+
+ /** Message raized trigger. */
+ void messageLatch(CountDownLatch messageLatch) {
Review Comment:
'Msg' should be used instead of 'message' in this method and parameter name (e.g. `msgLatch`), according to Ignite abbreviation standards.
##########
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Check {@link SingleNodeMessage} serialization. */
+public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest
{
+ /** Nodes count. */
+ public static final int NODES_CNT = 2;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
instanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ cfg.setDiscoverySpi(new TestDiscoverySpi()
+
.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+ return cfg;
+ }
+
+ /** Test check that serialization raised only once. */
+ @Test
+ public void testSingleSerializedOnce() throws Exception {
+ startGridsMultiThreaded(NODES_CNT);
+ startClientGrid(NODES_CNT);
Review Comment:
We use `grid(NODES_CNT)` 3 times in the test, maybe it's worth introducing a
local variable like this:
`IgniteEx client = startClientGrid(NODES_CNT);`
##########
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Check {@link SingleNodeMessage} serialization. */
+public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest
{
+ /** Nodes count. */
+ public static final int NODES_CNT = 2;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
instanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ cfg.setDiscoverySpi(new TestDiscoverySpi()
+
.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+ return cfg;
+ }
+
+ /** Test check that serialization raised only once. */
+ @Test
+ public void testSingleSerializedOnce() throws Exception {
+ startGridsMultiThreaded(NODES_CNT);
+ startClientGrid(NODES_CNT);
+
+ TestRecordingCommunicationSpi clnCommSpi =
TestRecordingCommunicationSpi.spi(grid(NODES_CNT));
+
+ assertTrue(grid(NODES_CNT).configuration().isClientMode());
Review Comment:
I would move this line before TestRecordingCommunicationSpi initialization
for better readability, unless there's a certain reason why this assertion is
done after it?
##########
modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/** Check {@link SingleNodeMessage} serialization. */
+public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest
{
+ /** Nodes count. */
+ public static final int NODES_CNT = 2;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
instanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ cfg.setDiscoverySpi(new TestDiscoverySpi()
+
.setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder()));
+
+ return cfg;
+ }
+
+ /** Test check that serialization raised only once. */
+ @Test
+ public void testSingleSerializedOnce() throws Exception {
+ startGridsMultiThreaded(NODES_CNT);
+ startClientGrid(NODES_CNT);
+
+ TestRecordingCommunicationSpi clnCommSpi =
TestRecordingCommunicationSpi.spi(grid(NODES_CNT));
+
+ assertTrue(grid(NODES_CNT).configuration().isClientMode());
+
+ clnCommSpi.blockMessages((node, msg) -> msg instanceof
SingleNodeMessage);
+
+ TestDiscoverySpi discoSpi =
(TestDiscoverySpi)grid(NODES_CNT).context().discovery().getInjectedDiscoverySpi();
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ discoSpi.messageLatch(latch);
+
+ Set<UUID> nodeIdsRes = new HashSet<>();
+
+ List<DistributedProcess<byte[], byte[]>> processes = new
ArrayList<>(NODES_CNT + 1);
+
+ for (int i = 0; i < NODES_CNT; i++)
+ nodeIdsRes.add(grid(i).localNode().id());
+
+ for (int n = 0; n < NODES_CNT + 1; n++) {
+ DistributedProcess<byte[], byte[]> dp = new TestDistributedProcess(
+ grid(n).context(), (id, req) -> new InitMessage<>(id,
TEST_PROCESS, req, true));
+
+ processes.add(dp);
+ }
+
+ int sendBuffSize = clnCommSpi.getSocketSendBuffer();
+
+ // it will be enough for buffer overflow cause some serialization
overhead is present
+ byte[] arr = new byte[sendBuffSize];
+
+ byte[] serialized = U.toBytes(arr);
+
+ assertTrue(serialized.length > sendBuffSize);
+
+ processes.get(0).start(UUID.randomUUID(), arr);
+
+ clnCommSpi.waitForBlocked();
+
+ assertEquals(1, clnCommSpi.blockedMessages().size());
+
+ TestRecordingCommunicationSpi.BlockedMessageDescriptor blocked =
clnCommSpi.blockedMessages().get(0);
+
+ SingleNodeMessage msgSpied =
(SingleNodeMessage)spy(blocked.ioMessage().message());
+
+ setFieldValue(blocked.ioMessage(), "msg", msgSpied);
+
+ clnCommSpi.stopBlock();
+
+ latch.await(10, TimeUnit.SECONDS);
+
+ // Serialized only once.
+ verify(msgSpied, times(1)).toBytes(any());
+
+ // Write to buffer - several times cause buffer size is less than
serialization representation.
+ verify(msgSpied, times(2)).writeTo(any(), any());
+ }
+
+ /** */
+ private static class TestDistributedProcess extends
DistributedProcess<byte[], byte[]> {
+ /** */
+ public TestDistributedProcess(
+ GridKernalContext ctx,
+ BiFunction<UUID, byte[], ? extends InitMessage<byte[]>>
initMsgFactory
+ ) {
+ super(
+ ctx,
+ TEST_PROCESS,
+ (req) -> new GridFinishedFuture<>(req),
Review Comment:
A method reference may be used here instead of a lambda.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]