This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2189b3ed799 IGNITE-26350 Enforce explicit use of OutputStream for TcpDiscoverySpi#writeToSocket (#12306)
2189b3ed799 is described below

commit 2189b3ed79962a50f966cd2707bf6649caa8a184
Author: Maksim Timonin <[email protected]>
AuthorDate: Tue Sep 2 19:49:30 2025 +0500

    IGNITE-26350 Enforce explicit use of OutputStream for TcpDiscoverySpi#writeToSocket (#12306)
---
 .../ignite/spi/discovery/tcp/ClientImpl.java       |  9 +++++++--
 .../ignite/spi/discovery/tcp/ServerImpl.java       | 10 +++++-----
 .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java  | 14 -------------
 .../ignite/internal/IgniteClientRejoinTest.java    |  9 ---------
 .../IgniteDiscoveryMassiveNodeFailTest.java        | 11 -----------
 .../cache/distributed/CacheStartOnJoinTest.java    | 13 ------------
 .../dht/IgniteCacheTopologySplitAbstractTest.java  | 11 -----------
 .../IgniteTcpCommunicationConnectOnInitTest.java   | 11 -----------
 .../TcpCommunicationSpiSkipMessageSendTest.java    |  5 +++--
 ...cpClientDiscoverySpiFailureTimeoutSelfTest.java | 23 ----------------------
 .../tcp/TcpDiscoveryCoordinatorFailureTest.java    | 15 --------------
 .../discovery/tcp/TcpDiscoveryFailedJoinTest.java  |  7 -------
 .../TcpDiscoveryPendingMessageDeliveryTest.java    |  7 -------
 13 files changed, 15 insertions(+), 130 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index e228345ba4f..643d4d3fb25 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -21,6 +21,7 @@ import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
