This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 29ea216d43f IGNITE-28011 Fixed flaky
GridIoManagerFileTransmissionSelfTest (#12828)
29ea216d43f is described below
commit 29ea216d43fc463c83ec26d8b275a4d7c304f4ed
Author: Mikhail Petrov <[email protected]>
AuthorDate: Sat Feb 28 23:46:11 2026 +0300
IGNITE-28011 Fixed flaky GridIoManagerFileTransmissionSelfTest (#12828)
---
.../GridIoManagerFileTransmissionSelfTest.java | 128 +++++++++++++--------
1 file changed, 78 insertions(+), 50 deletions(-)
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java
index 4a120048afc..6d475231d49 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java
@@ -58,6 +58,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactor
import
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -66,13 +67,13 @@ import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.After;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.ignite.internal.util.IgniteUtils.fileCount;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
* Test file transmission manager operations.
@@ -114,22 +115,18 @@ public class GridIoManagerFileTransmissionSelfTest
extends GridCommonAbstractTes
topic = GridTopic.TOPIC_CACHE.topic("test", 0);
}
- /**
- * @throws Exception if failed.
- */
- @Before
- public void before() throws Exception {
+ /** {@inheritDoc} */
+ @Override public void beforeTest() throws Exception {
cleanPersistenceDir();
tempStore = U.resolveWorkDirectory(U.defaultWorkDirectory(),
TEMP_FILES_DIR, true);
}
- /** Called after test run. */
- @After
- public void after() {
+ /** {@inheritDoc} */
+ @Override public void afterTest() throws Exception {
try {
- ensureResourcesFree(snd);
- ensureResourcesFree(rcv);
+
NodeFileTransmissionResources.extractFrom(snd).awaitIsEmpty(getTestTimeout());
+
NodeFileTransmissionResources.extractFrom(rcv).awaitIsEmpty(getTestTimeout());
}
finally {
stopAllGrids();
@@ -177,8 +174,8 @@ public class GridIoManagerFileTransmissionSelfTest extends
GridCommonAbstractTes
rcv.context().io().addTransmissionHandler(topic, new
TransmissionHandlerAdapter() {
@Override public void onEnd(UUID rmtNodeId) {
- ensureResourcesFree(snd);
- ensureResourcesFree(rcv);
+ NodeFileTransmissionResources.extractFrom(snd).assertIsEmpty();
+ NodeFileTransmissionResources.extractFrom(rcv).assertIsEmpty();
}
@Override public String filePath(UUID nodeId, TransmissionMeta
fileMeta) {
@@ -186,7 +183,7 @@ public class GridIoManagerFileTransmissionSelfTest extends
GridCommonAbstractTes
}
@Override public Consumer<File> fileHandler(UUID nodeId,
TransmissionMeta initMeta) {
- return new Consumer<File>() {
+ return new Consumer<>() {
@Override public void accept(File file) {
assertTrue(fileSizes.containsKey(file.getName()));
// Save all params.
@@ -556,7 +553,7 @@ public class GridIoManagerFileTransmissionSelfTest extends
GridCommonAbstractTes
assertNotNull("Transmission must ends with an exception", expectedErr);
- // Open next session and complete successfull.
+ // Open next session and complete successful.
try (GridIoManager.TransmissionSender sender = snd.context()
.io()
.openTransmissionSender(rcv.localNode().id(), topic)) {
@@ -592,7 +589,7 @@ public class GridIoManagerFileTransmissionSelfTest extends
GridCommonAbstractTes
@Test(expected = IgniteCheckedException.class)
public void testFileHandlerChannelCloseIfAnotherOpened() throws Exception {
final int fileSizeBytes = 5 * 1024 * 1024;
- final CountDownLatch waitLatch = new CountDownLatch(2);
+ final CountDownLatch waitLatch = new CountDownLatch(1);
final CountDownLatch completionWait = new CountDownLatch(2);
snd = startGrid(0);
@@ -642,14 +639,13 @@ public class GridIoManagerFileTransmissionSelfTest
extends GridCommonAbstractTes
}
});
- waitLatch.await(5, TimeUnit.SECONDS);
+ assertTrue(waitLatch.await(5, TimeUnit.SECONDS));
+ assertTrue(completionWait.await(5, TimeUnit.SECONDS));
// Expected that one of the writers will throw exception.
assertFalse("An error must be thrown if connected to the same
topic during processing",
errs[0] == null);
- completionWait.await(5, TimeUnit.SECONDS);
-
throw errs[0];
}
}
@@ -715,7 +711,7 @@ public class GridIoManagerFileTransmissionSelfTest extends
GridCommonAbstractTes
}
}
- return new Consumer<ByteBuffer>() {
+ return new Consumer<>() {
@Override public void accept(ByteBuffer buff) {
try {
assertTrue(buff.order() ==
ByteOrder.nativeOrder());
@@ -743,9 +739,9 @@ public class GridIoManagerFileTransmissionSelfTest extends
GridCommonAbstractTes
assertEquals("Remote node must accept all chunks",
fileToSend.length() /
rcv.configuration().getDataStorageConfiguration().getPageSize(),
acceptedChunks.get());
- assertEquals("Received file and sent files have not the same lengtgh",
fileToSend.length(), file.length());
+ assertEquals("Received file and sent files have not the same length",
fileToSend.length(), file.length());
assertCrcEquals(fileToSend, file);
- assertNull(fileIo[0]);
+ assertTrue(waitForCondition(() -> fileIo[0] == null,
getTestTimeout()));
}
/**
@@ -787,7 +783,7 @@ public class GridIoManagerFileTransmissionSelfTest extends
GridCommonAbstractTes
rcv.context().io().addTransmissionHandler(topic, new
TransmissionHandlerAdapter() {
/** {@inheritDoc} */
@Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId,
TransmissionMeta initMeta) {
- return new Consumer<ByteBuffer>() {
+ return new Consumer<>() {
@Override public void accept(ByteBuffer buffer) {
throw new TransmissionCancelledException("Operation
cancelled by the user");
}
@@ -826,7 +822,7 @@ public class GridIoManagerFileTransmissionSelfTest extends
GridCommonAbstractTes
}
@Override public Consumer<File> fileHandler(UUID nodeId,
TransmissionMeta initMeta) {
- return new Consumer<File>() {
+ return new Consumer<>() {
@Override public void accept(File file) {
assertEquals(from1To0.getName(), file.getName());
}
@@ -840,7 +836,7 @@ public class GridIoManagerFileTransmissionSelfTest extends
GridCommonAbstractTes
}
@Override public Consumer<File> fileHandler(UUID nodeId,
TransmissionMeta initMeta) {
- return new Consumer<File>() {
+ return new Consumer<>() {
@Override public void accept(File file) {
assertEquals(from0To1.getName(), file.getName());
}
@@ -887,22 +883,6 @@ public class GridIoManagerFileTransmissionSelfTest extends
GridCommonAbstractTes
assertNull("Exception occurred during file sending: " + ex[0], ex[0]);
}
- /**
- * @param ig Ignite instance to check.
- */
- private static void ensureResourcesFree(IgniteEx ig) {
- if (ig == null)
- return;
-
- final GridIoManager io = ig.context().io();
-
- ConcurrentMap<Object, Object> ctxs = GridTestUtils.getFieldValue(io,
"rcvCtxs");
- ConcurrentMap<T2<UUID, IgniteUuid>, AtomicBoolean> sndrFlags =
GridTestUtils.getFieldValue(io, "senderStopFlags");
-
- assertTrue("Receiver context map must be empty: " + ctxs,
ctxs.isEmpty());
- assertTrue("Sender stop flags must be empty: " + sndrFlags,
sndrFlags.isEmpty());
- }
-
/**
* @param ignite Ignite instance.
* @param cacheName Cache name to add data to.
@@ -956,20 +936,20 @@ public class GridIoManagerFileTransmissionSelfTest
extends GridCommonAbstractTes
}
}
- /** The defailt implementation of transmit session. */
+ /** The default implementation of transmit session. */
private static class DefaultTransmissionHandler extends
TransmissionHandlerAdapter {
- /** Ignite recevier node. */
+ /** Ignite receiver node. */
private final IgniteEx rcv;
- /** File to be send. */
+ /** File to be sent. */
private final File fileToSend;
/** Temporary local storage. */
private final File tempStorage;
/**
- * @param rcv Ignite recevier node.
- * @param fileToSend File to be send.
+ * @param rcv Ignite receiver node.
+ * @param fileToSend File to be sent.
* @param tempStorage Temporary local storage.
*/
public DefaultTransmissionHandler(IgniteEx rcv, File fileToSend, File
tempStorage) {
@@ -985,7 +965,7 @@ public class GridIoManagerFileTransmissionSelfTest extends
GridCommonAbstractTes
/** {@inheritDoc} */
@Override public Consumer<File> fileHandler(UUID nodeId,
TransmissionMeta initMeta) {
- return new Consumer<File>() {
+ return new Consumer<>() {
@Override public void accept(File file) {
assertEquals(fileToSend.length(), file.length());
assertCrcEquals(fileToSend, file);
@@ -1009,7 +989,7 @@ public class GridIoManagerFileTransmissionSelfTest extends
GridCommonAbstractTes
if (block) {
U.log(log, "Start waiting on trying open a new channel");
- latch.await(5, TimeUnit.SECONDS);
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
}
}
catch (InterruptedException e) {
@@ -1021,7 +1001,7 @@ public class GridIoManagerFileTransmissionSelfTest
extends GridCommonAbstractTes
}
/**
- * The defailt implementation of transmit session.
+ * The default implementation of transmit session.
*/
private static class TransmissionHandlerAdapter implements
TransmissionHandler {
/** {@inheritDoc} */
@@ -1049,4 +1029,52 @@ public class GridIoManagerFileTransmissionSelfTest
extends GridCommonAbstractTes
// No-op.
}
}
+
+ /** */
+ private static class NodeFileTransmissionResources {
+ /** */
+ private final ConcurrentMap<Object, Object> receiverContexts;
+
+ /** */
+ private final ConcurrentMap<T2<UUID, IgniteUuid>, AtomicBoolean>
activeSessions;
+
+ /** */
+ private NodeFileTransmissionResources(
+ ConcurrentMap<Object, Object> receiverContexts,
+ ConcurrentMap<T2<UUID, IgniteUuid>, AtomicBoolean> activeSessions
+ ) {
+ this.receiverContexts = receiverContexts;
+ this.activeSessions = activeSessions;
+ }
+
+ /** */
+ void assertIsEmpty() {
+ assertTrue(description(), isEmpty());
+ }
+
+ /** */
+ void awaitIsEmpty(long timeout) throws Exception {
+ assertTrue(description(), waitForCondition(this::isEmpty,
timeout));
+ }
+
+ /** */
+ private boolean isEmpty() {
+ return F.isEmpty(receiverContexts) && F.isEmpty(activeSessions);
+ }
+
+ /** */
+ private String description() {
+ return "Receiver contexts: " + receiverContexts + ", Active
sessions: " + activeSessions;
+ }
+
+ /** */
+ static NodeFileTransmissionResources extractFrom(IgniteEx ignite) {
+ if (ignite == null)
+ return new NodeFileTransmissionResources(null, null);
+
+ final GridIoManager io = ignite.context().io();
+
+ return new NodeFileTransmissionResources(getFieldValue(io,
"rcvCtxs"), getFieldValue(io, "senderStopFlags"));
+ }
+ }
}