This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e457dcc98e5 IGNITE-25449 Fix memory leak in HeavyQueriesTracker -
Fixes #12087.
e457dcc98e5 is described below
commit e457dcc98e53cf131a90db78d1d5814a1e3a565b
Author: oleg-vlsk <[email protected]>
AuthorDate: Thu Jun 19 10:56:36 2025 +0300
IGNITE-25449 Fix memory leak in HeavyQueriesTracker - Fixes #12087.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../integration/SqlDiagnosticIntegrationTest.java | 99 ++++++++
.../query/running/HeavyQueriesTracker.java | 4 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 3 -
.../query/h2/twostep/MapQueryResult.java | 2 +
.../processors/query/LongRunningQueryTest.java | 258 +++++++++++++++++++--
5 files changed, 336 insertions(+), 30 deletions(-)
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 141801edea5..ef625931ba9 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -36,7 +36,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
@@ -53,6 +55,7 @@ import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest;
@@ -61,10 +64,12 @@ import
org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.calcite.Query;
import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
+import org.apache.ignite.internal.processors.query.calcite.RootQuery;
import
org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
+import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.typedef.F;
@@ -96,6 +101,7 @@ import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTr
import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG;
import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_FINISHED_MSG;
import static
org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_USER_QUERIES_REG_NAME;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
* Test SQL diagnostic tools.
@@ -781,6 +787,72 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
}
assertTrue(logLsnr2.check(1000L));
+
+ assertTrue(isHeavyQueriesTrackerEmpty());
+ }
+
+ /**
+ * Verifies that once the query is fully fetched, it is no longer tracked
and its information encapsulated in a
+ * {@link RootQuery} instance is removed from {@link HeavyQueriesTracker}.
+ */
+ @Test
+ public void testEmptyHeavyQueriesTrackerWithFullyFetchedIterator() throws
IgniteInterruptedCheckedException {
+ Iterator<?> it = runNotFullyFetchedQuery(false).iterator();
+
+ assertFalse(isHeavyQueriesTrackerEmpty());
+
+ it.forEachRemaining(x -> {});
+
+ assertTrue(waitForCondition(this::isHeavyQueriesTrackerEmpty, 1_000));
+ }
+
+ /**
+ * Verifies that once the cursor of a not fully fetched query is closed,
it is no longer tracked and its information
+ * encapsulated in a {@link RootQuery} instance is removed from {@link
HeavyQueriesTracker}.
+ */
+ @Test
+ public void testEmptyHeavyQueriesTrackerWithClosedCursor() throws
IgniteInterruptedCheckedException {
+ FieldsQueryCursor<List<?>> cursor = runNotFullyFetchedQuery(false);
+
+ assertFalse(isHeavyQueriesTrackerEmpty());
+
+ cursor.close();
+
+ assertTrue(waitForCondition(this::isHeavyQueriesTrackerEmpty, 1_000));
+ }
+
+ /**
+ * Verifies that once a not fully fetched query is cancelled, it is no
longer tracked and its information
+ * encapsulated in a {@link RootQuery} instance is removed from {@link
HeavyQueriesTracker}.
+ */
+ @Test
+ public void testEmptyHeavyQueriesTrackerWithCancelledQuery() throws
IgniteInterruptedCheckedException {
+ runNotFullyFetchedQuery(false);
+
+ assertFalse(isHeavyQueriesTrackerEmpty());
+
+ RootQuery<?> rootQry =
(RootQuery<?>)heavyQueriesTracker().getQueries().iterator().next();
+
+ grid(0).context().query().cancelQuery(rootQry.localQueryId(),
rootQry.initiatorNodeId(), false);
+
+ assertTrue(waitForCondition(this::isHeavyQueriesTrackerEmpty, 1_000));
+ }
+
+ /**
+ * Verifies that once a not fully fetched local query is cancelled, it is
no longer tracked and its information
+ * encapsulated in a {@link RootQuery} instance is removed from {@link
HeavyQueriesTracker}.
+ */
+ @Test
+ public void testEmptyHeavyQueriesTrackerWithCancelledLocalQuery() throws
IgniteInterruptedCheckedException {
+ runNotFullyFetchedQuery(true);
+
+ assertFalse(isHeavyQueriesTrackerEmpty());
+
+ RootQuery<?> rootQry =
(RootQuery<?>)heavyQueriesTracker().getQueries().iterator().next();
+
+
grid(0).context().query().cancelLocalQueries(Set.of(rootQry.localQueryId()));
+
+ assertTrue(waitForCondition(this::isHeavyQueriesTrackerEmpty, 1_000));
}
/** */
@@ -925,6 +997,33 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
}
}
+ /** */
+ private FieldsQueryCursor<List<?>> runNotFullyFetchedQuery(boolean loc) {
+ IgniteCache<Long, Long> cache = grid(0).createCache(new
CacheConfiguration<Long, Long>()
+ .setName("test")
+ .setQueryEntities(Collections.singleton(new
QueryEntity(Long.class, Long.class)
+ .setTableName("test")
+ .addQueryField("id", Long.class.getName(), null)
+ .addQueryField("val", Long.class.getName(), null)
+ .setKeyFieldName("id")
+ .setValueFieldName("val"))));
+
+ for (long i = 0; i < 10; ++i)
+ cache.put(i, i);
+
+ return cache.query(new SqlFieldsQuery("select * from
test").setLocal(loc).setPageSize(1));
+ }
+
+ /** */
+ private HeavyQueriesTracker heavyQueriesTracker() {
+ return
grid(0).context().query().runningQueryManager().heavyQueriesTracker();
+ }
+
+ /** */
+ private boolean isHeavyQueriesTrackerEmpty() {
+ return heavyQueriesTracker().getQueries().isEmpty();
+ }
+
/** */
public static class FunctionsLibrary {
/** */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
index 1a3654a415a..69b8e635198 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
@@ -143,9 +143,7 @@ public final class HeavyQueriesTracker {
public void stopTracking(TrackableQuery qryInfo, @Nullable Throwable err) {
assert qryInfo != null;
- qrys.remove(qryInfo);
-
- if (qryInfo.time() > timeout) {
+ if (qrys.remove(qryInfo) != null && qryInfo.time() > timeout) {
if (err == null)
LT.warn(log, LONG_QUERY_FINISHED_MSG +
qryInfo.queryInfo(null));
else
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 2a0e8824190..47d3000acf0 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -985,9 +985,6 @@ public class GridMapQueryExecutor {
if (last) {
qr.closeResult(qry);
- if (res.qryInfo() != null)
- h2.heavyQueriesTracker().stopTracking(res.qryInfo(), null);
-
if (qr.isAllClosed()) {
nodeRess.remove(qr.queryRequestId(), segmentId, qr);
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index 441a7f0a97d..0d844d289f5 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -388,6 +388,8 @@ class MapQueryResult {
}
U.close(rs, log);
+
+ h2.heavyQueriesTracker().stopTracking(qryInfo, null);
}
}
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
index d9db970ebdd..c381e8e60f9 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
@@ -17,13 +17,24 @@
package org.apache.ignite.internal.processors.query;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.ArrayDeque;
import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.FieldsQueryCursor;
@@ -34,6 +45,7 @@ import
org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.SqlConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
import org.apache.ignite.internal.processors.query.h2.H2QueryInfo;
@@ -44,10 +56,15 @@ import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
import static java.lang.Thread.currentThread;
import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.h2.engine.Constants.DEFAULT_PAGE_SIZE;
/**
@@ -57,6 +74,9 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
/** Keys count. */
private static final int KEY_CNT = 1000;
+ /** Number of keys to be queried in lazy queries. */
+ private static final int LAZY_QRYS_KEY_CNT = 5;
+
/** Long query warning timeout. */
private static final int LONG_QUERY_WARNING_TIMEOUT = 1000;
@@ -87,6 +107,15 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
/** Log listener for long DMLs. */
private static LogListener lsnrDml;
+ /** Multi-node test rule. */
+ @Rule
+ public final MultiNodeTestRule multiNodeTestRule = new MultiNodeTestRule();
+
+ /** Annotation for the {@link MultiNodeTestRule}. */
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ public @interface MultiNodeTest {}
+
/** Page size. */
private int pageSize = DEFAULT_PAGE_SIZE;
@@ -112,13 +141,9 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
return cfg.setSqlConfiguration(new
SqlConfiguration().setLongQueryWarningTimeout(LONG_QUERY_WARNING_TIMEOUT));
}
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- ignite = startGrid();
-
- IgniteCache c = grid().createCache(new CacheConfiguration<Long, Long>()
+ /** */
+ private void prepareTestEnvironment() {
+ IgniteCache c = grid(0).createCache(new CacheConfiguration<Long,
Long>()
.setName("test")
.setQueryEntities(Collections.singleton(new
QueryEntity(Long.class, Long.class)
.setTableName("test")
@@ -133,7 +158,7 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
for (long i = 0; i < KEY_CNT; ++i)
c.put(i, i);
- IgniteCache c2 = grid().createCache(cacheConfig("pers", Integer.class,
Person.class));
+ IgniteCache c2 = grid(0).createCache(cacheConfig("pers",
Integer.class, Person.class));
c2.put(1001, new Person(1, "p1"));
@@ -147,6 +172,8 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
+ checkQryInfoCount(0);
+
stopAllGrids();
super.afterTest();
@@ -392,6 +419,104 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
checkBigResultSet();
}
+ /**
+ * Verifies that while a query is not fully fetched, its {@link
H2QueryInfo} is kept in {@link HeavyQueriesTracker}
+ * on all cluster nodes and its {@link H2QueryInfo#isSuspended()} returns
{@code true}. Then, once the query is fully
+ * fetched, its {@link H2QueryInfo} is removed from {@link
HeavyQueriesTracker}.
+ */
+ @Test
+ @MultiNodeTest
+ public void testEmptyHeavyQueriesTrackerWithFullyFetchedIterator() {
+ Iterator<?> it = queryCursor(false).iterator();
+
+ checkQryInfoCount(gridCount());
+
+ H2QueryInfo qry =
(H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next();
+
+ assertTrue(qry.isSuspended());
+
+ it.forEachRemaining(x -> {});
+ }
+
+ /**
+ * Verifies that when the cursor of a not fully fetched query is closed,
its {@link H2QueryInfo} is removed from
+ * {@link HeavyQueriesTracker} on all cluster nodes.
+ */
+ @Test
+ @MultiNodeTest
+ public void testEmptyHeavyQueriesTrackerWithClosedCursor() {
+ FieldsQueryCursor<List<?>> cursor = queryCursor(false);
+
+ cursor.iterator().next();
+
+ checkQryInfoCount(gridCount());
+
+ H2QueryInfo qryInfo =
(H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next();
+
+ assertTrue(qryInfo.isSuspended());
+
+ cursor.close();
+ }
+
+ /**
+ * Verifies that when a not fully fetched query is cancelled, its {@link
H2QueryInfo} is removed from
+ * {@link HeavyQueriesTracker} on all cluster nodes.
+ */
+ @Test
+ @MultiNodeTest
+ public void testEmptyHeavyQueriesTrackerWithCancelledQuery() {
+ long qryId = runNotFullyFetchedQuery(false);
+
+ checkQryInfoCount(gridCount());
+
+ cancelQuery(qryId);
+ }
+
+ /**
+ * Verifies that when a not fully fetched local query is cancelled, its
{@link H2QueryInfo} is removed from
+ * {@link HeavyQueriesTracker} on all cluster nodes.
+ */
+ @Test
+ @MultiNodeTest
+ public void testEmptyHeavyQueriesTrackerWithCancelledLocalQuery() {
+ long qryId = runNotFullyFetchedQuery(true);
+
+ checkQryInfoCount(1);
+
+ ((IgniteEx)ignite).context().query().cancelLocalQueries(Set.of(qryId));
+ }
+
+ /**
+ * Verifies that when multiple not fully fetched queries are cancelled
separately, corresponding
+ * {@link H2QueryInfo} instances are removed from {@link
HeavyQueriesTracker} on all cluster nodes.
+ */
+ @Test
+ @MultiNodeTest
+ public void testEmptyHeavyQueriesTrackerWithMultipleCancelledQueries() {
+ int qryCnt = 4;
+ int cnldQryCnt = 2;
+
+ for (int i = 0; i < qryCnt; i++)
+ runNotFullyFetchedQuery(false);
+
+ checkQryInfoCount(gridCount() * qryCnt);
+
+ Deque<Long> qryIds = new ArrayDeque<>(getQueryIdsOnNode(0));
+
+ Set<Long> cnldQryIds = new HashSet<>();
+
+ for (int i = 0; i < cnldQryCnt; i++)
+ cnldQryIds.add(cancelQuery(qryIds.poll()));
+
+ checkQryInfoCount(gridCount() * (qryCnt - cnldQryCnt));
+
+ for (int i = 0; i < gridCount(); i++)
+ assertTrue(getQueryIdsOnNode(i).stream().allMatch(id ->
!cnldQryIds.contains(id) && qryIds.contains(id)));
+
+ while (!qryIds.isEmpty())
+ cancelQuery(qryIds.poll());
+ }
+
/**
* Do several fast queries.
* Log messages must not contain info about long query.
@@ -467,7 +592,7 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
pageSize = 1;
try {
- assertFalse(sql("test", sql, args).iterator().next().isEmpty());
+ assertEquals(LAZY_QRYS_KEY_CNT, sql("test", sql,
args).getAll().size());
}
finally {
pageSize = DEFAULT_PAGE_SIZE;
@@ -507,14 +632,14 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
private void sqlCheckLongRunning() {
if (lazy && withMergeTable) {
String select = "select o.name n1, p.name n2 from Person p,
\"org\".Organization o" +
- " where p.orgId = o._key and o._key=1 and o._key <
sleep_func(?)" +
+ " where p.orgId = o._key and o._key=1 and o._key <
sleep_func(?, ?)" +
" union select o.name n1, p.name n2 from Person p,
\"org\".Organization o" +
" where p.orgId = o._key and o._key=2";
- sqlCheckLongRunningLazyWithMergeTable(select, 2000);
+ sqlCheckLongRunningLazyWithMergeTable(select, 2000,
LAZY_QRYS_KEY_CNT);
}
else if (lazy && !withMergeTable)
- sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key <
sleep_func(?)", 2000);
+ sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key <
sleep_func(?, ?)", 2000, LAZY_QRYS_KEY_CNT);
else
sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1,
test AS T2 where T0.id > ?", 0);
}
@@ -545,9 +670,9 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
testLog().registerListener(lsnr);
- try {
- Iterator<List<?>> it = sql("test", "select * from
test").iterator();
+ Iterator<List<?>> it = sql("test", "select * from test").iterator();
+ try {
it.next();
long sleepStartTs = U.currentTimeMillis();
@@ -565,6 +690,8 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
}
finally {
pageSize = DEFAULT_PAGE_SIZE;
+
+ it.forEachRemaining(x -> {});
}
}
@@ -583,24 +710,66 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
assertTrue(lsnrDml.check());
}
+ /**
+ * @param loc Flag indicating if the query is local.
+ * @return Query id.
+ */
+ private long runNotFullyFetchedQuery(boolean loc) {
+ queryCursor(loc).iterator().next();
+
+ H2QueryInfo qryInfo =
(H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next();
+
+ assertTrue(qryInfo.isSuspended());
+
+ return qryInfo.queryId();
+ }
+
+ /**
+ * @param loc Flag indicating if the query is local.
+ * @return Query cursor.
+ */
+ private FieldsQueryCursor<List<?>> queryCursor(boolean loc) {
+ return ignite.cache("test").query(new SqlFieldsQuery("select * from
test").setLocal(loc).setPageSize(1));
+ }
+
+ /**
+ * @param qryId Query id.
+ * @return Cancelled query id.
+ */
+ private long cancelQuery(long qryId) {
+ ((IgniteEx)ignite).context().query().cancelQuery(qryId,
ignite.cluster().node().id(), false);
+
+ return qryId;
+ }
+
+ /**
+ * @param nodeIdx Node index.
+ * @return Set of query ids registered on a node.
+ */
+ private Set<Long> getQueryIdsOnNode(int nodeIdx) {
+ return heavyQueriesTracker(nodeIdx).getQueries().stream()
+ .map(query -> ((H2QueryInfo)query).queryId())
+ .collect(Collectors.toSet());
+ }
+
/**
* Utility class with custom SQL functions.
*/
public static class TestSQLFunctions {
/**
- * @param v amount of milliseconds to sleep
- * @return amount of milliseconds to sleep
+ * @param sleep amount of milliseconds to sleep
+ * @param val value to be returned by the function
*/
@SuppressWarnings("unused")
@QuerySqlFunction
- public static int sleep_func(int v) {
+ public static int sleep_func(int sleep, int val) {
try {
- Thread.sleep(v);
+ Thread.sleep(sleep);
}
catch (InterruptedException ignored) {
// No-op
}
- return v;
+ return val;
}
/** */
@@ -608,7 +777,7 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
@QuerySqlFunction
public static int wait_func() {
try {
- GridTestUtils.waitForCondition(() -> lsnrDml.check(), 10_000);
+ waitForCondition(() -> lsnrDml.check(), 10_000);
}
catch (IgniteInterruptedCheckedException ignored) {
// No-op
@@ -626,13 +795,13 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
private ListeningTestLogger testLog() {
ListeningTestLogger testLog = new ListeningTestLogger(log);
-
GridTestUtils.setFieldValue(((IgniteH2Indexing)grid().context().query().getIndexing()).heavyQueriesTracker(),
+
GridTestUtils.setFieldValue(((IgniteH2Indexing)grid(0).context().query().getIndexing()).heavyQueriesTracker(),
"log", testLog);
-
GridTestUtils.setFieldValue(((IgniteH2Indexing)grid().context().query().getIndexing()).mapQueryExecutor(),
+
GridTestUtils.setFieldValue(((IgniteH2Indexing)grid(0).context().query().getIndexing()).mapQueryExecutor(),
"log", testLog);
- GridTestUtils.setFieldValue(grid().context().query().getIndexing(),
"log", testLog);
+ GridTestUtils.setFieldValue(grid(0).context().query().getIndexing(),
"log", testLog);
return testLog;
}
@@ -643,7 +812,32 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
* @return Heavy queries tracker.
*/
private HeavyQueriesTracker heavyQueriesTracker() {
- return
((IgniteH2Indexing)grid().context().query().getIndexing()).heavyQueriesTracker();
+ return heavyQueriesTracker(0);
+ }
+
+ /** */
+ private HeavyQueriesTracker heavyQueriesTracker(int idx) {
+ return
((IgniteH2Indexing)grid(idx).context().query().getIndexing()).heavyQueriesTracker();
+ }
+
+ /**
+ * @param exp Expected number of {@link H2QueryInfo} instances registered
in {@link HeavyQueriesTracker}
+ * on all cluster nodes.
+ */
+ private void checkQryInfoCount(int exp) {
+ try {
+ assertTrue(waitForCondition(
+ () -> IntStream.range(0, gridCount()).map(i ->
heavyQueriesTracker(i).getQueries().size()).sum() == exp,
+ 3_000));
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ // No-op
+ }
+ }
+
+ /** */
+ private int gridCount() {
+ return Ignition.allGrids().size();
}
/** */
@@ -679,4 +873,20 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
this.name = name;
}
}
+
+ /** Test rule that allows starting a test in the multi-node mode via the
{@link MultiNodeTest} annotation. */
+ private class MultiNodeTestRule implements TestRule {
+ /** {@inheritDoc} */
+ @Override public Statement apply(Statement base, Description
description) {
+ return new Statement() {
+ @Override public void evaluate() throws Throwable {
+ ignite =
startGrids(description.getAnnotation(MultiNodeTest.class) == null ? 1 : 3);
+
+ prepareTestEnvironment();
+
+ base.evaluate();
+ }
+ };
+ }
+ }
}