IGNITE-5991: SQL: Lazy query execution. This closes #2437.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/136075ae Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/136075ae Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/136075ae Branch: refs/heads/ignite-5578 Commit: 136075ae0f7070999dec6913afc8cef1a26eb307 Parents: 15710a8 Author: devozerov <[email protected]> Authored: Thu Aug 17 18:24:34 2017 +0300 Committer: devozerov <[email protected]> Committed: Thu Aug 17 18:24:34 2017 +0300 ---------------------------------------------------------------------- .../ignite/cache/query/SqlFieldsQuery.java | 38 +- .../processors/query/h2/IgniteH2Indexing.java | 57 ++- .../query/h2/twostep/GridMapQueryExecutor.java | 237 ++++++++--- .../h2/twostep/GridReduceQueryExecutor.java | 7 +- .../query/h2/twostep/MapNodeResults.java | 19 +- .../query/h2/twostep/MapQueryLazyWorker.java | 176 +++++++++ .../query/h2/twostep/MapQueryLazyWorkerKey.java | 97 +++++ .../query/h2/twostep/MapQueryResult.java | 46 ++- .../query/h2/twostep/MapQueryResults.java | 26 +- .../query/h2/twostep/MapRequestKey.java | 23 +- .../h2/twostep/msg/GridH2QueryRequest.java | 9 + .../processors/query/LazyQuerySelfTest.java | 389 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 2 + 13 files changed, 1041 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java index 2838fe3..54f8396 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java @@ 
-71,6 +71,9 @@ public class SqlFieldsQuery extends Query<List<?>> { /** */ private boolean replicatedOnly; + /** */ + private boolean lazy; + /** Partitions for query */ private int[] parts; @@ -230,7 +233,7 @@ public class SqlFieldsQuery extends Query<List<?>> { /** * Check if distributed joins are enabled for this query. * - * @return {@code true} If distributed joind enabled. + * @return {@code true} If distributed joins enabled. */ public boolean isDistributedJoins() { return distributedJoins; @@ -269,6 +272,39 @@ public class SqlFieldsQuery extends Query<List<?>> { } /** + * Sets lazy query execution flag. + * <p> + * By default Ignite attempts to fetch the whole query result set to memory and send it to the client. For small + * and medium result sets this provides optimal performance and minimizes duration of internal database locks, thus + * increasing concurrency. + * <p> + * If result set is too big to fit in available memory this could lead to excessive GC pauses and even + * OutOfMemoryError. Use this flag as a hint for Ignite to fetch result set lazily, thus minimizing memory + * consumption at the cost of moderate performance hit. + * <p> + * Defaults to {@code false}, meaning that the whole result set is fetched to memory eagerly. + * + * @param lazy Lazy query execution flag. + * @return {@code this} For chaining. + */ + public SqlFieldsQuery setLazy(boolean lazy) { + this.lazy = lazy; + + return this; + } + + /** + * Gets lazy query execution flag. + * <p> + * See {@link #setLazy(boolean)} for more information. + * + * @return Lazy flag. + */ + public boolean isLazy() { + return lazy; + } + + /** * Gets partitions for query, in ascending order. 
*/ @Nullable public int[] getPartitions() { http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 007eeb1..6896f18 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -111,6 +111,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter; import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor; import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor; +import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; @@ -139,6 +140,7 @@ import org.h2.api.ErrorCode; import org.h2.api.JavaObjectSerializer; import org.h2.command.Prepared; import org.h2.command.dml.Insert; +import org.h2.engine.Session; import org.h2.engine.SysProperties; import org.h2.index.Index; import org.h2.jdbc.JdbcPreparedStatement; @@ -905,24 +907,32 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @throws IgniteCheckedException If failed. 
*/ private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt, - int timeoutMillis, @Nullable GridQueryCancel cancel) - throws IgniteCheckedException { + int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException { + final MapQueryLazyWorker lazyWorker = MapQueryLazyWorker.currentWorker(); if (cancel != null) { cancel.set(new Runnable() { @Override public void run() { - try { - stmt.cancel(); - } - catch (SQLException ignored) { - // No-op. + if (lazyWorker != null) { + lazyWorker.submit(new Runnable() { + @Override public void run() { + cancelStatement(stmt); + } + }); } + else + cancelStatement(stmt); } }); } + Session ses = H2Utils.session(conn); + if (timeoutMillis > 0) - H2Utils.session(conn).setQueryTimeout(timeoutMillis); + ses.setQueryTimeout(timeoutMillis); + + if (lazyWorker != null) + ses.setLazyQueryExecution(true); try { return stmt.executeQuery(); @@ -936,7 +946,24 @@ public class IgniteH2Indexing implements GridQueryIndexing { } finally { if (timeoutMillis > 0) - H2Utils.session(conn).setQueryTimeout(0); + ses.setQueryTimeout(0); + + if (lazyWorker != null) + ses.setLazyQueryExecution(false); + } + } + + /** + * Cancel prepared statement. + * + * @param stmt Statement. + */ + private static void cancelStatement(PreparedStatement stmt) { + try { + stmt.cancel(); + } + catch (SQLException ignored) { + // No-op. } } @@ -1143,6 +1170,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param keepCacheObj Flag to keep cache object. * @param enforceJoinOrder Enforce join order of tables. * @param parts Partitions. + * @param lazy Lazy query execution flag. * @return Iterable result. 
*/ private Iterable<List<?>> runQueryTwoStep( @@ -1153,12 +1181,13 @@ public class IgniteH2Indexing implements GridQueryIndexing { final int timeoutMillis, final GridQueryCancel cancel, final Object[] params, - final int[] parts + final int[] parts, + final boolean lazy ) { return new Iterable<List<?>>() { @Override public Iterator<List<?>> iterator() { return rdcQryExec.query(schemaName, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params, - parts); + parts, lazy); } }; } @@ -1402,7 +1431,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>( runQueryTwoStep(schemaName, twoStepQry, keepBinary, enforceJoinOrder, qry.getTimeout(), cancel, - qry.getArgs(), partitions), cancel); + qry.getArgs(), partitions, qry.isLazy()), cancel); cursor.fieldsMeta(meta); @@ -2070,6 +2099,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (log.isDebugEnabled()) log.debug("Stopping cache query index..."); + mapQryExec.cancelLazyWorkers(); + // unregisterMBean(); TODO https://issues.apache.org/jira/browse/IGNITE-2139 if (ctx != null && !ctx.cache().context().database().persistenceEnabled()) { for (H2Schema schema : schemas.values()) @@ -2355,6 +2386,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @Override public void cancelAllQueries() { + mapQryExec.cancelLazyWorkers(); + for (Connection conn : conns) U.close(conn, log); } http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index ca978e2..0cc4172 100644 --- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -28,7 +28,9 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -52,7 +54,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservabl import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; -import org.apache.ignite.internal.processors.cache.query.QueryTable; import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode; @@ -69,6 +70,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.thread.IgniteThread; import org.h2.jdbc.JdbcResultSet; import org.h2.value.Value; import org.jetbrains.annotations.Nullable; @@ -107,6 +109,15 @@ public class GridMapQueryExecutor { /** */ private final ConcurrentMap<MapReservationKey, GridReservable> reservations = new ConcurrentHashMap8<>(); + /** Lazy workers. */ + private final ConcurrentHashMap<MapQueryLazyWorkerKey, MapQueryLazyWorker> lazyWorkers = new ConcurrentHashMap<>(); + + /** Busy lock for lazy workers. 
*/ + private final GridSpinBusyLock lazyWorkerBusyLock = new GridSpinBusyLock(); + + /** Lazy worker stop guard. */ + private final AtomicBoolean lazyWorkerStopGuard = new AtomicBoolean(); + /** * @param busyLock Busy lock. */ @@ -162,6 +173,21 @@ public class GridMapQueryExecutor { } /** + * Cancel active lazy queries and prevent submit of new queries. + */ + public void cancelLazyWorkers() { + if (!lazyWorkerStopGuard.compareAndSet(false, true)) + return; + + lazyWorkerBusyLock.block(); + + for (MapQueryLazyWorker worker : lazyWorkers.values()) + worker.stop(); + + lazyWorkers.clear(); + } + + /** * @param nodeId Node ID. * @param msg Message. */ @@ -221,7 +247,7 @@ public class GridMapQueryExecutor { MapNodeResults nodeRess = qryRess.get(nodeId); if (nodeRess == null) { - nodeRess = new MapNodeResults(); + nodeRess = new MapNodeResults(nodeId); MapNodeResults old = qryRess.putIfAbsent(nodeId, nodeRess); @@ -416,6 +442,7 @@ public class GridMapQueryExecutor { final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER); final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN); final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED); + final boolean lazy = req.isFlagSet(GridH2QueryRequest.FLAG_LAZY); final List<Integer> cacheIds = req.caches(); @@ -429,30 +456,51 @@ public class GridMapQueryExecutor { final int segment = i; - ctx.closure().callLocal( - new Callable<Void>() { - @Override public Void call() throws Exception { - onQueryRequest0(node, - req.requestId(), - segment, - req.schemaName(), - req.queries(), - cacheIds, - req.topologyVersion(), - partsMap, - parts, - req.tables(), - req.pageSize(), - joinMode, - enforceJoinOrder, - false, - req.timeout(), - params); - - return null; + if (lazy) { + onQueryRequest0(node, + req.requestId(), + segment, + req.schemaName(), + req.queries(), + cacheIds, + req.topologyVersion(), + partsMap, + parts, + req.pageSize(), + joinMode, + enforceJoinOrder, + 
false, // Replicated is always false here (see condition above). + req.timeout(), + params, + true); // Lazy = true. + } + else { + ctx.closure().callLocal( + new Callable<Void>() { + @Override + public Void call() throws Exception { + onQueryRequest0(node, + req.requestId(), + segment, + req.schemaName(), + req.queries(), + cacheIds, + req.topologyVersion(), + partsMap, + parts, + req.pageSize(), + joinMode, + enforceJoinOrder, + false, + req.timeout(), + params, + false); // Lazy = false. + + return null; + } } - } - , QUERY_POOL); + , QUERY_POOL); + } } onQueryRequest0(node, @@ -464,13 +512,13 @@ public class GridMapQueryExecutor { req.topologyVersion(), partsMap, parts, - req.tables(), req.pageSize(), joinMode, enforceJoinOrder, replicated, req.timeout(), - params); + params, + lazy); } /** @@ -483,28 +531,61 @@ public class GridMapQueryExecutor { * @param topVer Topology version. * @param partsMap Partitions map for unstable topology. * @param parts Explicit partitions for current node. - * @param tbls Tables. * @param pageSize Page size. * @param distributedJoinMode Query distributed join mode. + * @param lazy Streaming flag. 
*/ private void onQueryRequest0( - ClusterNode node, - long reqId, - int segmentId, - String schemaName, - Collection<GridCacheSqlQuery> qrys, - List<Integer> cacheIds, - AffinityTopologyVersion topVer, - Map<UUID, int[]> partsMap, - int[] parts, - Collection<QueryTable> tbls, - int pageSize, - DistributedJoinMode distributedJoinMode, - boolean enforceJoinOrder, - boolean replicated, - int timeout, - Object[] params + final ClusterNode node, + final long reqId, + final int segmentId, + final String schemaName, + final Collection<GridCacheSqlQuery> qrys, + final List<Integer> cacheIds, + final AffinityTopologyVersion topVer, + final Map<UUID, int[]> partsMap, + final int[] parts, + final int pageSize, + final DistributedJoinMode distributedJoinMode, + final boolean enforceJoinOrder, + final boolean replicated, + final int timeout, + final Object[] params, + boolean lazy ) { + if (lazy && MapQueryLazyWorker.currentWorker() == null) { + // Lazy queries must be re-submitted to dedicated workers. + MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId); + MapQueryLazyWorker worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this); + + worker.submit(new Runnable() { + @Override public void run() { + onQueryRequest0(node, reqId, segmentId, schemaName, qrys, cacheIds, topVer, partsMap, parts, + pageSize, distributedJoinMode, enforceJoinOrder, replicated, timeout, params, true); + } + }); + + if (lazyWorkerBusyLock.enterBusy()) { + try { + MapQueryLazyWorker oldWorker = lazyWorkers.put(key, worker); + + if (oldWorker != null) + oldWorker.stop(); + + IgniteThread thread = new IgniteThread(worker); + + thread.start(); + } + finally { + lazyWorkerBusyLock.leaveBusy(); + } + } + else + log.info("Ignored query request (node is stopping) [nodeId=" + node.id() + ", reqId=" + reqId + ']'); + + return; + } + // Prepare to run queries. GridCacheContext<?, ?> mainCctx = !F.isEmpty(cacheIds) ? 
ctx.cache().context().cacheContext(cacheIds.get(0)) : null; @@ -519,13 +600,18 @@ public class GridMapQueryExecutor { if (topVer != null) { // Reserve primary for topology version or explicit partitions. if (!reservePartitions(cacheIds, topVer, parts, reserved)) { + // Unregister lazy worker because re-try may never reach this node again. + if (lazy) + stopAndUnregisterCurrentLazyWorker(); + sendRetry(node, reqId, segmentId); return; } } - qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx != null ? mainCctx.name() : null); + qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx != null ? mainCctx.name() : null, + MapQueryLazyWorker.currentWorker()); if (nodeRess.put(reqId, segmentId, qr) != null) throw new IllegalStateException(); @@ -570,8 +656,7 @@ public class GridMapQueryExecutor { ResultSet rs = null; // If we are not the target node for this replicated query, just ignore it. - if (qry.node() == null || - (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) { + if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) { rs = h2.executeSqlQueryWithTimer(conn, qry.query(), F.asList(qry.parameters(params)), true, timeout, @@ -624,6 +709,10 @@ public class GridMapQueryExecutor { qr.cancel(false); } + // Unregister worker after possible cancellation. + if (lazy) + stopAndUnregisterCurrentLazyWorker(); + if (X.hasCause(e, GridH2RetryException.class)) sendRetry(node, reqId, segmentId); else { @@ -672,27 +761,39 @@ public class GridMapQueryExecutor { * @param node Node. * @param req Request. 
*/ - private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest req) { - MapNodeResults nodeRess = qryRess.get(node.id()); + private void onNextPageRequest(final ClusterNode node, final GridQueryNextPageRequest req) { + final MapNodeResults nodeRess = qryRess.get(node.id()); if (nodeRess == null) { sendError(node, req.queryRequestId(), new CacheException("No node result found for request: " + req)); return; - } else if (nodeRess.cancelled(req.queryRequestId())) { + } + else if (nodeRess.cancelled(req.queryRequestId())) { sendError(node, req.queryRequestId(), new QueryCancelledException()); return; } - MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId()); + final MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId()); if (qr == null) sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req)); else if (qr.cancelled()) sendError(node, req.queryRequestId(), new QueryCancelledException()); - else - sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize()); + else { + MapQueryLazyWorker lazyWorker = qr.lazyWorker(); + + if (lazyWorker != null) { + lazyWorker.submit(new Runnable() { + @Override public void run() { + sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize()); + } + }); + } + else + sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize()); + } } /** @@ -784,4 +885,34 @@ public class GridMapQueryExecutor { reservations.remove(grpKey); } } + + /** + * Unregister lazy worker if needed (i.e. if we are currently in lazy worker thread). + */ + public void stopAndUnregisterCurrentLazyWorker() { + MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker(); + + if (worker != null) { + worker.stop(); + + // Just stop is not enough as worker may be registered, but not started due to exception. + unregisterLazyWorker(worker); + } + } + + /** + * Unregister lazy worker. + * + * @param worker Worker. 
+ */ + public void unregisterLazyWorker(MapQueryLazyWorker worker) { + lazyWorkers.remove(worker.key(), worker); + } + + /** + * @return Number of registered lazy workers. + */ + public int registeredLazyWorkers() { + return lazyWorkers.size(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 0e9d1a2..8638794 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -506,6 +506,7 @@ public class GridReduceQueryExecutor { * @param cancel Query cancel. * @param params Query parameters. * @param parts Partitions. + * @param lazy Lazy execution flag. * @return Rows iterator. 
*/ public Iterator<List<?>> query( @@ -516,7 +517,8 @@ public class GridReduceQueryExecutor { int timeoutMillis, GridQueryCancel cancel, Object[] params, - final int[] parts + final int[] parts, + boolean lazy ) { if (F.isEmpty(params)) params = EMPTY_PARAMS; @@ -712,6 +714,9 @@ public class GridReduceQueryExecutor { if (isReplicatedOnly) flags |= GridH2QueryRequest.FLAG_REPLICATED; + if (lazy && mapQrys.size() == 1) + flags |= GridH2QueryRequest.FLAG_LAZY; + GridH2QueryRequest req = new GridH2QueryRequest() .requestId(qryReqId) .topologyVersion(topVer) http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java index d5ea357..2d20c8d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.jsr166.ConcurrentHashMap8; +import java.util.UUID; import java.util.concurrent.ConcurrentMap; import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; @@ -35,6 +36,18 @@ class MapNodeResults { private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist = new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q); + /** Node ID. */ + private final UUID nodeId; + + /** + * Constructor. + * + * @param nodeId Node ID. 
+ */ + public MapNodeResults(UUID nodeId) { + this.nodeId = nodeId; + } + /** * @param reqId Query Request ID. * @return {@code False} if query was already cancelled. @@ -59,7 +72,7 @@ class MapNodeResults { * @return query partial results. */ public MapQueryResults get(long reqId, int segmentId) { - return res.get(new MapRequestKey(reqId, segmentId)); + return res.get(new MapRequestKey(nodeId, reqId, segmentId)); } /** @@ -84,7 +97,7 @@ class MapNodeResults { * @return {@code True} if removed. */ public boolean remove(long reqId, int segmentId, MapQueryResults qr) { - return res.remove(new MapRequestKey(reqId, segmentId), qr); + return res.remove(new MapRequestKey(nodeId, reqId, segmentId), qr); } /** @@ -94,7 +107,7 @@ class MapNodeResults { * @return previous value. */ public MapQueryResults put(long reqId, int segmentId, MapQueryResults qr) { - return res.put(new MapRequestKey(reqId, segmentId), qr); + return res.put(new MapRequestKey(nodeId, reqId, segmentId), qr); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java new file mode 100644 index 0000000..5158035 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.jetbrains.annotations.Nullable; +import org.jsr166.LongAdder8; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * Worker for lazy query execution. + */ +public class MapQueryLazyWorker extends GridWorker { + /** Lazy thread flag. */ + private static final ThreadLocal<MapQueryLazyWorker> LAZY_WORKER = new ThreadLocal<>(); + + /** Active lazy worker count (for testing purposes). */ + private static final LongAdder8 ACTIVE_CNT = new LongAdder8(); + + /** Task to be executed. */ + private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>(); + + /** Key. */ + private final MapQueryLazyWorkerKey key; + + /** Map query executor. */ + private final GridMapQueryExecutor exec; + + /** Latch decremented when worker finishes. */ + private final CountDownLatch stopLatch = new CountDownLatch(1); + + /** Map query result. */ + private volatile MapQueryResult res; + + /** + * Constructor. + * + * @param instanceName Instance name. 
+ * @param key Lazy worker key. + * @param log Logger. + * @param exec Map query executor. + */ + public MapQueryLazyWorker(@Nullable String instanceName, MapQueryLazyWorkerKey key, IgniteLogger log, + GridMapQueryExecutor exec) { + super(instanceName, workerName(instanceName, key), log); + + this.key = key; + this.exec = exec; + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + LAZY_WORKER.set(this); + + ACTIVE_CNT.increment(); + + try { + while (!isCancelled()) { + Runnable task = tasks.take(); + + if (task != null) + task.run(); + } + } + finally { + if (res != null) + res.close(); + + LAZY_WORKER.set(null); + + ACTIVE_CNT.decrement(); + + exec.unregisterLazyWorker(this); + } + } + + /** + * Submit task to worker. + * + * @param task Task to be executed. + */ + public void submit(Runnable task) { + tasks.add(task); + } + + /** + * @return Worker key. + */ + public MapQueryLazyWorkerKey key() { + return key; + } + + /** + * Stop the worker. + */ + public void stop() { + if (MapQueryLazyWorker.currentWorker() == null) + submit(new Runnable() { + @Override public void run() { + stop(); + } + }); + else { + isCancelled = true; + + stopLatch.countDown(); + } + } + + /** + * Await worker stop. + */ + public void awaitStop() { + try { + U.await(stopLatch); + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e); + } + } + + /** + * @param res Map query result. + */ + public void result(MapQueryResult res) { + this.res = res; + } + + /** + * @return Current worker or {@code null} if call is performed not from lazy worker thread. + */ + @Nullable public static MapQueryLazyWorker currentWorker() { + return LAZY_WORKER.get(); + } + + /** + * @return Active workers count. + */ + public static int activeCount() { + return ACTIVE_CNT.intValue(); + } + + /** + * Construct worker name. 
+ * + * @param instanceName Instance name. + * @param key Key. + * @return Name. + */ + private static String workerName(String instanceName, MapQueryLazyWorkerKey key) { + return "query-lazy-worker_" + instanceName + "_" + key.nodeId() + "_" + key.queryRequestId() + "_" + + key.segment(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java new file mode 100644 index 0000000..a0f5ebb --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.util.UUID; + +/** + * Key to identify lazy worker. + */ +public class MapQueryLazyWorkerKey { + /** Client node ID. */ + private final UUID nodeId; + + /** Query request ID. */ + private final long qryReqId; + + /** Segment. */ + private final int segment; + + /** + * Constructor. + * + * @param nodeId Node ID. + * @param qryReqId Query request ID. + * @param segment Segment. + */ + public MapQueryLazyWorkerKey(UUID nodeId, long qryReqId, int segment) { + this.nodeId = nodeId; + this.qryReqId = qryReqId; + this.segment = segment; + } + + /** + * @return Node id. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Query request ID. + */ + public long queryRequestId() { + return qryReqId; + } + + /** + * @return Segment. + */ + public int segment() { + return segment; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = nodeId.hashCode(); + + res = 31 * res + (int)(qryReqId ^ (qryReqId >>> 32)); + res = 31 * res + segment; + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj != null && obj instanceof MapQueryLazyWorkerKey) { + MapQueryLazyWorkerKey other = (MapQueryLazyWorkerKey)obj; + + return F.eq(qryReqId, other.qryReqId) && F.eq(nodeId, other.nodeId) && F.eq(segment, other.segment); + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MapQueryLazyWorkerKey.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java ---------------------------------------------------------------------- diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java index 4799e03..e54c784d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.h2.jdbc.JdbcResultSet; +import org.h2.result.LazyResult; import org.h2.result.ResultInterface; import org.h2.value.Value; import org.jetbrains.annotations.Nullable; @@ -41,7 +42,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; /** * Mapper result for a single part of the query. */ -class MapQueryResult implements AutoCloseable { +class MapQueryResult { /** */ private static final Field RESULT_FIELD; @@ -95,24 +96,30 @@ class MapQueryResult implements AutoCloseable { /** */ private final Object[] params; + /** Lazy worker. */ + private final MapQueryLazyWorker lazyWorker; + /** * @param rs Result set. * @param cacheName Cache name. * @param qrySrcNodeId Query source node. * @param qry Query. * @param params Query params. + * @param lazyWorker Lazy worker. 
*/ MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable String cacheName, - UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) { + UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, @Nullable MapQueryLazyWorker lazyWorker) { this.h2 = h2; this.cacheName = cacheName; this.qry = qry; this.params = params; this.qrySrcNodeId = qrySrcNodeId; this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId); + this.lazyWorker = lazyWorker; if (rs != null) { this.rs = rs; + try { res = (ResultInterface)RESULT_FIELD.get(rs); } @@ -120,7 +127,7 @@ class MapQueryResult implements AutoCloseable { throw new IllegalStateException(e); // Must not happen. } - rowCnt = res.getRowCount(); + rowCnt = (res instanceof LazyResult) ? -1 : res.getRowCount(); cols = res.getVisibleColumnCount(); } else { @@ -167,6 +174,8 @@ class MapQueryResult implements AutoCloseable { * @return {@code true} If there are no more rows available. */ synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) { + assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker(); + if (closed) return true; @@ -246,13 +255,34 @@ class MapQueryResult implements AutoCloseable { return res; } - /** {@inheritDoc} */ - @Override public synchronized void close() { - if (closed) + /** + * Close the result. 
+ */ + public void close() { + if (lazyWorker != null && MapQueryLazyWorker.currentWorker() == null) { + lazyWorker.submit(new Runnable() { + @Override public void run() { + close(); + } + }); + + lazyWorker.awaitStop(); + return; + } - closed = true; + synchronized (this) { + assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker(); - U.closeQuiet(rs); + if (closed) + return; + + closed = true; + + U.closeQuiet(rs); + + if (lazyWorker != null) + lazyWorker.stop(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java index 7ad1d14..99f1966 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java @@ -45,20 +45,27 @@ class MapQueryResults { /** */ private final String cacheName; + /** Lazy worker. */ + private final MapQueryLazyWorker lazyWorker; + /** */ private volatile boolean cancelled; /** + * Constructor. + * * @param qryReqId Query request ID. * @param qrys Number of queries. * @param cacheName Cache name. + * @param lazyWorker Lazy worker (if any). 
*/ @SuppressWarnings("unchecked") - MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys, - @Nullable String cacheName) { + MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys, @Nullable String cacheName, + @Nullable MapQueryLazyWorker lazyWorker) { this.h2 = h2; this.qryReqId = qryReqId; this.cacheName = cacheName; + this.lazyWorker = lazyWorker; results = new AtomicReferenceArray<>(qrys); cancels = new GridQueryCancel[qrys]; @@ -86,13 +93,25 @@ class MapQueryResults { } /** + * @return Lazy worker. + */ + MapQueryLazyWorker lazyWorker() { + return lazyWorker; + } + + /** + * Add result. + * * @param qry Query result index. * @param q Query object. * @param qrySrcNodeId Query source node. * @param rs Result set. */ void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) { - MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params); + MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params, lazyWorker); + + if (lazyWorker != null) + lazyWorker.result(res); if (!results.compareAndSet(qry, null, res)) throw new IllegalStateException(); @@ -130,6 +149,7 @@ class MapQueryResults { continue; } + // NB: Cancel is already safe even for lazy queries (see implementation of passed Runnable). 
if (forceQryCancel) { GridQueryCancel cancel = cancels[i]; http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java index 6feb8ea..9d987db 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java @@ -17,18 +17,32 @@ package org.apache.ignite.internal.processors.query.h2.twostep; +import org.apache.ignite.internal.util.typedef.F; + +import java.util.UUID; + /** * Mapper request key. */ class MapRequestKey { + /** Node ID. */ + private UUID nodeId; + /** */ private long reqId; /** */ private int segmentId; - /** Constructor */ - MapRequestKey(long reqId, int segmentId) { + /** + * Constructor. + * + * @param nodeId Node ID. + * @param reqId Request ID. + * @param segmentId Segment ID. + */ + MapRequestKey(UUID nodeId, long reqId, int segmentId) { + this.nodeId = nodeId; this.reqId = reqId; this.segmentId = segmentId; } @@ -50,14 +64,15 @@ class MapRequestKey { MapRequestKey other = (MapRequestKey)o; - return reqId == other.reqId && segmentId == other.segmentId; + return F.eq(nodeId, other.nodeId) && reqId == other.reqId && segmentId == other.segmentId; } /** {@inheritDoc} */ @Override public int hashCode() { - int res = (int)(reqId ^ (reqId >>> 32)); + int res = nodeId != null ? 
nodeId.hashCode() : 0; + res = 31 * res + (int)(reqId ^ (reqId >>> 32)); res = 31 * res + segmentId; return res; http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java index 93a383c..4e1fadb 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java @@ -78,6 +78,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { */ public static final int FLAG_REPLICATED = 1 << 4; + /** + * If lazy execution is enabled. + */ + public static final int FLAG_LAZY = 1 << 5; + /** */ private long reqId; @@ -185,6 +190,10 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { } /** + * Get tables. + * <p> + * N.B.: Was used in AI 1.9 for snapshots. Unused at the moment, but should be kept for compatibility reasons. + * * @return Tables. 
*/ public Collection<QueryTable> tables() { http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java new file mode 100644 index 0000000..d5cc0eb --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Tests for lazy query execution. + */ +public class LazyQuerySelfTest extends GridCommonAbstractTest { + /** Keys count. */ + private static final int KEY_CNT = 200; + + /** Base query argument. */ + private static final int BASE_QRY_ARG = 50; + + /** Size for small pages. */ + private static final int PAGE_SIZE_SMALL = 12; + + /** Cache name. */ + private static final String CACHE_NAME = "cache"; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * Test local query execution. + * + * @throws Exception If failed. + */ + public void testSingleNode() throws Exception { + checkSingleNode(1); + } + + /** + * Test local query execution with query parallelism. + * + * @throws Exception If failed. + */ + public void testSingleNodeWithParallelism() throws Exception { + checkSingleNode(4); + } + + /** + * Test query execution with multiple topology nodes. + * + * @throws Exception If failed.
+ */ + public void testMultipleNodes() throws Exception { + checkMultipleNodes(1); + } + + /** + * Test query execution with multiple topology nodes with query parallelism. + * + * @throws Exception If failed. + */ + public void testMultipleNodesWithParallelism() throws Exception { + checkMultipleNodes(4); + } + + /** + * Check local query execution. + * + * @param parallelism Query parallelism. + * @throws Exception If failed. + */ + public void checkSingleNode(int parallelism) throws Exception { + Ignite srv = startGrid(); + + srv.createCache(cacheConfiguration(parallelism)); + + populateBaseQueryData(srv); + + checkBaseOperations(srv); + } + + /** + * Check query execution with multiple topology nodes. + * + * @param parallelism Query parallelism. + * @throws Exception If failed. + */ + public void checkMultipleNodes(int parallelism) throws Exception { + Ignite srv1 = startGrid(1); + Ignite srv2 = startGrid(2); + + Ignite cli; + + try { + Ignition.setClientMode(true); + + cli = startGrid(3); + } + finally { + Ignition.setClientMode(false); + } + + cli.createCache(cacheConfiguration(parallelism)); + + populateBaseQueryData(cli); + + checkBaseOperations(srv1); + checkBaseOperations(srv2); + checkBaseOperations(cli); + + // Test originating node leave. + FieldsQueryCursor<List<?>> cursor = execute(cli, baseQuery().setPageSize(PAGE_SIZE_SMALL)); + + Iterator<List<?>> iter = cursor.iterator(); + + for (int i = 0; i < 30; i++) + iter.next(); + + stopGrid(3); + + assertNoWorkers(); + + // Test server node leave with active worker. + cursor = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL)); + + try { + iter = cursor.iterator(); + + for (int i = 0; i < 30; i++) + iter.next(); + + stopGrid(2); + } + finally { + cursor.close(); + } + + assertNoWorkers(); + } + + /** + * Check base operations. + * + * @param node Node. + * @throws Exception If failed. + */ + private void checkBaseOperations(Ignite node) throws Exception { + // Get full data. 
+ List<List<?>> rows = execute(node, baseQuery()).getAll(); + + assertBaseQueryResults(rows); + assertNoWorkers(); + + // Get data in several pages. + rows = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL)).getAll(); + + assertBaseQueryResults(rows); + assertNoWorkers(); + + // Test full iteration. + rows = new ArrayList<>(); + + FieldsQueryCursor<List<?>> cursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL)); + + for (List<?> row : cursor) + rows.add(row); + + assertBaseQueryResults(rows); + assertNoWorkers(); + + // Test partial iteration with cursor close. + try (FieldsQueryCursor<List<?>> partialCursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL))) { + Iterator<List<?>> iter = partialCursor.iterator(); + + for (int i = 0; i < 30; i++) + iter.next(); + } + + assertNoWorkers(); + + // Test execution of multiple queries at a time. + List<Iterator<List<?>>> iters = new ArrayList<>(); + + for (int i = 0; i < 200; i++) + iters.add(execute(node, randomizedQuery().setPageSize(PAGE_SIZE_SMALL)).iterator()); + + while (!iters.isEmpty()) { + Iterator<Iterator<List<?>>> iterIter = iters.iterator(); + + while (iterIter.hasNext()) { + Iterator<List<?>> iter = iterIter.next(); + + int i = 0; + + while (iter.hasNext() && i < 20) { + iter.next(); + + i++; + } + + if (!iter.hasNext()) + iterIter.remove(); + } + } + + assertNoWorkers(); + } + + /** + * Populate base query data. + * + * @param node Node. + */ + private static void populateBaseQueryData(Ignite node) { + IgniteCache<Long, Person> cache = cache(node); + + for (long i = 0; i < KEY_CNT; i++) + cache.put(i, new Person(i)); + } + + /** + * @return Query with randomized argument. + */ + private static SqlFieldsQuery randomizedQuery() { + return query(ThreadLocalRandom.current().nextInt(KEY_CNT / 2)); + } + + /** + * @return Base query. + */ + private static SqlFieldsQuery baseQuery() { + return query(BASE_QRY_ARG); + } + + /** + * @param parallelism Query parallelism. 
+ * @return Default cache configuration. + */ + private static CacheConfiguration<Long, Person> cacheConfiguration(int parallelism) { + return new CacheConfiguration<Long, Person>().setName(CACHE_NAME).setIndexedTypes(Long.class, Person.class) + .setQueryParallelism(parallelism); + } + + /** + * Default query. + * + * @param arg Argument. + * @return Query. + */ + private static SqlFieldsQuery query(long arg) { + return new SqlFieldsQuery("SELECT id, name FROM Person WHERE id >= ?").setArgs(arg); + } + + /** + * Assert base query results. + * + * @param rows Result rows. + */ + private static void assertBaseQueryResults(List<List<?>> rows) { + assertEquals(KEY_CNT - BASE_QRY_ARG, rows.size()); + + for (List<?> row : rows) { + Long id = (Long)row.get(0); + String name = (String)row.get(1); + + assertTrue(id >= BASE_QRY_ARG); + assertEquals(nameForId(id), name); + } + } + + /** + * Get cache for node. + * + * @param node Node. + * @return Cache. + */ + private static IgniteCache<Long, Person> cache(Ignite node) { + return node.cache(CACHE_NAME); + } + + /** + * Execute query on the given cache. + * + * @param node Node. + * @param qry Query. + * @return Cursor. + */ + @SuppressWarnings("unchecked") + private static FieldsQueryCursor<List<?>> execute(Ignite node, SqlFieldsQuery qry) { + return cache(node).query(qry.setLazy(true)); + } + + /** + * Make sure that there are no active lazy workers. + * + * @throws Exception If failed. + */ + private static void assertNoWorkers() throws Exception { + assert GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (Ignite node : Ignition.allGrids()) { + IgniteH2Indexing idx = (IgniteH2Indexing) ((IgniteKernal)node).context().query().getIndexing(); + + if (idx.mapQueryExecutor().registeredLazyWorkers() != 0) + return false; + } + + return MapQueryLazyWorker.activeCount() == 0; + } + }, 1000L); + } + + /** + * Get name for ID. + * + * @param id ID. + * @return Name.
+ */ + private static String nameForId(long id) { + return "name-" + id; + } + + /** + * Person class. + */ + private static class Person { + /** ID. */ + @QuerySqlField(index = true) + private long id; + + /** Name. */ + @QuerySqlField + private String name; + + /** + * Constructor. + * + * @param id ID. + */ + public Person(long id) { + this.id = id; + this.name = nameForId(id); + } + + /** + * @return ID. + */ + public long id() { + return id; + } + + /** + * @return Name. + */ + public String name() { + return name; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 99b0370..5ac0655f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -127,6 +127,7 @@ import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest; import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexMultiNodeSelfTest; import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTest; import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest; +import org.apache.ignite.internal.processors.query.LazyQuerySelfTest; import org.apache.ignite.internal.processors.query.SqlSchemaSelfTest; import org.apache.ignite.internal.processors.query.h2.GridH2IndexingInMemSelfTest; import org.apache.ignite.internal.processors.query.h2.GridH2IndexingOffheapSelfTest; @@ -184,6 +185,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IncorrectQueryEntityTest.class); 
// Queries tests. + suite.addTestSuite(LazyQuerySelfTest.class); suite.addTestSuite(IgniteSqlSplitterSelfTest.class); suite.addTestSuite(IgniteSqlSegmentedIndexSelfTest.class); suite.addTestSuite(IgniteSqlSegmentedIndexMultiNodeSelfTest.class);