+import java.io.OutputStream;
 import java.io.StreamCorruptedException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -718,11 +719,13 @@ class ClientImpl extends TcpDiscoveryImpl {
             boolean openSock = false;
 
             Socket sock = null;
+            OutputStream out;
 
             try {
                 long tsNanos = System.nanoTime();
 
                 sock = spi.openSocket(addr, timeoutHelper);
+                out = spi.socketStream(sock);
 
                 openSock = true;
 
@@ -730,7 +733,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 req.client(true);
 
-                spi.writeToSocket(sock, req, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                spi.writeToSocket(sock, out, req, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, 
null, ackTimeout0);
 
@@ -785,7 +788,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                 if (msg instanceof TraceableMessage)
                     tracing.messages().beforeSend((TraceableMessage)msg);
 
-                spi.writeToSocket(sock, msg, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                spi.writeToSocket(sock, out, msg, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 spi.stats.onMessageSent(msg, U.millisSinceNanos(tsNanos));
 
@@ -1386,6 +1389,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         try {
                             spi.writeToSocket(
                                 sock,
+                                spi.socketStream(sock),
                                 msg,
                                 sockTimeout);
                         }
@@ -1432,6 +1436,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     spi.writeToSocket(
                         sock,
+                        spi.socketStream(sock),
                         msg,
                         sockTimeout);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 003701c9add..e469d8572f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -877,7 +877,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         openedSock = true;
 
-                        spi.writeToSocket(sock, new 
TcpDiscoveryPingRequest(locNodeId, clientNodeId),
+                        spi.writeToSocket(sock, spi.socketStream(sock), new 
TcpDiscoveryPingRequest(locNodeId, clientNodeId),
                             
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                         TcpDiscoveryPingResponse res = spi.readMessage(sock, 
null, timeoutHelper.nextTimeoutChunk(
@@ -1475,7 +1475,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 TcpDiscoveryHandshakeRequest req = new 
TcpDiscoveryHandshakeRequest(locNodeId);
 
                 // Handshake.
-                spi.writeToSocket(sock, req, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                spi.writeToSocket(sock, spi.socketStream(sock), req, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, 
null, timeoutHelper.nextTimeoutChunk(
                     ackTimeout0));
@@ -1510,7 +1510,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // Send message.
                 tsNanos = System.nanoTime();
 
-                spi.writeToSocket(sock, msg, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                spi.writeToSocket(sock, spi.socketStream(sock), msg, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 long tsNanos0 = System.nanoTime();
 
@@ -6769,7 +6769,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     
res.clientExists(clientWorker.ping(timeoutHelper));
                             }
 
-                            spi.writeToSocket(sock, res, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                            spi.writeToSocket(sock, spi.socketStream(sock), 
res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                             if (!(sock instanceof SSLSocket))
                                 sock.shutdownOutput();
@@ -6862,7 +6862,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             spi.getEffectiveSocketTimeout(srvSock) + " to " + 
rmtAddr + ":" + sock.getPort());
                     }
 
-                    spi.writeToSocket(sock, res, 
spi.getEffectiveSocketTimeout(srvSock));
+                    spi.writeToSocket(sock, spi.socketStream(sock), res, 
spi.getEffectiveSocketTimeout(srvSock));
 
                     // It can happen if a remote node is stopped and it has a 
loopback address in the list of addresses,
                     // the local node sends a handshake request message on the 
loopback address, so we get here.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index a3e4d2ae2b7..e017f4c0ff7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1726,20 +1726,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery
         sock.setKeepAlive(true);
     }
 
-    /**
-     * Writes message to the socket.
-     *
-     * @param sock Socket.
-     * @param msg Message.
-     * @param timeout Socket write timeout.
-     * @throws IOException If IO failed or write timed out.
-     * @throws IgniteCheckedException If marshalling failed.
-     */
-    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, 
long timeout) throws IOException,
-        IgniteCheckedException {
-        writeToSocket(sock, socketStream(sock), msg, timeout);
-    }
-
     /**
      * @param msg Message.
      */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
index dc66277642d..285a70efc74 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
@@ -362,15 +362,6 @@ public class IgniteClientRejoinTest extends GridCommonAbstractTest {
             super.writeToSocket(sock, msg, data, timeout);
         }
 
-        /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
-            long timeout) throws IOException, IgniteCheckedException {
-            if (blockAll || block && sock.getPort() == 47500)
-                throw new SocketException("Test discovery exception");
-
-            super.writeToSocket(sock, msg, timeout);
-        }
-
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
index 18fa278c8c4..304991e46df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
@@ -312,17 +312,6 @@ public class IgniteDiscoveryMassiveNodeFailTest extends GridCommonAbstractTest {
             super.writeToSocket(sock, msg, data, timeout);
         }
 
-        /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
-            long timeout) throws IOException, IgniteCheckedException {
-            assertNotFailedNode(sock);
-
-            if (isDrop(msg))
-                return;
-
-            super.writeToSocket(sock, msg, timeout);
-        }
-
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
index 4fbe4e354e3..d7c45f5e16f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
@@ -17,16 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CyclicBarrier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
@@ -62,15 +58,6 @@ public class CacheStartOnJoinTest extends 
GridCommonAbstractTest {
             /** */
             private boolean delay = true;
 
-            @Override protected void writeToSocket(
-                Socket sock,
-                OutputStream out,
-                TcpDiscoveryAbstractMessage msg,
-                long timeout
-            ) throws IOException, IgniteCheckedException {
-                super.writeToSocket(sock, out, msg, timeout);
-            }
-
             @Override protected void 
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
                 if (getTestIgniteInstanceName(0).equals(ignite.name())) {
                     if (msg instanceof TcpDiscoveryJoinRequestMessage) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
index 4422d1e3702..2c055f2dd99 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
@@ -237,17 +237,6 @@ public abstract class IgniteCacheTopologySplitAbstractTest extends GridCommonAbs
             super.writeToSocket(sock, out, msg, timeout);
         }
 
-        /** {@inheritDoc} */
-        @Override protected void writeToSocket(
-            Socket sock,
-            TcpDiscoveryAbstractMessage msg,
-            long timeout
-        ) throws IOException, IgniteCheckedException {
-            checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(), 
timeout);
-
-            super.writeToSocket(sock, msg, timeout);
-        }
-
         /** {@inheritDoc} */
         @Override protected void writeToSocket(TcpDiscoveryAbstractMessage 
msg, Socket sock, int res,
             long timeout) throws IOException {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
index 2e03775bbf7..8bce7675f88 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
@@ -207,17 +207,6 @@ public class IgniteTcpCommunicationConnectOnInitTest extends GridCommonAbstractT
             super.writeToSocket(sock, msg, data, timeout);
         }
 
-        /** {@inheritDoc} */
-        @Override protected void writeToSocket(
-            Socket sock,
-            TcpDiscoveryAbstractMessage msg,
-            long timeout
-        ) throws IOException, IgniteCheckedException {
-            awaitLatch();
-
-            super.writeToSocket(sock, msg, timeout);
-        }
-
         /** {@inheritDoc} */
         @Override protected void writeToSocket(
             Socket sock,
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
index 2ac5abf3c6f..0e26ff95b76 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi.communication.tcp;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.Set;
@@ -292,7 +293,7 @@ public class TcpCommunicationSpiSkipMessageSendTest extends 
GridCommonAbstractTe
         }
 
         /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
+        @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
             if (netDisabled) {
                 netDisabledLatch.countDown();
@@ -300,7 +301,7 @@ public class TcpCommunicationSpiSkipMessageSendTest extends 
GridCommonAbstractTe
                 throw new SocketTimeoutException("CustomDiscoverySpi: network 
is disabled.");
             }
             else
-                super.writeToSocket(sock, msg, timeout);
+                super.writeToSocket(sock, out, msg, timeout);
         }
 
         /**
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 14baed33753..2708b606007 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -496,29 +496,6 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
                 throw new SocketTimeoutException("Write to socket delay 
timeout exception.");
         }
 
-        /**  */
-        @Override protected void writeToSocket(
-            Socket sock,
-            TcpDiscoveryAbstractMessage msg,
-            long timeout
-        ) throws IOException, IgniteCheckedException {
-            if (writeToSocketDelay > 0) {
-                try {
-                    U.dumpStack(log, "Before sleep [msg=" + msg + ']');
-
-                    Thread.sleep(writeToSocketDelay);
-                }
-                catch (InterruptedException ignore) {
-                    // Nothing to do.
-                }
-            }
-
-            if (sock.getSoTimeout() >= writeToSocketDelay)
-                super.writeToSocket(sock, msg, timeout);
-            else
-                throw new SocketTimeoutException("Write to socket delay 
timeout exception.");
-        }
-
         /**  */
         @Override protected void writeToSocket(
             TcpDiscoveryAbstractMessage msg,
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
index 4a63a0d1b68..10c6aa14242 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
@@ -301,21 +301,6 @@ public class TcpDiscoveryCoordinatorFailureTest extends GridCommonAbstractTest {
             super.writeToSocket(sock, msg, data, timeout);
         }
 
-        /** {@inheritDoc} */
-        @Override protected void writeToSocket(
-            Socket sock,
-            TcpDiscoveryAbstractMessage msg,
-            long timeout
-        ) throws IOException, IgniteCheckedException {
-            if (isDrop(msg)) {
-                // Replace logic routine message with a stub to update 
last-sent-time to avoid segmentation on
-                // connRecoveryTimeout.
-                msg = new TcpDiscoveryConnectionCheckMessage(locNode);
-            }
-
-            super.writeToSocket(sock, msg, timeout);
-        }
-
         /** {@inheritDoc} */
         @Override protected void writeToSocket(
             Socket sock,
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
index 8050e781383..481eb24a0fb 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
@@ -198,13 +198,6 @@ public class TcpDiscoveryFailedJoinTest extends GridCommonAbstractTest {
                 super.writeToSocket(sock, msg, data, timeout);
         }
 
-        /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
-            long timeout) throws IOException, IgniteCheckedException {
-            if (sock.getPort() != FAIL_PORT)
-                super.writeToSocket(sock, msg, timeout);
-        }
-
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
index b8df70b1a92..ddb52738db5 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
@@ -264,13 +264,6 @@ public class TcpDiscoveryPendingMessageDeliveryTest extends GridCommonAbstractTe
                 super.writeToSocket(sock, msg, data, timeout);
         }
 
-        /** {@inheritDoc} */
-        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
-            long timeout) throws IOException, IgniteCheckedException {
-            if (!blockMsgs)
-                super.writeToSocket(sock, msg, timeout);
-        }
-
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, OutputStream out, 
TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {

Reply via email to