This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2189b3ed799 IGNITE-26350 Enforce explicit use of OutputStream for
TcpDiscoverySpi#writeToSocket (#12306)
2189b3ed799 is described below
commit 2189b3ed79962a50f966cd2707bf6649caa8a184
Author: Maksim Timonin <[email protected]>
AuthorDate: Tue Sep 2 19:49:30 2025 +0500
IGNITE-26350 Enforce explicit use of OutputStream for
TcpDiscoverySpi#writeToSocket (#12306)
---
.../ignite/spi/discovery/tcp/ClientImpl.java | 9 +++++++--
.../ignite/spi/discovery/tcp/ServerImpl.java | 10 +++++-----
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 14 -------------
.../ignite/internal/IgniteClientRejoinTest.java | 9 ---------
.../IgniteDiscoveryMassiveNodeFailTest.java | 11 -----------
.../cache/distributed/CacheStartOnJoinTest.java | 13 ------------
.../dht/IgniteCacheTopologySplitAbstractTest.java | 11 -----------
.../IgniteTcpCommunicationConnectOnInitTest.java | 11 -----------
.../TcpCommunicationSpiSkipMessageSendTest.java | 5 +++--
...cpClientDiscoverySpiFailureTimeoutSelfTest.java | 23 ----------------------
.../tcp/TcpDiscoveryCoordinatorFailureTest.java | 15 --------------
.../discovery/tcp/TcpDiscoveryFailedJoinTest.java | 7 -------
.../TcpDiscoveryPendingMessageDeliveryTest.java | 7 -------
13 files changed, 15 insertions(+), 130 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index e228345ba4f..643d4d3fb25 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -21,6 +21,7 @@ import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
+import java.io.OutputStream;
import java.io.StreamCorruptedException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -718,11 +719,13 @@ class ClientImpl extends TcpDiscoveryImpl {
boolean openSock = false;
Socket sock = null;
+ OutputStream out;
try {
long tsNanos = System.nanoTime();
sock = spi.openSocket(addr, timeoutHelper);
+ out = spi.socketStream(sock);
openSock = true;
@@ -730,7 +733,7 @@ class ClientImpl extends TcpDiscoveryImpl {
req.client(true);
- spi.writeToSocket(sock, req,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeToSocket(sock, out, req,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock,
null, ackTimeout0);
@@ -785,7 +788,7 @@ class ClientImpl extends TcpDiscoveryImpl {
if (msg instanceof TraceableMessage)
tracing.messages().beforeSend((TraceableMessage)msg);
- spi.writeToSocket(sock, msg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeToSocket(sock, out, msg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
spi.stats.onMessageSent(msg, U.millisSinceNanos(tsNanos));
@@ -1386,6 +1389,7 @@ class ClientImpl extends TcpDiscoveryImpl {
try {
spi.writeToSocket(
sock,
+ spi.socketStream(sock),
msg,
sockTimeout);
}
@@ -1432,6 +1436,7 @@ class ClientImpl extends TcpDiscoveryImpl {
spi.writeToSocket(
sock,
+ spi.socketStream(sock),
msg,
sockTimeout);
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 003701c9add..e469d8572f3 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -877,7 +877,7 @@ class ServerImpl extends TcpDiscoveryImpl {
openedSock = true;
- spi.writeToSocket(sock, new
TcpDiscoveryPingRequest(locNodeId, clientNodeId),
+ spi.writeToSocket(sock, spi.socketStream(sock), new
TcpDiscoveryPingRequest(locNodeId, clientNodeId),
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
TcpDiscoveryPingResponse res = spi.readMessage(sock,
null, timeoutHelper.nextTimeoutChunk(
@@ -1475,7 +1475,7 @@ class ServerImpl extends TcpDiscoveryImpl {
TcpDiscoveryHandshakeRequest req = new
TcpDiscoveryHandshakeRequest(locNodeId);
// Handshake.
- spi.writeToSocket(sock, req,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeToSocket(sock, spi.socketStream(sock), req,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
TcpDiscoveryHandshakeResponse res = spi.readMessage(sock,
null, timeoutHelper.nextTimeoutChunk(
ackTimeout0));
@@ -1510,7 +1510,7 @@ class ServerImpl extends TcpDiscoveryImpl {
// Send message.
tsNanos = System.nanoTime();
- spi.writeToSocket(sock, msg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeToSocket(sock, spi.socketStream(sock), msg,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
long tsNanos0 = System.nanoTime();
@@ -6769,7 +6769,7 @@ class ServerImpl extends TcpDiscoveryImpl {
res.clientExists(clientWorker.ping(timeoutHelper));
}
- spi.writeToSocket(sock, res,
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+ spi.writeToSocket(sock, spi.socketStream(sock),
res, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
if (!(sock instanceof SSLSocket))
sock.shutdownOutput();
@@ -6862,7 +6862,7 @@ class ServerImpl extends TcpDiscoveryImpl {
spi.getEffectiveSocketTimeout(srvSock) + " to " +
rmtAddr + ":" + sock.getPort());
}
- spi.writeToSocket(sock, res,
spi.getEffectiveSocketTimeout(srvSock));
+ spi.writeToSocket(sock, spi.socketStream(sock), res,
spi.getEffectiveSocketTimeout(srvSock));
// It can happen if a remote node is stopped and it has a
loopback address in the list of addresses,
// the local node sends a handshake request message on the
loopback address, so we get here.
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index a3e4d2ae2b7..e017f4c0ff7 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1726,20 +1726,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
sock.setKeepAlive(true);
}
- /**
- * Writes message to the socket.
- *
- * @param sock Socket.
- * @param msg Message.
- * @param timeout Socket write timeout.
- * @throws IOException If IO failed or write timed out.
- * @throws IgniteCheckedException If marshalling failed.
- */
- protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException,
- IgniteCheckedException {
- writeToSocket(sock, socketStream(sock), msg, timeout);
- }
-
/**
* @param msg Message.
*/
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
index dc66277642d..285a70efc74 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientRejoinTest.java
@@ -362,15 +362,6 @@ public class IgniteClientRejoinTest extends
GridCommonAbstractTest {
super.writeToSocket(sock, msg, data, timeout);
}
- /** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock,
TcpDiscoveryAbstractMessage msg,
- long timeout) throws IOException, IgniteCheckedException {
- if (blockAll || block && sock.getPort() == 47500)
- throw new SocketException("Test discovery exception");
-
- super.writeToSocket(sock, msg, timeout);
- }
-
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
index 18fa278c8c4..304991e46df 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteDiscoveryMassiveNodeFailTest.java
@@ -312,17 +312,6 @@ public class IgniteDiscoveryMassiveNodeFailTest extends
GridCommonAbstractTest {
super.writeToSocket(sock, msg, data, timeout);
}
- /** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock,
TcpDiscoveryAbstractMessage msg,
- long timeout) throws IOException, IgniteCheckedException {
- assertNotFailedNode(sock);
-
- if (isDrop(msg))
- return;
-
- super.writeToSocket(sock, msg, timeout);
- }
-
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
index 4fbe4e354e3..d7c45f5e16f 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
@@ -17,16 +17,12 @@
package org.apache.ignite.internal.processors.cache.distributed;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
@@ -62,15 +58,6 @@ public class CacheStartOnJoinTest extends
GridCommonAbstractTest {
/** */
private boolean delay = true;
- @Override protected void writeToSocket(
- Socket sock,
- OutputStream out,
- TcpDiscoveryAbstractMessage msg,
- long timeout
- ) throws IOException, IgniteCheckedException {
- super.writeToSocket(sock, out, msg, timeout);
- }
-
@Override protected void
startMessageProcess(TcpDiscoveryAbstractMessage msg) {
if (getTestIgniteInstanceName(0).equals(ignite.name())) {
if (msg instanceof TcpDiscoveryJoinRequestMessage) {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
index 4422d1e3702..2c055f2dd99 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTopologySplitAbstractTest.java
@@ -237,17 +237,6 @@ public abstract class IgniteCacheTopologySplitAbstractTest
extends GridCommonAbs
super.writeToSocket(sock, out, msg, timeout);
}
- /** {@inheritDoc} */
- @Override protected void writeToSocket(
- Socket sock,
- TcpDiscoveryAbstractMessage msg,
- long timeout
- ) throws IOException, IgniteCheckedException {
- checkSegmented((InetSocketAddress)sock.getRemoteSocketAddress(),
timeout);
-
- super.writeToSocket(sock, msg, timeout);
- }
-
/** {@inheritDoc} */
@Override protected void writeToSocket(TcpDiscoveryAbstractMessage
msg, Socket sock, int res,
long timeout) throws IOException {
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
index 2e03775bbf7..8bce7675f88 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationConnectOnInitTest.java
@@ -207,17 +207,6 @@ public class IgniteTcpCommunicationConnectOnInitTest
extends GridCommonAbstractT
super.writeToSocket(sock, msg, data, timeout);
}
- /** {@inheritDoc} */
- @Override protected void writeToSocket(
- Socket sock,
- TcpDiscoveryAbstractMessage msg,
- long timeout
- ) throws IOException, IgniteCheckedException {
- awaitLatch();
-
- super.writeToSocket(sock, msg, timeout);
- }
-
/** {@inheritDoc} */
@Override protected void writeToSocket(
Socket sock,
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
index 2ac5abf3c6f..0e26ff95b76 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spi.communication.tcp;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Set;
@@ -292,7 +293,7 @@ public class TcpCommunicationSpiSkipMessageSendTest extends
GridCommonAbstractTe
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock,
TcpDiscoveryAbstractMessage msg,
+ @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
if (netDisabled) {
netDisabledLatch.countDown();
@@ -300,7 +301,7 @@ public class TcpCommunicationSpiSkipMessageSendTest extends
GridCommonAbstractTe
throw new SocketTimeoutException("CustomDiscoverySpi: network
is disabled.");
}
else
- super.writeToSocket(sock, msg, timeout);
+ super.writeToSocket(sock, out, msg, timeout);
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 14baed33753..2708b606007 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -496,29 +496,6 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest
extends TcpClientDiscov
throw new SocketTimeoutException("Write to socket delay
timeout exception.");
}
- /** */
- @Override protected void writeToSocket(
- Socket sock,
- TcpDiscoveryAbstractMessage msg,
- long timeout
- ) throws IOException, IgniteCheckedException {
- if (writeToSocketDelay > 0) {
- try {
- U.dumpStack(log, "Before sleep [msg=" + msg + ']');
-
- Thread.sleep(writeToSocketDelay);
- }
- catch (InterruptedException ignore) {
- // Nothing to do.
- }
- }
-
- if (sock.getSoTimeout() >= writeToSocketDelay)
- super.writeToSocket(sock, msg, timeout);
- else
- throw new SocketTimeoutException("Write to socket delay
timeout exception.");
- }
-
/** */
@Override protected void writeToSocket(
TcpDiscoveryAbstractMessage msg,
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
index 4a63a0d1b68..10c6aa14242 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryCoordinatorFailureTest.java
@@ -301,21 +301,6 @@ public class TcpDiscoveryCoordinatorFailureTest extends
GridCommonAbstractTest {
super.writeToSocket(sock, msg, data, timeout);
}
- /** {@inheritDoc} */
- @Override protected void writeToSocket(
- Socket sock,
- TcpDiscoveryAbstractMessage msg,
- long timeout
- ) throws IOException, IgniteCheckedException {
- if (isDrop(msg)) {
- // Replace logic routine message with a stub to update
last-sent-time to avoid segmentation on
- // connRecoveryTimeout.
- msg = new TcpDiscoveryConnectionCheckMessage(locNode);
- }
-
- super.writeToSocket(sock, msg, timeout);
- }
-
/** {@inheritDoc} */
@Override protected void writeToSocket(
Socket sock,
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
index 8050e781383..481eb24a0fb 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryFailedJoinTest.java
@@ -198,13 +198,6 @@ public class TcpDiscoveryFailedJoinTest extends
GridCommonAbstractTest {
super.writeToSocket(sock, msg, data, timeout);
}
- /** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock,
TcpDiscoveryAbstractMessage msg,
- long timeout) throws IOException, IgniteCheckedException {
- if (sock.getPort() != FAIL_PORT)
- super.writeToSocket(sock, msg, timeout);
- }
-
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
index b8df70b1a92..ddb52738db5 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryTest.java
@@ -264,13 +264,6 @@ public class TcpDiscoveryPendingMessageDeliveryTest
extends GridCommonAbstractTe
super.writeToSocket(sock, msg, data, timeout);
}
- /** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock,
TcpDiscoveryAbstractMessage msg,
- long timeout) throws IOException, IgniteCheckedException {
- if (!blockMsgs)
- super.writeToSocket(sock, msg, timeout);
- }
-
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {