http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index f228111..9b7d268 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -40,7 +40,6 @@ import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.PartitionLossPolicy; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cache.query.SqlFieldsQuery; @@ -58,12 +57,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.CompoundLockFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation; import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; @@ -71,8 +70,10 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper; import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.ObjectPoolReusable; import org.apache.ignite.internal.processors.query.h2.ResultSetEnlistFuture; import org.apache.ignite.internal.processors.query.h2.UpdateResult; import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode; @@ -97,13 +98,13 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.indexing.IndexingQueryFilter; -import org.apache.ignite.thread.IgniteThread; +import org.h2.api.ErrorCode; import org.h2.command.Prepared; import org.h2.jdbc.JdbcResultSet; +import org.h2.jdbc.JdbcSQLException; import org.h2.value.Value; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_FORCE_LAZY_RESULT_SET; import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE; import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; @@ -123,9 +124,6 @@ import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2V @SuppressWarnings("ForLoopReplaceableByForEach") public class GridMapQueryExecutor { /** */ - public static final boolean FORCE_LAZY = IgniteSystemProperties.getBoolean(IGNITE_SQL_FORCE_LAZY_RESULT_SET); - - /** */ private IgniteLogger log; /** */ @@ -149,8 +147,8 @@ public class GridMapQueryExecutor { /** Busy lock for lazy workers. */ private final GridSpinBusyLock lazyWorkerBusyLock = new GridSpinBusyLock(); - /** Lazy worker stop guard. */ - private final AtomicBoolean lazyWorkerStopGuard = new AtomicBoolean(); + /** Stop guard. */ + private final AtomicBoolean stopGuard = new AtomicBoolean(); /** * @param busyLock Busy lock. @@ -207,18 +205,21 @@ public class GridMapQueryExecutor { } /** - * Cancel active lazy queries and prevent submit of new queries. + * Stop query map executor, cleanup resources. */ - public void cancelLazyWorkers() { - if (!lazyWorkerStopGuard.compareAndSet(false, true)) + public void stop() { + if (!stopGuard.compareAndSet(false, true)) return; - lazyWorkerBusyLock.block(); + for (MapNodeResults res : qryRess.values()) + res.cancelAll(); - for (MapQueryLazyWorker worker : lazyWorkers.values()) - worker.stop(false); + for (MapQueryLazyWorker w : lazyWorkers.values()) + w.stop(true); - lazyWorkers.clear(); + lazyWorkerBusyLock.block(); + + assert lazyWorkers.isEmpty() : "Not cleaned lazy workers: " + lazyWorkers.size(); } /** @@ -259,7 +260,7 @@ public class GridMapQueryExecutor { * @return Busy lock for lazy workers to guard their operations with. */ GridSpinBusyLock busyLock() { - return busyLock; + return lazyWorkerBusyLock; } /** @@ -554,6 +555,7 @@ public class GridMapQueryExecutor { /** * @param node Node. * @param req Query request. + * @throws IgniteCheckedException On error. */ private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException { int[] qryParts = req.queryPartitions(); @@ -566,10 +568,14 @@ public class GridMapQueryExecutor { req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL), req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS)); + final GridDhtTxLocalAdapter tx; + + GridH2SelectForUpdateTxDetails txReq = req.txDetails(); + final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER); final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN); final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED); - final boolean lazy = (FORCE_LAZY && req.queries().size() == 1) || req.isFlagSet(GridH2QueryRequest.FLAG_LAZY); + final boolean lazy = req.isFlagSet(GridH2QueryRequest.FLAG_LAZY) && txReq == null; final List<Integer> cacheIds = req.caches(); @@ -578,10 +584,6 @@ public class GridMapQueryExecutor { final Object[] params = req.parameters(); - final GridDhtTxLocalAdapter tx; - - GridH2SelectForUpdateTxDetails txReq = req.txDetails(); - try { if (txReq != null) { // Prepare to run queries. @@ -736,7 +738,11 @@ public class GridMapQueryExecutor { * @param parts Explicit partitions for current node. * @param pageSize Page size. * @param distributedJoinMode Query distributed join mode. - * @param lazy Streaming flag. + * @param enforceJoinOrder Enforce join order flag. + * @param replicated Replicated flag. + * @param timeout Query timeout. + * @param params Query params. + * @param lazy Lazy query execution flag. * @param mvccSnapshot MVCC snapshot. * @param tx Transaction. * @param txDetails TX details, if it's a {@code FOR UPDATE} request, or {@code null}. @@ -765,75 +771,24 @@ public class GridMapQueryExecutor { @Nullable final GridH2SelectForUpdateTxDetails txDetails, @Nullable final CompoundLockFuture lockFut, @Nullable final AtomicInteger runCntr) { - MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker(); - // In presence of TX, we also must always have matching details. assert tx == null || txDetails != null; - boolean inTx = (tx != null); - - if (lazy && worker == null) { - // Lazy queries must be re-submitted to dedicated workers. - MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId); - worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this); - - worker.submit(new Runnable() { - @Override public void run() { - onQueryRequest0( - node, - reqId, - segmentId, - schemaName, - qrys, - cacheIds, - topVer, - partsMap, - parts, - pageSize, - distributedJoinMode, - enforceJoinOrder, - replicated, - timeout, - params, - true, - mvccSnapshot, - tx, - txDetails, - lockFut, - runCntr); - } - }); - - if (lazyWorkerBusyLock.enterBusy()) { - try { - MapQueryLazyWorker oldWorker = lazyWorkers.put(key, worker); - - if (oldWorker != null) - oldWorker.stop(false); + assert !lazy || txDetails == null : "Lazy execution of SELECT FOR UPDATE queries is not supported."; - IgniteThread thread = new IgniteThread(worker); - - thread.start(); - } - finally { - lazyWorkerBusyLock.leaveBusy(); - } - } - else - log.info("Ignored query request (node is stopping) [nodeId=" + node.id() + ", reqId=" + reqId + ']'); + boolean inTx = (tx != null); - return; - } + MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker(); - if (lazy && txDetails != null) - throw new IgniteSQLException("Lazy execution of SELECT FOR UPDATE queries is not supported."); + if (lazy && worker == null) + worker = createLazyWorker(node, reqId, segmentId); // Prepare to run queries. GridCacheContext<?, ?> mainCctx = mainCacheContext(cacheIds); MapNodeResults nodeRess = resultsForNode(node.id()); - MapQueryResults qr = null; + MapQueryResults qryResults = null; List<GridReservable> reserved = new ArrayList<>(); @@ -847,7 +802,7 @@ public class GridMapQueryExecutor { if (!F.isEmpty(err)) { // Unregister lazy worker because re-try may never reach this node again. if (lazy) - stopAndUnregisterCurrentLazyWorker(); + worker.stop(false); sendRetry(node, reqId, segmentId, err); @@ -855,10 +810,7 @@ public class GridMapQueryExecutor { } } - qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, MapQueryLazyWorker.currentWorker(), inTx); - - if (nodeRess.put(reqId, segmentId, qr) != null) - throw new IllegalStateException(); + qryResults = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, worker, inTx); // Prepare query context. GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(), @@ -872,186 +824,207 @@ public class GridMapQueryExecutor { .pageSize(pageSize) .topologyVersion(topVer) .reservations(reserved) - .mvccSnapshot(mvccSnapshot) - .lazyWorker(worker); - - Connection conn = h2.connectionForSchema(schemaName); - - H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder); - - GridH2QueryContext.set(qctx); + .mvccSnapshot(mvccSnapshot); // qctx is set, we have to release reservations inside of it. reserved = null; - try { - if (nodeRess.cancelled(reqId)) { - GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type()); + if (worker != null) + worker.queryContext(qctx); - nodeRess.cancelRequest(reqId); + GridH2QueryContext.set(qctx); - throw new QueryCancelledException(); - } + if (nodeRess.put(reqId, segmentId, qryResults) != null) + throw new IllegalStateException(); - // Run queries. - int qryIdx = 0; + Connection conn = h2.connectionForSchema(schemaName); - boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED); + H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder, lazy); - for (GridCacheSqlQuery qry : qrys) { - ResultSet rs = null; + if (nodeRess.cancelled(reqId)) { + GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type()); - boolean removeMapping = false; + nodeRess.cancelRequest(reqId); - // If we are not the target node for this replicated query, just ignore it. - if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) { - String sql = qry.query(); Collection<Object> params0 = F.asList(qry.parameters(params)); + throw new QueryCancelledException(); + } - PreparedStatement stmt; + // Run queries. + int qryIdx = 0; - try { - stmt = h2.prepareStatement(conn, sql, true); - } - catch (SQLException e) { - throw new IgniteCheckedException("Failed to parse SQL query: " + sql, e); - } + boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED); - Prepared p = GridSqlQueryParser.prepared(stmt); + for (GridCacheSqlQuery qry : qrys) { + ResultSet rs = null; - if (GridSqlQueryParser.isForUpdateQuery(p)) { - sql = GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(p, inTx); - stmt = h2.prepareStatement(conn, sql, true); - } + boolean removeMapping = false; - h2.bindParameters(stmt, params0); + // If we are not the target node for this replicated query, just ignore it. + if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) { + String sql = qry.query(); Collection<Object> params0 = F.asList(qry.parameters(params)); - int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx); + PreparedStatement stmt; - rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qr.queryCancel(qryIdx)); + try { + stmt = h2.prepareStatement(conn, sql, true); + } + catch (SQLException e) { + throw new IgniteCheckedException("Failed to parse SQL query: " + sql, e); + } - if (inTx) { - ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future( - ctx.localNodeId(), - txDetails.version(), - mvccSnapshot, - txDetails.threadId(), - IgniteUuid.randomUuid(), - txDetails.miniId(), - parts, - tx, - opTimeout, - mainCctx, - rs - ); + Prepared p = GridSqlQueryParser.prepared(stmt); - if (lockFut != null) - lockFut.register(enlistFut); + if (GridSqlQueryParser.isForUpdateQuery(p)) { + sql = GridSqlQueryParser.rewriteQueryForUpdateIfNeeded(p, inTx); + stmt = h2.prepareStatement(conn, sql, true); + } - enlistFut.init(); + h2.bindParameters(stmt, params0); - enlistFut.get(); + int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx); - rs.beforeFirst(); - } + rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qryResults.queryCancel(qryIdx)); - if (evt) { - ctx.event().record(new CacheQueryExecutedEvent<>( - node, - "SQL query executed.", - EVT_CACHE_QUERY_EXECUTED, - CacheQueryType.SQL.name(), - mainCctx.name(), - null, - qry.query(), - null, - null, - params, - node.id(), - null)); - } + if (inTx) { + ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future( + ctx.localNodeId(), + txDetails.version(), + mvccSnapshot, + txDetails.threadId(), + IgniteUuid.randomUuid(), + txDetails.miniId(), + parts, + tx, + opTimeout, + mainCctx, + rs + ); + + if (lockFut != null) + lockFut.register(enlistFut); + + enlistFut.init(); + + enlistFut.get(); + + rs.beforeFirst(); + } - assert rs instanceof JdbcResultSet : rs.getClass(); + if (evt) { + ctx.event().record(new CacheQueryExecutedEvent<>( + node, + "SQL query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.SQL.name(), + mainCctx.name(), + null, + qry.query(), + null, + null, + params, + node.id(), + null)); } - qr.addResult(qryIdx, qry, node.id(), rs, params); + assert rs instanceof JdbcResultSet : rs.getClass(); + } - if (qr.cancelled()) { - qr.result(qryIdx).close(); + qryResults.addResult(qryIdx, qry, node.id(), rs, params); - throw new QueryCancelledException(); - } + if (qryResults.cancelled()) { + qryResults.result(qryIdx).close(); - if (inTx) { - if (tx.dht() && (runCntr == null || runCntr.decrementAndGet() == 0)) { - if (removeMapping = tx.empty() && !tx.queryEnlisted()) - tx.rollbackAsync().get(); - } + throw new QueryCancelledException(); + } + + if (inTx) { + if (tx.dht() && (runCntr == null || runCntr.decrementAndGet() == 0)) { + if (removeMapping = tx.empty() && !tx.queryEnlisted()) + tx.rollbackAsync().get(); } + } - // Send the first page. - if (lockFut == null) - sendNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping); - else { - GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping); - - if (msg != null) { - lockFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() { - @Override public void apply(IgniteInternalFuture<Void> future) { - try { - if (node.isLocal()) - h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); - else - ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL); - } - catch (Exception e) { - U.error(log, e); - } + // Send the first page. + if (lockFut == null) + sendNextPage(nodeRess, node, qryResults, qryIdx, segmentId, pageSize, removeMapping); + else { + GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qryResults, qryIdx, segmentId, pageSize, removeMapping); + + if (msg != null) { + lockFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() { + @Override public void apply(IgniteInternalFuture<Void> future) { + try { + if (node.isLocal()) + h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); + else + ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL); } - }); - } + catch (Exception e) { + U.error(log, e); + } + } + }); } - - qryIdx++; } - // All request results are in the memory in result set already, so it's ok to release partitions. - if (!lazy) - releaseReservations(); + qryIdx++; } - catch (Throwable e){ + + // All request results are in the memory in result set already, so it's ok to release partitions. + if (!lazy) releaseReservations(); + else if (!qryResults.isAllClosed()) { + if (MapQueryLazyWorker.currentWorker() == null) { + final ObjectPoolReusable<H2ConnectionWrapper> detachedConn = h2.detachConnection(); - throw e; + worker.start(H2Utils.session(conn), detachedConn); + + GridH2QueryContext.clearThreadLocal(); + } } + else + unregisterLazyWorker(worker); } catch (Throwable e) { - if (qr != null) { - nodeRess.remove(reqId, segmentId, qr); + if (qryResults != null) { + nodeRess.remove(reqId, segmentId, qryResults); - qr.cancel(false); + qryResults.close(); } + else + releaseReservations(); - // Unregister worker after possible cancellation. + // Stop and unregister worker after possible cancellation. if (lazy) - stopAndUnregisterCurrentLazyWorker(); + worker.stop(false); - GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class); + if (e instanceof QueryCancelledException) + sendError(node, reqId, e); + else { + JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class); - if (retryErr != null) { - final String retryCause = String.format( - "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " + - "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage() - ); + if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED) + sendError(node, reqId, new QueryCancelledException()); + else { + GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class); - sendRetry(node, reqId, segmentId, retryCause); - } - else { - U.error(log, "Failed to execute local query.", e); + if (retryErr != null) { + final String retryCause = String.format( + "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " + + "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage() + ); - sendError(node, reqId, e); + sendRetry(node, reqId, segmentId, retryCause); + } + else { + U.error(log, "Failed to execute local query.", e); + + sendError(node, reqId, e); - if (e instanceof Error) - throw (Error)e; + if (e instanceof Error) + throw (Error)e; + } + } } } finally { @@ -1060,10 +1033,25 @@ public class GridMapQueryExecutor { for (int i = 0; i < reserved.size(); i++) reserved.get(i).release(); } + + if (MapQueryLazyWorker.currentWorker() == null && GridH2QueryContext.get() != null) + GridH2QueryContext.clearThreadLocal(); } } /** + * @param node The node has sent map query request. + * @param reqId Request ID. + * @param segmentId Segment ID. + * @return Lazy worker. + */ + private MapQueryLazyWorker createLazyWorker(ClusterNode node, long reqId, int segmentId) { + MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId); + + return new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this); + } + + /** * @param cacheIds Cache ids. * @return Id of the first cache in list, or {@code null} if list is empty. */ @@ -1088,6 +1076,7 @@ public class GridMapQueryExecutor { /** * @param node Node. * @param req DML request. + * @throws IgniteCheckedException On error. */ private void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) throws IgniteCheckedException { int[] parts = req.queryPartitions(); @@ -1255,24 +1244,34 @@ public class GridMapQueryExecutor { return; } - final MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId()); + final MapQueryResults qryResults = nodeRess.get(req.queryRequestId(), req.segmentId()); - if (qr == null) + if (qryResults == null) sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req)); - else if (qr.cancelled()) + else if (qryResults.cancelled()) sendError(node, req.queryRequestId(), new QueryCancelledException()); else { - MapQueryLazyWorker lazyWorker = qr.lazyWorker(); + MapQueryLazyWorker lazyWorker = qryResults.lazyWorker(); if (lazyWorker != null) { lazyWorker.submit(new Runnable() { @Override public void run() { - sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false); + try { + sendNextPage(nodeRess, node, qryResults, req.query(), req.segmentId(), req.pageSize(), false); + } + catch (Throwable e) { + JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class); + + if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED) + sendError(node, qryResults.queryRequestId(), new QueryCancelledException()); + else + throw e; + } } }); } else - sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false); + sendNextPage(nodeRess, node, qryResults, req.query(), req.segmentId(), req.pageSize(), false); } } @@ -1287,8 +1286,14 @@ public class GridMapQueryExecutor { * @return Next page. * @throws IgniteCheckedException If failed. */ - private GridQueryNextPageResponse prepareNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId, - int pageSize, boolean removeMapping) throws IgniteCheckedException { + private GridQueryNextPageResponse prepareNextPage( + MapNodeResults nodeRess, + ClusterNode node, + MapQueryResults qr, + int qry, + int segmentId, + int pageSize, + boolean removeMapping) throws IgniteCheckedException { MapQueryResult res = qr.result(qry); assert res != null; @@ -1309,8 +1314,11 @@ public class GridMapQueryExecutor { nodeRess.remove(qr.queryRequestId(), segmentId, qr); // Release reservations if the last page fetched, all requests are closed and this is a lazy worker. - if (MapQueryLazyWorker.currentWorker() != null) + if (qr.lazyWorker() != null) { releaseReservations(); + + qr.lazyWorker().stop(false); + } } } @@ -1342,8 +1350,14 @@ public class GridMapQueryExecutor { * @param removeMapping Remove mapping flag. */ @SuppressWarnings("unchecked") - private void sendNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId, - int pageSize, boolean removeMapping) { + private void sendNextPage( + MapNodeResults nodeRess, + ClusterNode node, + MapQueryResults qr, + int qry, + int segmentId, + int pageSize, + boolean removeMapping) { try { GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qry, segmentId, pageSize, removeMapping); @@ -1365,6 +1379,7 @@ public class GridMapQueryExecutor { * @param node Node. * @param reqId Request ID. * @param segmentId Index segment ID. + * @param retryCause Description of the retry cause. */ private void sendRetry(ClusterNode node, long reqId, int segmentId, String retryCause) { try { @@ -1401,25 +1416,11 @@ public class GridMapQueryExecutor { } /** - * Unregister lazy worker if needed (i.e. if we are currently in lazy worker thread). - */ - public void stopAndUnregisterCurrentLazyWorker() { - MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker(); - - if (worker != null) { - worker.stop(false); - - // Just stop is not enough as worker may be registered, but not started due to exception. - unregisterLazyWorker(worker); - } - } - - /** * Unregister lazy worker. * * @param worker Worker. */ - public void unregisterLazyWorker(MapQueryLazyWorker worker) { + void unregisterLazyWorker(MapQueryLazyWorker worker) { lazyWorkers.remove(worker.key(), worker); } @@ -1429,4 +1430,17 @@ public class GridMapQueryExecutor { public int registeredLazyWorkers() { return lazyWorkers.size(); } + + /** + * @param worker Worker to register. + */ + void registerLazyWorker(MapQueryLazyWorker worker) { + MapQueryLazyWorker oldWorker = lazyWorkers.put(worker.key(), worker); + + if (oldWorker != null) { + log.warning("Duplicates lazy worker: [key=" + worker.key() + ']'); + + oldWorker.stop(false); + } + } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java index 0cb986b..217cfad 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java @@ -17,12 +17,13 @@ package org.apache.ignite.internal.processors.query.h2.twostep; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.NoSuchElementException; +import java.util.RandomAccess; import java.util.UUID; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; @@ -72,9 +73,11 @@ public class GridResultPage { Collection<?> plainRows = res.plainRows(); if (plainRows != null) { + assert plainRows instanceof RandomAccess : "instance of " + plainRows.getClass(); + rowsInPage = plainRows.size(); - if (rowsInPage == 0 || ((ArrayList<Value[]>)plainRows).get(0).length == res.columns()) + if (rowsInPage == 0 || ((List<Value[]>)plainRows).get(0).length == res.columns()) rows = (Iterator<Value[]>)plainRows.iterator(); else { // If it's a result of SELECT FOR UPDATE (we can tell by difference in number http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java index 48116d3..8f8553a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java @@ -17,12 +17,11 @@ package org.apache.ignite.internal.processors.query.h2.twostep; -import org.apache.ignite.internal.processors.query.GridQueryCancel; -import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; -import java.util.concurrent.ConcurrentHashMap; - import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.internal.processors.query.GridQueryCancel; +import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; @@ -86,10 +85,10 @@ class MapNodeResults { public void cancelRequest(long reqId) { for (MapRequestKey key : res.keySet()) { if (key.requestId() == reqId) { - MapQueryResults removed = res.remove(key); + final MapQueryResults removed = res.remove(key); if (removed != null) - removed.cancel(true); + removed.cancel(); } } @@ -144,7 +143,7 @@ class MapNodeResults { */ public void cancelAll() { for (MapQueryResults ress : res.values()) - ress.cancel(true); + ress.cancel(); // Cancel update requests for (GridQueryCancel upd: updCancels.values()) http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java index 98f3df9..1cbab19 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java @@ -20,25 +20,41 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper; +import org.apache.ignite.internal.processors.query.h2.H2Utils; +import org.apache.ignite.internal.processors.query.h2.ObjectPoolReusable; import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.thread.IgniteThread; +import org.h2.engine.Session; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; + /** * Worker for lazy query execution. */ public class MapQueryLazyWorker extends GridWorker { + /** Poll task timeout milliseconds. */ + private static final int POLL_TASK_TIMEOUT_MS = 1000; + /** Lazy thread flag. */ private static final ThreadLocal<MapQueryLazyWorker> LAZY_WORKER = new ThreadLocal<>(); /** Active lazy worker count (for testing purposes). */ private static final LongAdder ACTIVE_CNT = new LongAdder(); + /** Mutex to synchronization worker start/stop. */ + private final Object mux = new Object(); + /** Task to be executed. */ private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>(); @@ -51,8 +67,14 @@ public class MapQueryLazyWorker extends GridWorker { /** Latch decremented when worker finishes. */ private final CountDownLatch stopLatch = new CountDownLatch(1); - /** Map query result. */ - private volatile MapQueryResult res; + /** Query context. */ + private GridH2QueryContext qctx; + + /** Worker is started flag. */ + private boolean started; + + /** Detached connection. */ + private ObjectPoolReusable<H2ConnectionWrapper> detached; /** * Constructor. @@ -70,38 +92,106 @@ public class MapQueryLazyWorker extends GridWorker { this.exec = exec; } + /** + * Start lazy worker for half-processed query. + * In this case we have to detach H2 connection from current thread and use it for current query processing. + * Also tables locks must be transferred to lazy thread from QUERY_POOL thread pool. + * + * @param ses H2 Session. + * @param detached H2 connection detached from current thread. + * @throws QueryCancelledException In case query is canceled during the worker start. + */ + void start(Session ses, ObjectPoolReusable<H2ConnectionWrapper> detached) throws QueryCancelledException { + synchronized (mux) { + if (!exec.busyLock().enterBusy()) { + log.warning("Lazy worker isn't started. Node is stopped [key=" + key + ']'); + + return; + } + + try { + if (started) + return; + + if (isCancelled) { + if (detached != null) + detached.recycle(); + + throw new QueryCancelledException(); + } + + if (ses != null) + lazyTransferStart(ses); + + this.detached = detached; + + exec.registerLazyWorker(this); + + IgniteThread thread = new IgniteThread(this); + + started = true; + + thread.start(); + } + finally { + exec.busyLock().leaveBusy(); + } + } + } + /** {@inheritDoc} */ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { LAZY_WORKER.set(this); ACTIVE_CNT.increment(); + boolean lockBusy = false; + try { + if (qctx != null) + GridH2QueryContext.set(qctx); + + if(detached != null) + lazyTransferFinish(H2Utils.session(detached.object().connection())); + while (!isCancelled()) { - Runnable task = tasks.take(); + Runnable task = tasks.poll(POLL_TASK_TIMEOUT_MS, TimeUnit.MILLISECONDS); if (task != null) { - if (!exec.busyLock().enterBusy()) - return; - try { task.run(); } + catch (Throwable t) { + log.warning("Lazy task error", t); + } + } + else { + try { + lockBusy = false; + + if (!exec.busyLock().enterBusy()) { + log.info("Stop lazy worker [key=" + key + ']'); + + return; + } + + lockBusy = true; + } finally { - exec.busyLock().leaveBusy(); + if (lockBusy) + exec.busyLock().leaveBusy(); } } } } finally { - if (res != null) - res.close(); + exec.unregisterLazyWorker(this); LAZY_WORKER.set(null); ACTIVE_CNT.decrement(); - exec.unregisterLazyWorker(this); + stopLatch.countDown(); } } @@ -111,6 +201,9 @@ public class MapQueryLazyWorker extends GridWorker { * @param task Task to be executed. */ public void submit(Runnable task) { + if (isCancelled) + return; + tasks.add(task); } @@ -125,45 +218,76 @@ public class MapQueryLazyWorker extends GridWorker { * Stop the worker. * @param nodeStop Node is stopping. */ - public void stop(final boolean nodeStop) { - if (MapQueryLazyWorker.currentWorker() == null) - submit(new Runnable() { - @Override public void run() { - stop(nodeStop); - } - }); - else { - GridH2QueryContext qctx = GridH2QueryContext.get(); - - if (qctx != null) { + private void stop0(boolean nodeStop) { + synchronized (mux) { + if (qctx != null && qctx.distributedJoinMode() == OFF && !qctx.isCleared()) qctx.clearContext(nodeStop); - GridH2QueryContext.clearThreadLocal(); + if (detached != null) { + detached.recycle(); + + detached = null; } isCancelled = true; - stopLatch.countDown(); + mux.notifyAll(); } } /** - * Await worker stop. + * @param task Stop task. */ - public void awaitStop() { - try { - U.await(stopLatch); + public void submitStopTask(Runnable task) { + synchronized (mux) { + if (LAZY_WORKER.get() != null) + task.run(); + else + submit(task); } - catch (IgniteInterruptedCheckedException e) { - throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e); + } + + /** + * Stop the worker. + * @param nodeStop Node is stopping. + */ + public void stop(final boolean nodeStop) { + synchronized (mux) { + if (isCancelled) + return; + + if (started && currentWorker() == null) { + submit(new Runnable() { + @Override public void run() { + stop0(nodeStop); + } + }); + + awaitStop(); + } + else if (currentWorker() != null) + stop0(nodeStop); } } /** - * @param res Map query result. + * Await worker stop. */ - public void result(MapQueryResult res) { - this.res = res; + private void awaitStop() { + synchronized (mux) { + try { + if (!isCancelled) + mux.wait(); + + U.await(stopLatch); + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } /** @@ -181,6 +305,13 @@ public class MapQueryLazyWorker extends GridWorker { } /** + * @param qctx Query context. + */ + public void queryContext(GridH2QueryContext qctx) { + this.qctx = qctx; + } + + /** * Construct worker name. * * @param instanceName Instance name. @@ -191,4 +322,32 @@ public class MapQueryLazyWorker extends GridWorker { return "query-lazy-worker_" + instanceName + "_" + key.nodeId() + "_" + key.queryRequestId() + "_" + key.segment(); } + + /** + * Start session transfer to lazy thread. + * + * @param ses Session. + */ + private static void lazyTransferStart(Session ses) { + GridH2QueryContext qctx = GridH2QueryContext.get(); + + assert qctx != null; + + for(GridH2Table tbl : qctx.lockedTables()) + tbl.onLazyTransferStarted(ses); + } + + /** + * Finish session transfer to lazy thread. + * + * @param ses Session. + */ + private static void lazyTransferFinish(Session ses) { + GridH2QueryContext qctx = GridH2QueryContext.get(); + + assert qctx != null; + + for(GridH2Table tbl : qctx.lockedTables()) + tbl.onLazyTransferFinished(ses); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java index fb928c4..5a0c410 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java @@ -22,6 +22,7 @@ import java.sql.ResultSet; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -60,6 +61,9 @@ class MapQueryResult { } } + /** Logger. */ + private final IgniteLogger log; + /** Indexing. */ private final IgniteH2Indexing h2; @@ -96,26 +100,23 @@ class MapQueryResult { /** */ private final Object[] params; - /** Lazy worker. */ - private final MapQueryLazyWorker lazyWorker; - /** + * @param h2 H2 indexing. * @param rs Result set. * @param cctx Cache context. * @param qrySrcNodeId Query source node. * @param qry Query. * @param params Query params. - * @param lazyWorker Lazy worker. */ MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable GridCacheContext cctx, - UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, @Nullable MapQueryLazyWorker lazyWorker) { + UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) { + this.log = h2.kernalContext().log(MapQueryResult.class); this.h2 = h2; this.cctx = cctx; this.qry = qry; this.params = params; this.qrySrcNodeId = qrySrcNodeId; this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId); - this.lazyWorker = lazyWorker; if (rs != null) { this.rs = rs; @@ -174,8 +175,6 @@ class MapQueryResult { * @return {@code true} If there are no more rows available. */ synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) { - assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker(); - if (closed) return true; @@ -259,30 +258,13 @@ class MapQueryResult { * Close the result. */ public void close() { - if (lazyWorker != null && MapQueryLazyWorker.currentWorker() == null) { - lazyWorker.submit(new Runnable() { - @Override public void run() { - close(); - } - }); - - lazyWorker.awaitStop(); - - return; - } - synchronized (this) { - assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker(); - if (closed) return; closed = true; - U.closeQuiet(rs); - - if (lazyWorker != null) - lazyWorker.stop(false); + U.close(rs, log); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java index 76527bc..b13137c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java @@ -30,7 +30,7 @@ import org.jetbrains.annotations.Nullable; * Mapper query results. */ class MapQueryResults { - /** H@ indexing. */ + /** H2 indexing. */ private final IgniteH2Indexing h2; /** */ @@ -113,10 +113,7 @@ class MapQueryResults { * @param params Query arguments. */ void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) { - MapQueryResult res = new MapQueryResult(h2, rs, cctx, qrySrcNodeId, q, params, lazyWorker); - - if (lazyWorker != null) - lazyWorker.result(res); + MapQueryResult res = new MapQueryResult(h2, rs, cctx, qrySrcNodeId, q, params); if (!results.compareAndSet(qry, null, res)) throw new IllegalStateException(); @@ -139,28 +136,37 @@ class MapQueryResults { /** * Cancels the query. */ - void cancel(boolean forceQryCancel) { + void cancel() { if (cancelled) return; cancelled = true; for (int i = 0; i < results.length(); i++) { - MapQueryResult res = results.get(i); + GridQueryCancel cancel = cancels[i]; - if (res != null) { - res.close(); + if (cancel != null) + cancel.cancel(); + } - continue; - } + if (lazyWorker == null) + close(); + else { + lazyWorker.submitStopTask(this::close); - // NB: Cancel is already safe even for lazy queries (see implementation of passed Runnable). - if (forceQryCancel) { - GridQueryCancel cancel = cancels[i]; + lazyWorker.stop(false); + } + } - if (cancel != null) - cancel.cancel(); - } + /** + * + */ + public void close() { + for (int i = 0; i < results.length(); i++) { + MapQueryResult res = results.get(i); + + if (res != null) + res.close(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java index a112969..a991530 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLazyQueryPartitionsReleaseTest.java @@ -96,7 +96,6 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT int partsFilled = fillAllPartitions(cache, aff); SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from person") - .setLazy(true) .setPageSize(1); FieldsQueryCursor<List<?>> qryCursor = cache.query(qry); @@ -143,7 +142,6 @@ public class GridCacheLazyQueryPartitionsReleaseTest extends GridCommonAbstractT int partsFilled = fillAllPartitions(cache, aff); SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from person") - .setLazy(true) .setPageSize(1); FieldsQueryCursor<List<?>> qryCursor = cache.query(qry); http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java index 59be138..24e2fb2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java @@ -121,12 +121,15 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest { private static int getStatementCacheSize(GridQueryProcessor qryProcessor) { IgniteH2Indexing h2Idx = GridTestUtils.getFieldValue(qryProcessor, GridQueryProcessor.class, "idx"); - ConcurrentMap<Thread, H2ConnectionWrapper> conns = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "conns"); + ConcurrentMap<Thread, ConcurrentMap<H2ConnectionWrapper, Boolean>> conns = + GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "conns"); int cntr = 0; - for (H2ConnectionWrapper w : conns.values()) - cntr += w.statementCacheSize(); + for (ConcurrentMap<H2ConnectionWrapper, Boolean> connPerThread: conns.values()) { + for (H2ConnectionWrapper w : connPerThread.keySet()) + cntr += w.statementCacheSize(); + } return cntr; } http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java index 56fd7b8..8542f43 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java @@ -177,7 +177,6 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr private void testQueryCancel(int keyCnt, int valSize, String sql, int timeoutUnits, TimeUnit timeUnit, boolean timeout) throws Exception { try (Ignite client = startGrid("client")) { - IgniteCache<Object, Object> cache = client.cache(DEFAULT_CACHE_NAME); assertEquals(0, cache.localSize()); @@ -204,7 +203,8 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr qry.setTimeout(timeoutUnits, timeUnit); cursor = cache.query(qry); - } else { + } + else { cursor = cache.query(qry); client.scheduler().runLocal(new Runnable() { @@ -214,7 +214,7 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr }, timeoutUnits, timeUnit); } - try(QueryCursor<List<?>> ignored = cursor) { + try (QueryCursor<List<?>> ignored = cursor) { cursor.iterator(); } catch (CacheException ex) { http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java index bad5303..3beebff 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; +import org.apache.ignite.testframework.GridTestUtils; /** * Test for distributed queries with node restarts. @@ -101,11 +102,11 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa assertEquals(broadcastQry, plan.contains("batched:broadcast")); - final List<List<?>> pRes = grid(0).cache("pu").query(qry0).getAll(); + final List<List<?>> goldenRes = grid(0).cache("pu").query(qry0).getAll(); Thread.sleep(3000); - assertEquals(pRes, grid(0).cache("pu").query(qry0).getAll()); + assertEquals(goldenRes, grid(0).cache("pu").query(qry0).getAll()); final SqlFieldsQuery qry1; @@ -122,7 +123,7 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa final List<List<?>> rRes = grid(0).cache("co").query(qry1).getAll(); - assertFalse(pRes.isEmpty()); + assertFalse(goldenRes.isEmpty()); assertFalse(rRes.isEmpty()); final AtomicInteger qryCnt = new AtomicInteger(); @@ -161,9 +162,12 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa qry.setPageSize(smallPageSize ? 30 : 1000); try { - assertEquals(pRes, cache.query(qry).getAll()); + assertEquals(goldenRes, cache.query(qry).getAll()); } catch (CacheException e) { + if (!smallPageSize) + log.error("Unexpected exception at the test", e); + assertTrue("On large page size must retry.", smallPageSize); boolean failedOnRemoteFetch = false; @@ -263,7 +267,7 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCa } }, restartThreadsNum, "restart-thread"); - Thread.sleep(duration); + GridTestUtils.waitForCondition(() -> fail.get(), duration); info("Stopping..."); http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java index 072f1ab..4d02b2e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java @@ -627,6 +627,8 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo * @throws Exception If failed. */ public void testQueryConsistencyMultithreaded() throws Exception { + final int KEY_COUNT = 5000; + // Start complex topology. ignitionStart(serverConfiguration(1)); ignitionStart(serverConfiguration(2)); @@ -638,7 +640,7 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo run(cli, createSql); - put(cli, 0, 5000); + put(cli, 0, KEY_COUNT); final AtomicBoolean stopped = new AtomicBoolean(); @@ -696,7 +698,7 @@ public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicCo List<Cache.Entry<BinaryObject, BinaryObject>> res = cache.query( new SqlQuery<BinaryObject, BinaryObject>(valTypeName, "from " + TBL_NAME)).getAll(); - assertEquals(5000, res.size()); + assertEquals(KEY_COUNT, res.size()); } return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java index 7713004..fe45ed6 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java @@ -160,7 +160,7 @@ public class H2ConnectionLeaksSelfTest extends GridCommonAbstractTest { Map<Thread, ?> conns = perThreadConnections(i); for(Thread t : conns.keySet()) - log.error("+++ Connection is not closed for thread: " + t.getName()); + log.error("Connection is not closed for thread: " + t.getName()); } fail("H2 JDBC connections leak detected. See the log above."); http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java index d5cc0eb..140eb6e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java @@ -17,13 +17,21 @@ package org.apache.ignite.internal.processors.query; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker; @@ -31,16 +39,11 @@ import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; - /** * Tests for lazy query execution. */ public class LazyQuerySelfTest extends GridCommonAbstractTest { - /** Keys ocunt. */ + /** Keys count. */ private static final int KEY_CNT = 200; /** Base query argument. */ @@ -94,6 +97,91 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest { } /** + * Test DDL operation on table with high load queries. + * + * @throws Exception If failed. + */ + public void testTableWriteLockStarvation() throws Exception { + final Ignite srv = startGrid(1); + + srv.createCache(cacheConfiguration(4)); + + populateBaseQueryData(srv); + + final AtomicBoolean end = new AtomicBoolean(false); + + final int qryThreads = 10; + + final CountDownLatch latch = new CountDownLatch(qryThreads); + + // Do many concurrent queries. + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + latch.countDown(); + + while(!end.get()) { + FieldsQueryCursor<List<?>> cursor = execute(srv, query(0) + .setPageSize(PAGE_SIZE_SMALL)); + + cursor.getAll(); + } + } + }, qryThreads, "usr-qry"); + + latch.await(); + + Thread.sleep(500); + + execute(srv, new SqlFieldsQuery("CREATE INDEX PERSON_NAME ON Person (name asc)")).getAll(); + execute(srv, new SqlFieldsQuery("DROP INDEX PERSON_NAME")).getAll(); + + // Test is OK in case DDL operations is passed on hi load queries pressure. + end.set(true); + fut.get(); + } + + /** + * Test release reserved partition after query complete (results is bigger than one page). + * + * @throws Exception If failed. + */ + public void testReleasePartitionReservationSeveralPagesResults() throws Exception { + checkReleasePartitionReservation(PAGE_SIZE_SMALL); + } + + /** + * Test release reserved partition after query complete (results is placed on one page). + * + * @throws Exception If failed. + */ + public void testReleasePartitionReservationOnePageResults() throws Exception { + checkReleasePartitionReservation(KEY_CNT); + } + + /** + * Test release reserved partition after query complete. + * + * @param pageSize Results page size. + * @throws Exception If failed. + */ + public void checkReleasePartitionReservation(int pageSize) throws Exception { + Ignite srv1 = startGrid(1); + startGrid(2); + + srv1.createCache(cacheConfiguration(1)); + + populateBaseQueryData(srv1); + + FieldsQueryCursor<List<?>> cursor = execute(srv1, query(0).setPageSize(pageSize)); + + cursor.getAll(); + + startGrid(3); + + awaitPartitionMapExchange(); + } + + /** * Check local query execution. * * @param parallelism Query parallelism. @@ -151,18 +239,18 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest { assertNoWorkers(); // Test server node leave with active worker. - cursor = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL)); + FieldsQueryCursor<List<?>> cursor2 = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL)); try { - iter = cursor.iterator(); + Iterator<List<?>> iter2 = cursor2.iterator(); for (int i = 0; i < 30; i++) - iter.next(); + iter2.next(); stopGrid(2); } finally { - cursor.close(); + cursor2.close(); } assertNoWorkers(); @@ -233,7 +321,55 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest { } } + checkHoldLazyQuery(node); + + checkShortLazyQuery(node); + } + + /** + * @param node Ignite node. + * @throws Exception If failed. + */ + public void checkHoldLazyQuery(Ignite node) throws Exception { + ArrayList rows = new ArrayList<>(); + + FieldsQueryCursor<List<?>> cursor0 = execute(node, query(BASE_QRY_ARG).setPageSize(PAGE_SIZE_SMALL)); + + // Do many concurrent queries to Test full iteration. + GridTestUtils.runMultiThreaded(new Runnable() { + @Override public void run() { + for (int i = 0; i < 5; ++i) { + FieldsQueryCursor<List<?>> cursor = execute(node, query(KEY_CNT - PAGE_SIZE_SMALL + 1) + .setPageSize(PAGE_SIZE_SMALL)); + + cursor.getAll(); + } + } + }, 5, "usr-qry"); + + for (List<?> row : cursor0) + rows.add(row); + + assertBaseQueryResults(rows); + } + + /** + * @param node Ignite node. + * @throws Exception If failed. + */ + public void checkShortLazyQuery(Ignite node) throws Exception { + ArrayList rows = new ArrayList<>(); + + FieldsQueryCursor<List<?>> cursor0 = execute(node, query(KEY_CNT - PAGE_SIZE_SMALL + 1).setPageSize(PAGE_SIZE_SMALL)); + + Iterator<List<?>> it = cursor0.iterator(); + assertNoWorkers(); + + while (it.hasNext()) + rows.add(it.next()); + + assertQueryResults(rows, KEY_CNT - PAGE_SIZE_SMALL + 1); } /** @@ -267,8 +403,11 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest { * @return Default cache configuration. */ private static CacheConfiguration<Long, Person> cacheConfiguration(int parallelism) { - return new CacheConfiguration<Long, Person>().setName(CACHE_NAME).setIndexedTypes(Long.class, Person.class) - .setQueryParallelism(parallelism); + return new CacheConfiguration<Long, Person>() + .setName(CACHE_NAME) + .setIndexedTypes(Long.class, Person.class) + .setQueryParallelism(parallelism) + .setAffinity(new RendezvousAffinityFunction(false, 10)); } /** @@ -278,7 +417,7 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest { * @return Query. */ private static SqlFieldsQuery query(long arg) { - return new SqlFieldsQuery("SELECT id, name FROM Person WHERE id >= ?").setArgs(arg); + return new SqlFieldsQuery("SELECT id, name FROM Person WHERE id >= " + arg); } /** @@ -287,13 +426,23 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest { * @param rows Result rows. */ private static void assertBaseQueryResults(List<List<?>> rows) { - assertEquals(KEY_CNT - BASE_QRY_ARG, rows.size()); + assertQueryResults(rows, BASE_QRY_ARG); + } + + /** + * Assert base query results. + * + * @param rows Result rows. + * @param resSize Result size. + */ + private static void assertQueryResults(List<List<?>> rows, int resSize) { + assertEquals(KEY_CNT - resSize, rows.size()); for (List<?> row : rows) { Long id = (Long)row.get(0); String name = (String)row.get(1); - assertTrue(id >= BASE_QRY_ARG); + assertTrue(id >= resSize); assertEquals(nameForId(id), name); } } @@ -317,7 +466,7 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest { */ @SuppressWarnings("unchecked") private static FieldsQueryCursor<List<?>> execute(Ignite node, SqlFieldsQuery qry) { - return cache(node).query(qry.setLazy(true)); + return cache(node).query(qry); } /** @@ -325,8 +474,8 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest { * * @throws Exception If failed. */ - private static void assertNoWorkers() throws Exception { - assert GridTestUtils.waitForCondition(new GridAbsPredicate() { + private void assertNoWorkers() throws Exception { + if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { for (Ignite node : Ignition.allGrids()) { IgniteH2Indexing idx = (IgniteH2Indexing) ((IgniteKernal)node).context().query().getIndexing(); @@ -337,7 +486,22 @@ public class LazyQuerySelfTest extends GridCommonAbstractTest { return MapQueryLazyWorker.activeCount() == 0; } - }, 1000L); + }, 1000L)) { + log.error("Lazy workers on nodes:"); + + for (Ignite node : Ignition.allGrids()) { + IgniteH2Indexing idx = (IgniteH2Indexing) ((IgniteKernal)node).context().query().getIndexing(); + + if (idx.mapQueryExecutor().registeredLazyWorkers() != 0) { + log.error("[node=" + node + ", " + "registeredLazyWorkers=" + + idx.mapQueryExecutor().registeredLazyWorkers() + ']'); + } + + log.error("Active lazy workers: " + MapQueryLazyWorker.activeCount()); + + fail("There are not stopped lazy workers. See error message above."); + } + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolSelfTest.java new file mode 100644 index 0000000..9be0870 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ObjectPoolSelfTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class ObjectPoolSelfTest extends GridCommonAbstractTest { + /** */ + private ObjectPool<Obj> pool = new ObjectPool<>(Obj::new, 1, null, null); + + /** + * @throws Exception If failed. + */ + public void testObjectIsReusedAfterRecycling() throws Exception { + ObjectPoolReusable<Obj> r1 = pool.borrow(); + + Obj o1 = r1.object(); + + r1.recycle(); + + ObjectPoolReusable<Obj> r2 = pool.borrow(); + + Obj o2 = r2.object(); + + assertSame(o1, o2); + + assertFalse(o1.isClosed()); + } + + /** + * @throws Exception If failed. + */ + public void testBorrowedObjectIsNotReturnedTwice() throws Exception { + ObjectPoolReusable<Obj> r1 = pool.borrow(); + ObjectPoolReusable<Obj> r2 = pool.borrow(); + + assertNotSame(r1.object(), r2.object()); + } + + /** + * @throws Exception If failed. + */ + public void testObjectShouldBeClosedOnRecycleIfPoolIsFull() throws Exception { + ObjectPoolReusable<Obj> r1 = pool.borrow(); + ObjectPoolReusable<Obj> r2 = pool.borrow(); + + Obj o2 = r2.object(); + + r1.recycle(); + r2.recycle(); + + assertNull(r1.object()); + assertNull(r2.object()); + + assertTrue(o2.isClosed()); + } + + /** + * @throws Exception If failed. + */ + public void testObjectShouldNotBeReturnedIfPoolIsFull() throws Exception { + ObjectPoolReusable<Obj> r1 = pool.borrow(); + ObjectPoolReusable<Obj> r2 = pool.borrow(); + + r1.recycle(); + + assertEquals(1, pool.bagSize()); + + r2.recycle(); + + assertEquals(1, pool.bagSize()); + } + + /** + * @throws Exception If failed. + */ + public void testObjectShouldReturnedToBag() throws Exception { + ObjectPoolReusable<Obj> r1 = pool.borrow(); + + CompletableFuture.runAsync(() -> { + r1.recycle(); + + assertEquals(1, pool.bagSize()); + }).join(); + + assertEquals(1, pool.bagSize()); + } + + /** */ + private static class Obj implements AutoCloseable { + /** */ + private boolean closed = false; + + /** {@inheritDoc} */ + @Override public void close() { + closed = true; + } + + /** + * @return {@code True} if closed. + */ + public boolean isClosed() { + return closed; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java deleted file mode 100644 index b7b7a37..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/ThreadLocalObjectPoolSelfTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2; - -import java.util.concurrent.CompletableFuture; -import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool.Reusable; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * - */ -public class ThreadLocalObjectPoolSelfTest extends GridCommonAbstractTest { - /** */ - private ThreadLocalObjectPool<Obj> pool = new ThreadLocalObjectPool<>(Obj::new, 1); - - /** - * @throws Exception If failed. - */ - public void testObjectIsReusedAfterRecycling() throws Exception { - Reusable<Obj> o1 = pool.borrow(); - o1.recycle(); - Reusable<Obj> o2 = pool.borrow(); - - assertSame(o1.object(), o2.object()); - assertFalse(o1.object().isClosed()); - } - - /** - * @throws Exception If failed. - */ - public void testBorrowedObjectIsNotReturnedTwice() throws Exception { - Reusable<Obj> o1 = pool.borrow(); - Reusable<Obj> o2 = pool.borrow(); - - assertNotSame(o1.object(), o2.object()); - } - - /** - * @throws Exception If failed. - */ - public void testObjectShouldBeClosedOnRecycleIfPoolIsFull() throws Exception { - Reusable<Obj> o1 = pool.borrow(); - Reusable<Obj> o2 = pool.borrow(); - o1.recycle(); - o2.recycle(); - - assertTrue(o2.object().isClosed()); - } - - /** - * @throws Exception If failed. - */ - public void testObjectShouldNotBeReturnedIfPoolIsFull() throws Exception { - Reusable<Obj> o1 = pool.borrow(); - Reusable<Obj> o2 = pool.borrow(); - - o1.recycle(); - - assertEquals(1, pool.bagSize()); - - o2.recycle(); - - assertEquals(1, pool.bagSize()); - } - - /** - * @throws Exception If failed. - */ - public void testObjectShouldReturnedToRecyclingThreadBag() throws Exception { - Reusable<Obj> o1 = pool.borrow(); - - CompletableFuture.runAsync(() -> { - o1.recycle(); - - assertEquals(1, pool.bagSize()); - }).join(); - - assertEquals(0, pool.bagSize()); - } - - /** */ - private static class Obj implements AutoCloseable { - /** */ - private boolean closed = false; - - /** {@inheritDoc} */ - @Override public void close() { - closed = true; - } - - /** - * @return {@code True} if closed. - */ - public boolean isClosed() { - return closed; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/de21856b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java index dbb2c59..ac467d5 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java @@ -384,11 +384,6 @@ public class RetryCauseMessageSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public void cancelLazyWorkers() { - startedExecutor.cancelLazyWorkers(); - } - - /** {@inheritDoc} */ @Override GridSpinBusyLock busyLock() { return startedExecutor.busyLock(); } @@ -399,19 +394,8 @@ public class RetryCauseMessageSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public void stopAndUnregisterCurrentLazyWorker() { - startedExecutor.stopAndUnregisterCurrentLazyWorker(); - } - - /** {@inheritDoc} */ - @Override public void unregisterLazyWorker(MapQueryLazyWorker worker) { - startedExecutor.unregisterLazyWorker(worker); - } - - /** {@inheritDoc} */ @Override public int registeredLazyWorkers() { return startedExecutor.registeredLazyWorkers(); } } - }
