This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new cb021f586d3 IGNITE-27873 Fix flaky IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest related tests (#12749)
cb021f586d3 is described below

commit cb021f586d36c01f295e9e47e763222eec1cf2a5
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Tue Feb 24 08:56:27 2026 +0300

    IGNITE-27873 Fix flaky IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest related tests (#12749)
---
 ...eCacheQueryAbstractDistributedJoinSelfTest.java | 37 +++++-----------------
 ...opOnCancelOrTimeoutDistributedJoinSelfTest.java | 34 ++++++++------------
 2 files changed, 22 insertions(+), 49 deletions(-)

diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
index b919a1a871e..5a3e5f6c71d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
@@ -18,18 +18,14 @@
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.io.Serializable;
-import java.util.Random;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
-import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.apache.ignite.internal.util.GridRandom;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -65,10 +61,10 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
         "order by co._key, pr._key ";
 
     /** */
-    protected static final String QRY_LONG = "select pe.id, co.id, pr._key\n" +
+    protected static final String QRY_LONG = "select id1 from (select pe.id as id1, co.id, pr._key\n" +
         "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" +
-        "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" +
-        "order by pe.id desc";
+        "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key\n" +
+        "order by pe.id) where id1 > sleep(10)";
 
     /** */
     protected static final int GRID_CNT = 2;
@@ -104,6 +100,7 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
             cc.setRebalanceMode(SYNC);
             cc.setLongQueryWarningTimeout(15_000);
             cc.setAffinity(new RendezvousAffinityFunction(false, 60));
+            cc.setSqlFunctionClasses(GridTestUtils.SqlTestFunctions.class);
 
             switch (name) {
                 case "pe":
@@ -163,10 +160,8 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
 
         IgniteCache<Integer, Product> pr = grid(0).cache("pr");
 
-        Random rnd = new GridRandom();
-
         for (int i = 0; i < PRODUCT_CNT; i++)
-            pr.put(i, new Product(i, rnd.nextInt(COMPANY_CNT)));
+            pr.put(i, new Product(i, i % COMPANY_CNT));
 
         IgniteCache<Integer, Person> pe = grid(0).cache("pe");
 
@@ -176,8 +171,8 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
         IgniteCache<Integer, Purchase> pu = grid(0).cache("pu");
 
         for (int i = 0; i < PURCHASE_CNT; i++) {
-            int persId = rnd.nextInt(PERS_CNT);
-            int prodId = rnd.nextInt(PRODUCT_CNT);
+            int persId = i % PERS_CNT;
+            int prodId = i % PRODUCT_CNT;
 
             pu.put(i, new Purchase(persId, prodId));
         }
@@ -258,20 +253,4 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
             this.companyId = companyId;
         }
     }
-
-    /** */
-    public static class Functions {
-        /** */
-        @QuerySqlFunction
-        public static int sleep() {
-            try {
-                U.sleep(1_000);
-            }
-            catch (IgniteInterruptedCheckedException ignored) {
-                // No-op.
-            }
-
-            return 0;
-        }
-    }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
index 07eabab9779..d56413c8369 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
@@ -25,17 +25,20 @@ import java.util.concurrent.TimeUnit;
 import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.GridProcessor;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.junit.Test;
 
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
 /**
  * Test for cancel of query containing distributed joins.
  */
@@ -104,49 +107,40 @@ public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest extend
         else {
             cursor = cache.query(qry);
 
-            ignite.scheduler().runLocal(new Runnable() {
-                @Override public void run() {
-                    cursor.close();
-                }
-            }, timeoutUnits, timeUnit);
+            ignite.scheduler().runLocal(cursor::close, timeoutUnits, timeUnit);
         }
 
         try (QueryCursor<List<?>> ignored = cursor) {
-            cursor.getAll();
+            int resSize = F.size(cursor.iterator());
 
             if (checkCanceled)
-                fail("Query not canceled");
+                fail("Query not canceled, result size=" + resSize);
         }
-        catch (CacheException ex) {
-            log().error("Got expected exception", ex);
+        catch (CacheException | IgniteException ex) {
+            log().error("Got exception", ex);
 
             assertNotNull("Must throw correct exception", X.cause(ex, QueryCancelledException.class));
         }
 
-        // Give some time to clean up.
-        Thread.sleep(TimeUnit.MILLISECONDS.convert(timeoutUnits, timeUnit) + 3_000);
-
         checkCleanState();
     }
 
     /**
      * Validates clean state on all participating nodes after query cancellation.
      */
-    private void checkCleanState() throws IgniteCheckedException {
+    private void checkCleanState() throws IgniteInterruptedCheckedException {
         for (int i = 0; i < GRID_CNT; i++) {
             IgniteEx grid = grid(i);
 
             // Validate everything was cleaned up.
-            ConcurrentMap<UUID, ?> map = U.field(((IgniteH2Indexing)U.field((GridProcessor)U.field(
-                    grid.context(), "qryProc"), "idx")).mapQueryExecutor(), "qryRess");
-
-            String msg = "Map executor state is not cleared";
+            ConcurrentMap<UUID, ?> map = U.field(
+                ((IgniteH2Indexing)grid.context().query().getIndexing()).mapQueryExecutor(), "qryRess");
 
             // TODO FIXME Current implementation leaves map entry for each node that's ever executed a query.
             for (Object result : map.values()) {
                 Map<Long, ?> m = U.field(result, "res");
 
-                assertEquals(msg, 0, m.size());
+                assertTrue("Map executor state is not cleared", waitForCondition(m::isEmpty, 1_000L));
             }
         }
     }

Reply via email to