This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new cb021f586d3 IGNITE-27873 Fix flaky IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest related tests (#12749)
cb021f586d3 is described below
commit cb021f586d36c01f295e9e47e763222eec1cf2a5
Author: Evgeniy Stanilovskiy <[email protected]>
AuthorDate: Tue Feb 24 08:56:27 2026 +0300
IGNITE-27873 Fix flaky IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest related tests (#12749)
---
...eCacheQueryAbstractDistributedJoinSelfTest.java | 37 +++++-----------------
...opOnCancelOrTimeoutDistributedJoinSelfTest.java | 34 ++++++++------------
2 files changed, 22 insertions(+), 49 deletions(-)
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
index b919a1a871e..5a3e5f6c71d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
@@ -18,18 +18,14 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.io.Serializable;
-import java.util.Random;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
-import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -65,10 +61,10 @@ public class
IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
"order by co._key, pr._key ";
/** */
- protected static final String QRY_LONG = "select pe.id, co.id, pr._key\n" +
+ protected static final String QRY_LONG = "select id1 from (select pe.id as
id1, co.id, pr._key\n" +
"from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co,
\"pu\".Purchase pu\n" +
- "where pe._key = pu.personId and pu.productId = pr._key and
pr.companyId = co._key \n" +
- "order by pe.id desc";
+ "where pe._key = pu.personId and pu.productId = pr._key and
pr.companyId = co._key\n" +
+ "order by pe.id) where id1 > sleep(10)";
/** */
protected static final int GRID_CNT = 2;
@@ -104,6 +100,7 @@ public class
IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
cc.setRebalanceMode(SYNC);
cc.setLongQueryWarningTimeout(15_000);
cc.setAffinity(new RendezvousAffinityFunction(false, 60));
+ cc.setSqlFunctionClasses(GridTestUtils.SqlTestFunctions.class);
switch (name) {
case "pe":
@@ -163,10 +160,8 @@ public class
IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
IgniteCache<Integer, Product> pr = grid(0).cache("pr");
- Random rnd = new GridRandom();
-
for (int i = 0; i < PRODUCT_CNT; i++)
- pr.put(i, new Product(i, rnd.nextInt(COMPANY_CNT)));
+ pr.put(i, new Product(i, i % COMPANY_CNT));
IgniteCache<Integer, Person> pe = grid(0).cache("pe");
@@ -176,8 +171,8 @@ public class
IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
IgniteCache<Integer, Purchase> pu = grid(0).cache("pu");
for (int i = 0; i < PURCHASE_CNT; i++) {
- int persId = rnd.nextInt(PERS_CNT);
- int prodId = rnd.nextInt(PRODUCT_CNT);
+ int persId = i % PERS_CNT;
+ int prodId = i % PRODUCT_CNT;
pu.put(i, new Purchase(persId, prodId));
}
@@ -258,20 +253,4 @@ public class
IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
this.companyId = companyId;
}
}
-
- /** */
- public static class Functions {
- /** */
- @QuerySqlFunction
- public static int sleep() {
- try {
- U.sleep(1_000);
- }
- catch (IgniteInterruptedCheckedException ignored) {
- // No-op.
- }
-
- return 0;
- }
- }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
index 07eabab9779..d56413c8369 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
@@ -25,17 +25,20 @@ import java.util.concurrent.TimeUnit;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.GridProcessor;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.junit.Test;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
/**
* Test for cancel of query containing distributed joins.
*/
@@ -104,49 +107,40 @@ public class
IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest extend
else {
cursor = cache.query(qry);
- ignite.scheduler().runLocal(new Runnable() {
- @Override public void run() {
- cursor.close();
- }
- }, timeoutUnits, timeUnit);
+ ignite.scheduler().runLocal(cursor::close, timeoutUnits, timeUnit);
}
try (QueryCursor<List<?>> ignored = cursor) {
- cursor.getAll();
+ int resSize = F.size(cursor.iterator());
if (checkCanceled)
- fail("Query not canceled");
+ fail("Query not canceled, result size=" + resSize);
}
- catch (CacheException ex) {
- log().error("Got expected exception", ex);
+ catch (CacheException | IgniteException ex) {
+ log().error("Got exception", ex);
assertNotNull("Must throw correct exception", X.cause(ex,
QueryCancelledException.class));
}
- // Give some time to clean up.
- Thread.sleep(TimeUnit.MILLISECONDS.convert(timeoutUnits, timeUnit) +
3_000);
-
checkCleanState();
}
/**
* Validates clean state on all participating nodes after query
cancellation.
*/
- private void checkCleanState() throws IgniteCheckedException {
+ private void checkCleanState() throws IgniteInterruptedCheckedException {
for (int i = 0; i < GRID_CNT; i++) {
IgniteEx grid = grid(i);
// Validate everything was cleaned up.
- ConcurrentMap<UUID, ?> map =
U.field(((IgniteH2Indexing)U.field((GridProcessor)U.field(
- grid.context(), "qryProc"), "idx")).mapQueryExecutor(),
"qryRess");
-
- String msg = "Map executor state is not cleared";
+ ConcurrentMap<UUID, ?> map = U.field(
+
((IgniteH2Indexing)grid.context().query().getIndexing()).mapQueryExecutor(),
"qryRess");
// TODO FIXME Current implementation leaves map entry for each
node that's ever executed a query.
for (Object result : map.values()) {
Map<Long, ?> m = U.field(result, "res");
- assertEquals(msg, 0, m.size());
+ assertTrue("Map executor state is not cleared",
waitForCondition(m::isEmpty, 1_000L));
}
}
}