This is an automated email from the ASF dual-hosted git repository.

mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 29ea216d43f IGNITE-28011 Fixed flaky 
GridIoManagerFileTransmissionSelfTest (#12828)
29ea216d43f is described below

commit 29ea216d43fc463c83ec26d8b275a4d7c304f4ed
Author: Mikhail Petrov <[email protected]>
AuthorDate: Sat Feb 28 23:46:11 2026 +0300

    IGNITE-28011 Fixed flaky GridIoManagerFileTransmissionSelfTest (#12828)
---
 .../GridIoManagerFileTransmissionSelfTest.java     | 128 +++++++++++++--------
 1 file changed, 78 insertions(+), 50 deletions(-)

diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java
index 4a120048afc..6d475231d49 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java
@@ -58,6 +58,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactor
 import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -66,13 +67,13 @@ import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.apache.ignite.internal.util.IgniteUtils.fileCount;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
 import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
  * Test file transmission manager operations.
@@ -114,22 +115,18 @@ public class GridIoManagerFileTransmissionSelfTest 
extends GridCommonAbstractTes
         topic = GridTopic.TOPIC_CACHE.topic("test", 0);
     }
 
-    /**
-     * @throws Exception if failed.
-     */
-    @Before
-    public void before() throws Exception {
+    /** {@inheritDoc} */
+    @Override public void beforeTest() throws Exception {
         cleanPersistenceDir();
 
         tempStore = U.resolveWorkDirectory(U.defaultWorkDirectory(), 
TEMP_FILES_DIR, true);
     }
 
-    /** Called after test run. */
-    @After
-    public void after() {
+    /** {@inheritDoc} */
+    @Override public void afterTest() throws Exception {
         try {
-            ensureResourcesFree(snd);
-            ensureResourcesFree(rcv);
+            
NodeFileTransmissionResources.extractFrom(snd).awaitIsEmpty(getTestTimeout());
+            
NodeFileTransmissionResources.extractFrom(rcv).awaitIsEmpty(getTestTimeout());
         }
         finally {
             stopAllGrids();
@@ -177,8 +174,8 @@ public class GridIoManagerFileTransmissionSelfTest extends 
GridCommonAbstractTes
 
         rcv.context().io().addTransmissionHandler(topic, new 
TransmissionHandlerAdapter() {
             @Override public void onEnd(UUID rmtNodeId) {
-                ensureResourcesFree(snd);
-                ensureResourcesFree(rcv);
+                NodeFileTransmissionResources.extractFrom(snd).assertIsEmpty();
+                NodeFileTransmissionResources.extractFrom(rcv).assertIsEmpty();
             }
 
             @Override public String filePath(UUID nodeId, TransmissionMeta 
fileMeta) {
@@ -186,7 +183,7 @@ public class GridIoManagerFileTransmissionSelfTest extends 
GridCommonAbstractTes
             }
 
             @Override public Consumer<File> fileHandler(UUID nodeId, 
TransmissionMeta initMeta) {
-                return new Consumer<File>() {
+                return new Consumer<>() {
                     @Override public void accept(File file) {
                         assertTrue(fileSizes.containsKey(file.getName()));
                         // Save all params.
@@ -556,7 +553,7 @@ public class GridIoManagerFileTransmissionSelfTest extends 
GridCommonAbstractTes
 
         assertNotNull("Transmission must ends with an exception", expectedErr);
 
-        // Open next session and complete successfull.
+        // Open next session and complete successful.
         try (GridIoManager.TransmissionSender sender = snd.context()
             .io()
             .openTransmissionSender(rcv.localNode().id(), topic)) {
@@ -592,7 +589,7 @@ public class GridIoManagerFileTransmissionSelfTest extends 
GridCommonAbstractTes
     @Test(expected = IgniteCheckedException.class)
     public void testFileHandlerChannelCloseIfAnotherOpened() throws Exception {
         final int fileSizeBytes = 5 * 1024 * 1024;
-        final CountDownLatch waitLatch = new CountDownLatch(2);
+        final CountDownLatch waitLatch = new CountDownLatch(1);
         final CountDownLatch completionWait = new CountDownLatch(2);
 
         snd = startGrid(0);
@@ -642,14 +639,13 @@ public class GridIoManagerFileTransmissionSelfTest 
extends GridCommonAbstractTes
                 }
             });
 
-            waitLatch.await(5, TimeUnit.SECONDS);
+            assertTrue(waitLatch.await(5, TimeUnit.SECONDS));
+            assertTrue(completionWait.await(5, TimeUnit.SECONDS));
 
             // Expected that one of the writers will throw exception.
             assertFalse("An error must be thrown if connected to the same 
topic during processing",
                 errs[0] == null);
 
-            completionWait.await(5, TimeUnit.SECONDS);
-
             throw errs[0];
         }
     }
@@ -715,7 +711,7 @@ public class GridIoManagerFileTransmissionSelfTest extends 
GridCommonAbstractTes
                     }
                 }
 
-                return new Consumer<ByteBuffer>() {
+                return new Consumer<>() {
                     @Override public void accept(ByteBuffer buff) {
                         try {
                             assertTrue(buff.order() == 
ByteOrder.nativeOrder());
@@ -743,9 +739,9 @@ public class GridIoManagerFileTransmissionSelfTest extends 
GridCommonAbstractTes
         assertEquals("Remote node must accept all chunks",
             fileToSend.length() / 
rcv.configuration().getDataStorageConfiguration().getPageSize(),
             acceptedChunks.get());
-        assertEquals("Received file and sent files have not the same lengtgh", 
fileToSend.length(), file.length());
+        assertEquals("Received file and sent files have not the same length", 
fileToSend.length(), file.length());
         assertCrcEquals(fileToSend, file);
-        assertNull(fileIo[0]);
+        assertTrue(waitForCondition(() -> fileIo[0] == null, 
getTestTimeout()));
     }
 
     /**
@@ -787,7 +783,7 @@ public class GridIoManagerFileTransmissionSelfTest extends 
GridCommonAbstractTes
         rcv.context().io().addTransmissionHandler(topic, new 
TransmissionHandlerAdapter() {
             /** {@inheritDoc} */
             @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, 
TransmissionMeta initMeta) {
-                return new Consumer<ByteBuffer>() {
+                return new Consumer<>() {
                     @Override public void accept(ByteBuffer buffer) {
                         throw new TransmissionCancelledException("Operation 
cancelled by the user");
                     }
@@ -826,7 +822,7 @@ public class GridIoManagerFileTransmissionSelfTest extends 
GridCommonAbstractTes
             }
 
             @Override public Consumer<File> fileHandler(UUID nodeId, 
TransmissionMeta initMeta) {
-                return new Consumer<File>() {
+                return new Consumer<>() {
                     @Override public void accept(File file) {
                         assertEquals(from1To0.getName(), file.getName());
                     }
@@ -840,7 +836,7 @@ public class GridIoManagerFileTransmissionSelfTest extends 
GridCommonAbstractTes
             }
 
             @Override public Consumer<File> fileHandler(UUID nodeId, 
TransmissionMeta initMeta) {
-                return new Consumer<File>() {
+                return new Consumer<>() {
                     @Override public void accept(File file) {
                         assertEquals(from0To1.getName(), file.getName());
                     }
@@ -887,22 +883,6 @@ public class GridIoManagerFileTransmissionSelfTest extends 
GridCommonAbstractTes
         assertNull("Exception occurred during file sending: " + ex[0], ex[0]);
     }
 
-    /**
-     * @param ig Ignite instance to check.
-     */
-    private static void ensureResourcesFree(IgniteEx ig) {
-        if (ig == null)
-            return;
-
-        final GridIoManager io = ig.context().io();
-
-        ConcurrentMap<Object, Object> ctxs = GridTestUtils.getFieldValue(io, 
"rcvCtxs");
-        ConcurrentMap<T2<UUID, IgniteUuid>, AtomicBoolean> sndrFlags = 
GridTestUtils.getFieldValue(io, "senderStopFlags");
-
-        assertTrue("Receiver context map must be empty: " + ctxs, 
ctxs.isEmpty());
-        assertTrue("Sender stop flags must be empty: " + sndrFlags, 
sndrFlags.isEmpty());
-    }
-
     /**
      * @param ignite Ignite instance.
      * @param cacheName Cache name to add data to.
@@ -956,20 +936,20 @@ public class GridIoManagerFileTransmissionSelfTest 
extends GridCommonAbstractTes
         }
     }
 
-    /** The defailt implementation of transmit session. */
+    /** The default implementation of transmit session. */
     private static class DefaultTransmissionHandler extends 
TransmissionHandlerAdapter {
-        /** Ignite recevier node. */
+        /** Ignite receiver node. */
         private final IgniteEx rcv;
 
-        /** File to be send. */
+        /** File to be sent. */
         private final File fileToSend;
 
         /** Temporary local storage. */
         private final File tempStorage;
 
         /**
-         * @param rcv Ignite recevier node.
-         * @param fileToSend File to be send.
+         * @param rcv Ignite receiver node.
+         * @param fileToSend File to be sent.
          * @param tempStorage Temporary local storage.
          */
         public DefaultTransmissionHandler(IgniteEx rcv, File fileToSend, File 
tempStorage) {
@@ -985,7 +965,7 @@ public class GridIoManagerFileTransmissionSelfTest extends 
GridCommonAbstractTes
 
         /** {@inheritDoc} */
         @Override public Consumer<File> fileHandler(UUID nodeId, 
TransmissionMeta initMeta) {
-            return new Consumer<File>() {
+            return new Consumer<>() {
                 @Override public void accept(File file) {
                     assertEquals(fileToSend.length(), file.length());
                     assertCrcEquals(fileToSend, file);
@@ -1009,7 +989,7 @@ public class GridIoManagerFileTransmissionSelfTest extends 
GridCommonAbstractTes
                 if (block) {
                     U.log(log, "Start waiting on trying open a new channel");
 
-                    latch.await(5, TimeUnit.SECONDS);
+                    assertTrue(latch.await(5, TimeUnit.SECONDS));
                 }
             }
             catch (InterruptedException e) {
@@ -1021,7 +1001,7 @@ public class GridIoManagerFileTransmissionSelfTest 
extends GridCommonAbstractTes
     }
 
     /**
-     * The defailt implementation of transmit session.
+     * The default implementation of transmit session.
      */
     private static class TransmissionHandlerAdapter implements 
TransmissionHandler {
         /** {@inheritDoc} */
@@ -1049,4 +1029,52 @@ public class GridIoManagerFileTransmissionSelfTest 
extends GridCommonAbstractTes
             // No-op.
         }
     }
+
+    /** */
+    private static class NodeFileTransmissionResources {
+        /** */
+        private final ConcurrentMap<Object, Object> receiverContexts;
+
+        /** */
+        private final ConcurrentMap<T2<UUID, IgniteUuid>, AtomicBoolean> 
activeSessions;
+
+        /** */
+        private NodeFileTransmissionResources(
+            ConcurrentMap<Object, Object> receiverContexts,
+            ConcurrentMap<T2<UUID, IgniteUuid>, AtomicBoolean> activeSessions
+        ) {
+            this.receiverContexts = receiverContexts;
+            this.activeSessions = activeSessions;
+        }
+
+        /** */
+        void assertIsEmpty() {
+            assertTrue(description(), isEmpty());
+        }
+
+        /** */
+        void awaitIsEmpty(long timeout) throws Exception {
+            assertTrue(description(), waitForCondition(this::isEmpty, 
timeout));
+        }
+
+        /** */
+        private boolean isEmpty() {
+            return F.isEmpty(receiverContexts) && F.isEmpty(activeSessions);
+        }
+
+        /** */
+        private String description() {
+            return "Receiver contexts: " + receiverContexts + ", Active 
sessions: " + activeSessions;
+        }
+
+        /** */
+        static NodeFileTransmissionResources extractFrom(IgniteEx ignite) {
+            if (ignite == null)
+                return new NodeFileTransmissionResources(null, null);
+
+            final GridIoManager io = ignite.context().io();
+
+            return new NodeFileTransmissionResources(getFieldValue(io, 
"rcvCtxs"), getFieldValue(io, "senderStopFlags"));
+        }
+    }
 }

Reply via email to