This is an automated email from the ASF dual-hosted git repository. anovikov pushed a commit to branch ignite-17594 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 81402f8d953ef5f791500f7997b97dc3c8ff9378 Author: Andrey Novikov <anovi...@gridgain.com> AuthorDate: Wed Aug 31 14:34:28 2022 +0700 IGNITE-17594 Provide ability to register listeners for query start/finish events. --- .../query/calcite/QueryRegistryImpl.java | 2 +- ...ngQueryInfo.java => GridQueryFinishedInfo.java} | 151 +++---- ...ingQueryInfo.java => GridQueryStartedInfo.java} | 134 ++---- .../processors/query/GridRunningQueryInfo.java | 48 ++ .../processors/query/RunningQueryManager.java | 134 +++++- .../processors/query/h2/IgniteH2Indexing.java | 62 ++- .../h2/IgniteSqlQueryStartFinishListenerTest.java | 496 +++++++++++++++++++++ .../IgniteBinaryCacheQueryTestSuite.java | 2 + 8 files changed, 857 insertions(+), 172 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java index 58c5df1bf25..d023a258e1a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java @@ -63,7 +63,7 @@ public class QueryRegistryImpl extends AbstractService implements QueryRegistry String initiatorId = fieldsQry != null ? 
fieldsQry.getQueryInitiatorId() : null; long locId = qryMgr.register(rootQry.sql(), GridCacheQueryType.SQL_FIELDS, rootQry.context().schemaName(), - false, createCancelToken(qry), initiatorId); + false, createCancelToken(qry), initiatorId, false, false, false); rootQry.localQueryId(locId); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFinishedInfo.java similarity index 58% copy from modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java copy to modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFinishedInfo.java index bdc29e9f6ae..8983f8230a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFinishedInfo.java @@ -19,13 +19,13 @@ package org.apache.ignite.internal.processors.query; import java.util.UUID; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; -import org.apache.ignite.internal.processors.tracing.MTC; -import org.apache.ignite.internal.processors.tracing.Span; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; /** - * Query descriptor. + * Info about finished query. */ -public class GridRunningQueryInfo { +public class GridQueryFinishedInfo { /** */ private final long id; @@ -44,29 +44,30 @@ public class GridRunningQueryInfo { /** */ private final long startTime; - /** Query start time in nanoseconds to measure duration. */ - private final long startTimeNanos; - /** */ - private final GridQueryCancel cancel; + private final long finishTime; /** */ private final boolean loc; - /** */ - private final QueryRunningFuture fut = new QueryRunningFuture(); + /** Enforce join order query flag. 
*/ + private boolean enforceJoinOrder; - /** Span of the running query. */ - private final Span span; + /** Lazy query flag. */ + private boolean lazy; - /** Originator. */ - private final String qryInitiatorId; + /** Distributed joins query flag. */ + private boolean distributedJoins; - /** Request ID. */ - private long reqId; + /** Whether query is failed or not. */ + private final boolean failed; - /** Subject ID. */ - private final UUID subjId; + /** Exception that caused query execution fail. */ + @Nullable + private final Throwable failReason; + + /** Originator. */ + private final String qryInitiatorId; /** * Constructor. @@ -77,23 +78,30 @@ public class GridRunningQueryInfo { * @param qryType Query type. * @param schemaName Schema name. * @param startTime Query start time. - * @param startTimeNanos Query start time in nanoseconds. - * @param cancel Query cancel. + * @param finishTime Query finish time. * @param loc Local query flag. - * @param subjId Subject ID. - */ - public GridRunningQueryInfo( - long id, + * @param enforceJoinOrder Enforce join order flag. + * @param lazy Lazy flag. + * @param distributedJoins Distributed joins flag. + * @param failed Whether query is failed or not. + * @param failReason Exception that caused query execution fail. + * @param qryInitiatorId Query's initiator identifier. 
+ */ + public GridQueryFinishedInfo( + Long id, UUID nodeId, String qry, GridCacheQueryType qryType, String schemaName, long startTime, - long startTimeNanos, - GridQueryCancel cancel, + long finishTime, boolean loc, - String qryInitiatorId, - UUID subjId + boolean enforceJoinOrder, + boolean lazy, + boolean distributedJoins, + boolean failed, + @Nullable Throwable failReason, + String qryInitiatorId ) { this.id = id; this.nodeId = nodeId; @@ -101,26 +109,28 @@ public class GridRunningQueryInfo { this.qryType = qryType; this.schemaName = schemaName; this.startTime = startTime; - this.startTimeNanos = startTimeNanos; - this.cancel = cancel; + this.finishTime = finishTime; this.loc = loc; - this.span = MTC.span(); + this.enforceJoinOrder = enforceJoinOrder; + this.lazy = lazy; + this.distributedJoins = distributedJoins; + this.failed = failed; + this.failReason = failReason; this.qryInitiatorId = qryInitiatorId; - this.subjId = subjId; } /** * @return Query ID. */ - public long id() { + public Long id() { return id; } /** - * @return Global query ID. + * @return Node ID. */ - public String globalQueryId() { - return QueryUtils.globalQueryId(nodeId, id); + public UUID nodeId() { + return nodeId; } /** @@ -152,67 +162,53 @@ public class GridRunningQueryInfo { } /** - * @return Query start time in nanoseconds. - */ - public long startTimeNanos() { - return startTimeNanos; - } - - /** - * @param curTime Current time. - * @param duration Duration of long query. - * @return {@code true} if this query should be considered as long running query. + * @return Query finish time. */ - public boolean longQuery(long curTime, long duration) { - return curTime - startTime > duration; + public long finishTime() { + return finishTime; } /** - * Cancel query. + * @return {@code true} if query is local. */ - public void cancel() { - if (cancel != null) - cancel.cancel(); + public boolean local() { + return loc; } /** - * @return Query running future. + * @return Enforce join order flag. 
*/ - public QueryRunningFuture runningFuture() { - return fut; + public boolean enforceJoinOrder() { + return enforceJoinOrder; } /** - * @return {@code true} if query can be cancelled. + * @return Lazy flag. */ - public boolean cancelable() { - return cancel != null; + public boolean lazy() { + return lazy; } /** - * @return {@code true} if query is local. + * @return Distributed joins. */ - public boolean local() { - return loc; + public boolean distributedJoins() { + return distributedJoins; } /** - * @return Originating node ID. + * @return {@code true} if query is failed. */ - public UUID nodeId() { - return nodeId; + public boolean failed() { + return failed; } /** - * @return Span of the running query. + * @return Exception that caused query execution fail, or {@code null} if query succeeded. */ - public Span span() { - return span; - } - - /** @return Request ID. */ - public long requestId() { - return reqId; + @Nullable + public Throwable failReason() { + return failReason; } /** @@ -223,13 +219,8 @@ public class GridRunningQueryInfo { return qryInitiatorId; } - /** @param reqId Request ID. */ - public void requestId(long reqId) { - this.reqId = reqId; - } - - /** @return Subject ID. 
*/ - public UUID subjectId() { - return subjId; + /**{@inheritDoc} */ + @Override public String toString() { + return S.toString(GridQueryFinishedInfo.class, this); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryStartedInfo.java similarity index 58% copy from modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java copy to modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryStartedInfo.java index bdc29e9f6ae..f0e83794dbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryStartedInfo.java @@ -19,13 +19,12 @@ package org.apache.ignite.internal.processors.query; import java.util.UUID; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; -import org.apache.ignite.internal.processors.tracing.MTC; -import org.apache.ignite.internal.processors.tracing.Span; +import org.apache.ignite.internal.util.typedef.internal.S; /** - * Query descriptor. + * Info about new started query. */ -public class GridRunningQueryInfo { +public class GridQueryStartedInfo { /** */ private final long id; @@ -44,30 +43,24 @@ public class GridRunningQueryInfo { /** */ private final long startTime; - /** Query start time in nanoseconds to measure duration. */ - private final long startTimeNanos; - - /** */ - private final GridQueryCancel cancel; + /** Query cancellable flag. */ + private boolean cancellable; /** */ private final boolean loc; - /** */ - private final QueryRunningFuture fut = new QueryRunningFuture(); + /** Enforce join order query flag. */ + private boolean enforceJoinOrder; + + /** Lazy query flag. */ + private boolean lazy; - /** Span of the running query. 
*/ - private final Span span; + /** Distributed joins query flag. */ + private boolean distributedJoins; /** Originator. */ private final String qryInitiatorId; - /** Request ID. */ - private long reqId; - - /** Subject ID. */ - private final UUID subjId; - /** * Constructor. * @@ -77,23 +70,26 @@ public class GridRunningQueryInfo { * @param qryType Query type. * @param schemaName Schema name. * @param startTime Query start time. - * @param startTimeNanos Query start time in nanoseconds. - * @param cancel Query cancel. + * @param cancellable Query cancellable flag. * @param loc Local query flag. - * @param subjId Subject ID. + * @param enforceJoinOrder Enforce join order flag. + * @param lazy Lazy flag. + * @param distributedJoins Distributed joins flag. + * @param qryInitiatorId Query's initiator identifier. */ - public GridRunningQueryInfo( - long id, + public GridQueryStartedInfo( + Long id, UUID nodeId, String qry, GridCacheQueryType qryType, String schemaName, long startTime, - long startTimeNanos, - GridQueryCancel cancel, + boolean cancellable, boolean loc, - String qryInitiatorId, - UUID subjId + boolean enforceJoinOrder, + boolean lazy, + boolean distributedJoins, + String qryInitiatorId ) { this.id = id; this.nodeId = nodeId; @@ -101,26 +97,26 @@ public class GridRunningQueryInfo { this.qryType = qryType; this.schemaName = schemaName; this.startTime = startTime; - this.startTimeNanos = startTimeNanos; - this.cancel = cancel; + this.cancellable = cancellable; this.loc = loc; - this.span = MTC.span(); + this.enforceJoinOrder = enforceJoinOrder; + this.lazy = lazy; + this.distributedJoins = distributedJoins; this.qryInitiatorId = qryInitiatorId; - this.subjId = subjId; } /** * @return Query ID. */ - public long id() { + public Long id() { return id; } /** - * @return Global query ID. + * @return Node ID. 
*/ - public String globalQueryId() { - return QueryUtils.globalQueryId(nodeId, id); + public UUID nodeId() { + return nodeId; } /** @@ -151,42 +147,11 @@ public class GridRunningQueryInfo { return startTime; } - /** - * @return Query start time in nanoseconds. - */ - public long startTimeNanos() { - return startTimeNanos; - } - - /** - * @param curTime Current time. - * @param duration Duration of long query. - * @return {@code true} if this query should be considered as long running query. - */ - public boolean longQuery(long curTime, long duration) { - return curTime - startTime > duration; - } - - /** - * Cancel query. - */ - public void cancel() { - if (cancel != null) - cancel.cancel(); - } - - /** - * @return Query running future. - */ - public QueryRunningFuture runningFuture() { - return fut; - } - /** * @return {@code true} if query can be cancelled. */ - public boolean cancelable() { - return cancel != null; + public boolean cancellable() { + return cancellable; } /** @@ -197,22 +162,24 @@ public class GridRunningQueryInfo { } /** - * @return Originating node ID. + * @return Enforce join order flag. */ - public UUID nodeId() { - return nodeId; + public boolean enforceJoinOrder() { + return enforceJoinOrder; } /** - * @return Span of the running query. + * @return Lazy flag. */ - public Span span() { - return span; + public boolean lazy() { + return lazy; } - /** @return Request ID. */ - public long requestId() { - return reqId; + /** + * @return Distributed joins. + */ + public boolean distributedJoins() { + return distributedJoins; } /** @@ -223,13 +190,8 @@ public class GridRunningQueryInfo { return qryInitiatorId; } - /** @param reqId Request ID. */ - public void requestId(long reqId) { - this.reqId = reqId; - } - - /** @return Subject ID. 
*/ - public UUID subjectId() { - return subjId; + /**{@inheritDoc} */ + @Override public String toString() { + return S.toString(GridQueryStartedInfo.class, this); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java index bdc29e9f6ae..927239533cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java @@ -21,6 +21,8 @@ import java.util.UUID; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.tracing.MTC; import org.apache.ignite.internal.processors.tracing.Span; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; /** * Query descriptor. @@ -54,6 +56,7 @@ public class GridRunningQueryInfo { private final boolean loc; /** */ + @GridToStringExclude private final QueryRunningFuture fut = new QueryRunningFuture(); /** Span of the running query. */ @@ -62,6 +65,15 @@ public class GridRunningQueryInfo { /** Originator. */ private final String qryInitiatorId; + /** Enforce join order flag. */ + private final boolean enforceJoinOrder; + + /** Lazy flag. */ + private final boolean lazy; + + /** Distributed joins flag. */ + private final boolean distributedJoins; + /** Request ID. */ private long reqId; @@ -80,6 +92,10 @@ public class GridRunningQueryInfo { * @param startTimeNanos Query start time in nanoseconds. * @param cancel Query cancel. * @param loc Local query flag. + * @param qryInitiatorId Query's initiator identifier. + * @param enforceJoinOrder Enforce join order flag. + * @param lazy Lazy flag. + * @param distributedJoins Distributed joins flag. * @param subjId Subject ID. 
*/ public GridRunningQueryInfo( @@ -93,6 +109,9 @@ public class GridRunningQueryInfo { GridQueryCancel cancel, boolean loc, String qryInitiatorId, + boolean enforceJoinOrder, + boolean lazy, + boolean distributedJoins, UUID subjId ) { this.id = id; @@ -106,6 +125,9 @@ public class GridRunningQueryInfo { this.loc = loc; this.span = MTC.span(); this.qryInitiatorId = qryInitiatorId; + this.enforceJoinOrder = enforceJoinOrder; + this.lazy = lazy; + this.distributedJoins = distributedJoins; this.subjId = subjId; } @@ -223,6 +245,27 @@ public class GridRunningQueryInfo { return qryInitiatorId; } + /** + * @return Distributed joins. + */ + public boolean distributedJoins() { + return distributedJoins; + } + + /** + * @return Enforce join order flag. + */ + public boolean enforceJoinOrder() { + return enforceJoinOrder; + } + + /** + * @return Lazy flag. + */ + public boolean lazy() { + return lazy; + } + /** @param reqId Request ID. */ public void requestId(long reqId) { this.reqId = reqId; @@ -232,4 +275,9 @@ public class GridRunningQueryInfo { public UUID subjectId() { return subjId; } + + /**{@inheritDoc} */ + @Override public String toString() { + return S.toString(GridRunningQueryInfo.class, this); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java index 11f1c8709df..5dcf7b66a47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java @@ -25,10 +25,13 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import 
java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cluster.ClusterNode; @@ -37,15 +40,18 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.managers.systemview.walker.SqlQueryHistoryViewWalker; import org.apache.ignite.internal.managers.systemview.walker.SqlQueryViewWalker; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; +import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; +import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.processors.query.messages.GridQueryKillRequest; import org.apache.ignite.internal.processors.query.messages.GridQueryKillResponse; import org.apache.ignite.internal.processors.tracing.Span; @@ -89,6 +95,9 @@ public class RunningQueryManager { /** Undefined query ID value. */ public static final long UNDEFINED_QUERY_ID = 0L; + /** */ + private final GridClosureProcessor closure; + /** Keep registered user queries. 
*/ private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap<>(); @@ -147,6 +156,12 @@ public class RunningQueryManager { } }; + /** */ + private final List<Consumer<GridQueryStartedInfo>> qryStartedListeners = new CopyOnWriteArrayList<>(); + + /** */ + private final List<Consumer<GridQueryFinishedInfo>> qryFinishedListeners = new CopyOnWriteArrayList<>(); + /** * Constructor. * @@ -160,6 +175,7 @@ public class RunningQueryManager { localNodeId = ctx.localNodeId(); histSz = ctx.config().getSqlConfiguration().getSqlQueryHistorySize(); + closure = ctx.closure(); qryHistTracker = new QueryHistoryTracker(histSz); @@ -229,11 +245,14 @@ public class RunningQueryManager { * @param schemaName Schema name. * @param loc Local query flag. * @param cancel Query cancel. Should be passed in case query is cancelable, or {@code null} otherwise. + * @param enforceJoinOrder Enforce join order flag. + * @param lazy Lazy flag. + * @param distributedJoins Distributed joins flag. * @return Id of registered query. Id is a positive number. 
*/ public long register(String qry, GridCacheQueryType qryType, String schemaName, boolean loc, @Nullable GridQueryCancel cancel, - String qryInitiatorId) { + String qryInitiatorId, boolean enforceJoinOrder, boolean lazy, boolean distributedJoins) { long qryId = qryIdGen.incrementAndGet(); if (qryInitiatorId == null) @@ -250,6 +269,9 @@ public class RunningQueryManager { cancel, loc, qryInitiatorId, + enforceJoinOrder, + lazy, + distributedJoins, securitySubjectId(ctx) ); @@ -262,6 +284,41 @@ public class RunningQueryManager { run.span().addTag(SQL_QRY_ID, run::globalQueryId); + if (!qryStartedListeners.isEmpty()) { + GridQueryStartedInfo info = new GridQueryStartedInfo( + run.id(), + localNodeId, + run.query(), + run.queryType(), + run.schemaName(), + run.startTime(), + run.cancelable(), + run.local(), + run.enforceJoinOrder(), + run.lazy(), + run.distributedJoins(), + run.queryInitiatorId() + ); + + try { + closure.runLocal( + () -> qryStartedListeners.forEach(lsnr -> { + try { + lsnr.accept(info); + } + catch (Exception ex) { + log.error("Listener fails during handling query started" + + " event [qryId=" + qryId + "]", ex); + } + }), + GridIoPolicy.PUBLIC_POOL + ); + } + catch (IgniteCheckedException ex) { + throw new IgniteException(ex.getMessage(), ex); + } + } + return qryId; } @@ -289,6 +346,43 @@ public class RunningQueryManager { if (failed) qrySpan.addTag(ERROR, failReason::getMessage); + if (!qryFinishedListeners.isEmpty()) { + GridQueryFinishedInfo info = new GridQueryFinishedInfo( + qry.id(), + localNodeId, + qry.query(), + qry.queryType(), + qry.schemaName(), + qry.startTime(), + System.currentTimeMillis(), + qry.local(), + qry.enforceJoinOrder(), + qry.lazy(), + qry.distributedJoins(), + failed, + failReason, + qry.queryInitiatorId() + ); + + try { + closure.runLocal( + () -> qryFinishedListeners.forEach(lsnr -> { + try { + lsnr.accept(info); + } + catch (Exception ex) { + log.error("Listener fails during handling query finished" + + " event [qryId=" 
+ qryId + "]", ex); + } + }), + GridIoPolicy.PUBLIC_POOL + ); + } + catch (IgniteCheckedException ex) { + throw new IgniteException(ex.getMessage(), ex); + } + } + //We need to collect query history and metrics only for SQL queries. if (isSqlQuery(qry)) { qry.runningFuture().onDone(); @@ -349,6 +443,44 @@ public class RunningQueryManager { return res; } + /** + * @param lsnr Listener. + */ + public void registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) { + A.notNull(lsnr, "lsnr"); + + qryStartedListeners.add(lsnr); + } + + /** + * @param lsnr Listener. + */ + @SuppressWarnings("SuspiciousMethodCalls") + public boolean unregisterQueryStartedListener(Object lsnr) { + A.notNull(lsnr, "lsnr"); + + return qryStartedListeners.remove(lsnr); + } + + /** + * @param lsnr Listener. + */ + public void registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) { + A.notNull(lsnr, "lsnr"); + + qryFinishedListeners.add(lsnr); + } + + /** + * @param lsnr Listener. + */ + @SuppressWarnings("SuspiciousMethodCalls") + public boolean unregisterQueryFinishedListener(Object lsnr) { + A.notNull(lsnr, "lsnr"); + + return qryFinishedListeners.remove(lsnr); + } + /** * Check belongs running query to an SQL type. 
* diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 7a5c824e60e..9788f2987bc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -36,6 +36,7 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.Stream; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; @@ -101,8 +102,11 @@ import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter; +import org.apache.ignite.internal.processors.query.GridQueryFinishedInfo; import org.apache.ignite.internal.processors.query.GridQueryIndexing; import org.apache.ignite.internal.processors.query.GridQueryProperty; +import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; +import org.apache.ignite.internal.processors.query.GridQueryStartedInfo; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryField; @@ -512,13 +516,29 @@ public class IgniteH2Indexing implements GridQueryIndexing { H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, typeName); if (tbl != null && tbl.luceneIndex() != null) { - long qryId = runningQueryManager().register(qry, TEXT, schemaName, true, null, null); + long qryId = runningQueryManager().register( + qry, + 
TEXT, + schemaName, + true, + null, + null, + false, + false, + false + ); + Throwable failReason = null; try { return tbl.luceneIndex().query(qry.toUpperCase(), filters, limit); } + catch (Throwable t) { + failReason = t; + + throw t; + } finally { - runningQueryManager().unregister(qryId, null); + runningQueryManager().unregister(qryId, failReason); } } @@ -737,7 +757,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { schemaName, true, null, - qryInitiatorId + qryInitiatorId, + false, + false, + false ); Exception failReason = null; @@ -1612,7 +1635,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { qryDesc.schemaName(), qryDesc.local(), cancel, - qryDesc.queryInitiatorId() + qryDesc.queryInitiatorId(), + qryDesc.enforceJoinOrder(), + qryParams.lazy(), + qryDesc.distributedJoins() ); if (ctx.event().isRecordable(EVT_SQL_QUERY_EXECUTION)) { @@ -1661,6 +1687,34 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } + /** + * @param lsnr Listener. + */ + public void registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) { + runningQueryManager().registerQueryStartedListener(lsnr); + } + + /** + * @param lsnr Listener. + */ + public boolean unregisterQueryStartedListener(Object lsnr) { + return runningQueryManager().unregisterQueryStartedListener(lsnr); + } + + /** + * @param lsnr Listener. + */ + public void registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) { + runningQueryManager().registerQueryFinishedListener(lsnr); + } + + /** + * @param lsnr Listener. 
+ */ + public boolean unregisterQueryFinishedListener(Object lsnr) { + return runningQueryManager().unregisterQueryFinishedListener(lsnr); + } + /** {@inheritDoc} */ @Override public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional( GridCacheContext<?, ?> cctx, diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/IgniteSqlQueryStartFinishListenerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/IgniteSqlQueryStartFinishListenerTest.java new file mode 100644 index 00000000000..5e9e3dfa27e --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/IgniteSqlQueryStartFinishListenerTest.java @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; +import org.apache.ignite.internal.processors.query.GridQueryFinishedInfo; +import org.apache.ignite.internal.processors.query.GridQueryStartedInfo; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.testframework.GridTestUtils; +import org.hamcrest.CustomMatcher; +import org.hamcrest.Matcher; +import org.junit.After; +import org.junit.Test; + +import static org.apache.ignite.internal.processors.query.QueryUtils.SCHEMA_SYS; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.hamcrest.core.IsNull.notNullValue; +import static org.junit.Assert.assertThat; + +/** Test for SQL query listeners. */ +public class IgniteSqlQueryStartFinishListenerTest extends AbstractIndexingCommonTest { + /** Client node name. */ + private static final String CLIENT_NODE_NAME = "CLIENT_NODE"; + + /** Client node name. 
*/ + private static final String SERVER_NODE_NAME = "SERVER_NODE"; + + /** Listeners. */ + private final List<Object> lsnrs = new ArrayList<>(); + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrid(SERVER_NODE_NAME); + startClientGrid(CLIENT_NODE_NAME); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** */ + @After + public void unregisterListeners() { + lsnrs.forEach(indexing()::unregisterQueryFinishedListener); + lsnrs.forEach(indexing()::unregisterQueryStartedListener); + + lsnrs.clear(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + return super.getConfiguration(gridName) + .setSqlSchemas("TEST1") + .setCacheConfiguration( + new CacheConfiguration<String, String>(DEFAULT_CACHE_NAME) + .setQueryEntities(Collections.singleton(new QueryEntity(String.class, String.class))) + .setSqlFunctionClasses(GridTestUtils.SqlTestFunctions.class) + ); + } + + /** + * Ensure you can register and unregister a listener for query start/finish events: + * - register listeners + * - execute a query + * - ensure both listeners were notified + * - unregister the query start listener + * - run a query one more time + * - ensure only one listener was notified + * - unregister the query finish listener and register a new one + * - run a query one more time + * - ensure only new listener was notified + * + * @throws Exception In case of error. 
+ */ + @Test + public void testRegisterUnregisterQueryListeners() throws Exception { + final AtomicInteger qryStarted = new AtomicInteger(); + final AtomicInteger qryFinished = new AtomicInteger(); + + final Consumer<GridQueryStartedInfo> qryStartedLsnr = registerQueryStartedListener(info -> qryStarted.incrementAndGet()); + final Consumer<GridQueryFinishedInfo> qryFinishedLsnr = registerQueryFinishedListener(info -> qryFinished.incrementAndGet()); + + { + execSql(SCHEMA_SYS, "select * from caches"); + + assertWithTimeout(qryStarted::get, is(equalTo(1)), 1_000); + assertWithTimeout(qryFinished::get, is(equalTo(1)), 1_000); + } + + { + assertTrue(indexing().unregisterQueryStartedListener(qryStartedLsnr)); + + execSql(SCHEMA_SYS, "select * from caches"); + + assertWithTimeout(qryFinished::get, is(equalTo(2)), 1_000); + assertWithTimeout(qryStarted::get, is(equalTo(1)), 1_000); + } + + { + assertTrue(indexing().unregisterQueryFinishedListener(qryFinishedLsnr)); + + final CountDownLatch latch = new CountDownLatch(1); + + registerQueryFinishedListener(info -> latch.countDown()); + + execSql(SCHEMA_SYS, "select * from caches"); + + latch.await(1, TimeUnit.SECONDS); + + assertWithTimeout(qryFinished::get, is(equalTo(2)), 1_000); + assertWithTimeout(qryStarted::get, is(equalTo(1)), 1_000); + } + } + + /** + * Ensure listeners are notified with an actual query info: + * - register listeners + * - execute different queries + * - verify query info passed to listeners + */ + @Test + @SuppressWarnings("ThrowableNotThrown") + public void testVerifyQueryInfoPassedToListeners() throws Exception { + final AtomicReference<GridQueryStartedInfo> qryStarted = new AtomicReference<>(); + final AtomicReference<GridQueryFinishedInfo> qryFinished = new AtomicReference<>(); + + registerQueryStartedListener(qryStarted::set); + registerQueryFinishedListener(qryFinished::set); + + { + final long delay = 100; + final String qry = "select * from caches where ? = \"default\".delay(?) 
limit 1"; + + execSql(SCHEMA_SYS, qry, delay, delay); + + assertWithTimeout(qryStarted::get, is(notNullValue()), 1_000); + + GridQueryStartedInfo startedInfo = qryStarted.get(); + assertEquals(SCHEMA_SYS, startedInfo.schemaName()); + assertEquals(qry, startedInfo.query()); + assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId()); + assertEquals(false, startedInfo.local()); + assertEquals(GridCacheQueryType.SQL_FIELDS, startedInfo.queryType()); + + assertWithTimeout(qryFinished::get, is(notNullValue()), 1_000); + + GridQueryFinishedInfo finishedInfo = qryFinished.get(); + assertEquals(SCHEMA_SYS, finishedInfo.schemaName()); + assertEquals(qry, finishedInfo.query()); + assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId()); + assertEquals(false, finishedInfo.local()); + assertEquals(GridCacheQueryType.SQL_FIELDS, finishedInfo.queryType()); + assertEquals(false, finishedInfo.failed()); + assertThat(finishedInfo.finishTime() - finishedInfo.startTime(), is(greaterOrEqualTo(delay))); + + qryStarted.set(null); + qryFinished.set(null); + } + + { + final String schema = "TEST1"; + final String qry = "select \"default\".can_fail() from " + SCHEMA_SYS + ".caches where ? = ? 
limit 1"; + + GridTestUtils.SqlTestFunctions.fail = true; + + GridTestUtils.assertThrowsWithCause(() -> execSqlLocal(schema, qry, 1, 1), IgniteSQLException.class); + + assertWithTimeout(qryStarted::get, is(notNullValue()), 1_000); + + GridQueryStartedInfo startedInfo = qryStarted.get(); + assertEquals(schema, startedInfo.schemaName()); + assertEquals(qry, startedInfo.query()); + assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId()); + assertEquals(true, startedInfo.local()); + assertEquals(GridCacheQueryType.SQL_FIELDS, startedInfo.queryType()); + + assertWithTimeout(qryFinished::get, is(notNullValue()), 1_000); + + GridQueryFinishedInfo finishedInfo = qryFinished.get(); + assertEquals(schema, finishedInfo.schemaName()); + assertEquals(qry, finishedInfo.query()); + assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId()); + assertEquals(true, finishedInfo.local()); + assertEquals(GridCacheQueryType.SQL_FIELDS, finishedInfo.queryType()); + assertEquals(true, finishedInfo.failed()); + assertThat(finishedInfo.finishTime(), is(greaterOrEqualTo(finishedInfo.startTime()))); + + qryStarted.set(null); + qryFinished.set(null); + } + + { + final String qry = "text query"; + + IgniteCache<String, Object> cache = grid(CLIENT_NODE_NAME).cache(DEFAULT_CACHE_NAME); + + cache.query(new TextQuery<String, String>(String.class, "text query")); + + assertWithTimeout(qryStarted::get, is(notNullValue()), 1_000); + + GridQueryStartedInfo startedInfo = qryStarted.get(); + assertEquals(cache.getName(), startedInfo.schemaName()); + assertEquals(qry, startedInfo.query()); + assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId()); + assertEquals(true, startedInfo.local()); + assertEquals(GridCacheQueryType.TEXT, startedInfo.queryType()); + + assertWithTimeout(qryFinished::get, is(notNullValue()), 1_000); + + GridQueryFinishedInfo finishedInfo = qryFinished.get(); + assertEquals(cache.getName(), finishedInfo.schemaName()); + 
assertEquals(qry, finishedInfo.query()); + assertEquals(grid(SERVER_NODE_NAME).localNode().id(), startedInfo.nodeId()); + assertEquals(true, finishedInfo.local()); + assertEquals(GridCacheQueryType.TEXT, finishedInfo.queryType()); + assertEquals(false, finishedInfo.failed()); + assertThat(finishedInfo.finishTime(), is(greaterOrEqualTo(finishedInfo.startTime()))); + + qryStarted.set(null); + qryFinished.set(null); + } + } + + /** + * Ensure listeners do not block query execution + * - register blocking listeners + * - execute a lot of queries + * - verify all queries finished while listeners is still blocked + */ + @Test + public void testListeneresNotBlocksQueryExecution() throws IgniteCheckedException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger lsnrCalls = new AtomicInteger(); + + final int quryRuns = 1_000; + final int threadCnt = 20; + + registerQueryStartedListener(info -> { + try { + latch.await(); + } + catch (InterruptedException ignored) { + } + + lsnrCalls.incrementAndGet(); + }); + registerQueryFinishedListener(info -> { + try { + latch.await(); + } + catch (InterruptedException ignored) { + } + + lsnrCalls.incrementAndGet(); + }); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() -> { + for (int i = 0; i < quryRuns; i++) + execSql(SCHEMA_SYS, "select * from caches"); + }, threadCnt, "test-async-query-runner"); + + try { + fut.get(15_000); + } + finally { + latch.countDown(); + } + + assertWithTimeout(lsnrCalls::get, is(equalTo(2 * threadCnt * quryRuns)), 15_000); + } + + /** + * Ensure notification chain is not interrupted if exception was thrown in the middle of the chain + * - register several listeners such the listener will throw exception if condition is met + * - execute query several times, one of the listeners from each chain should fail + * - verify all other listeners were notified + */ + @Test + public void testFailedListenereNotAffectOthers() throws IgniteCheckedException { + final int 
lsnrCnt = 3; + final long waitTimeout = 1_000; + + boolean[] startLsnrsNotified = new boolean[lsnrCnt]; + boolean[] finishLsnrsNotified = new boolean[lsnrCnt]; + boolean[] startLsnrShouldFail = new boolean[lsnrCnt]; + boolean[] finishLsnrShouldFail = new boolean[lsnrCnt]; + + for (int i = 0; i < lsnrCnt; i++) { + final int lsnNo = i; + + registerQueryStartedListener(info -> { + if (startLsnrShouldFail[lsnNo]) { + startLsnrShouldFail[lsnNo] = false; + + throw new RuntimeException("Start listener fails"); + } + + startLsnrsNotified[lsnNo] = true; + }); + + registerQueryFinishedListener(info -> { + if (finishLsnrShouldFail[lsnNo]) { + finishLsnrShouldFail[lsnNo] = false; + + throw new RuntimeException("Finish listener fails"); + } + + finishLsnrsNotified[lsnNo] = true; + }); + } + + { + startLsnrShouldFail[0] = true; + finishLsnrShouldFail[1] = true; + + execSql(SCHEMA_SYS, "select * from caches"); + + assertWithTimeout(() -> startLsnrsNotified[0], isFalse(), waitTimeout); + assertWithTimeout(() -> startLsnrsNotified[1], isTrue(), waitTimeout); + assertWithTimeout(() -> startLsnrsNotified[2], isTrue(), waitTimeout); + + assertWithTimeout(() -> finishLsnrsNotified[0], isTrue(), waitTimeout); + assertWithTimeout(() -> finishLsnrsNotified[1], isFalse(), waitTimeout); + assertWithTimeout(() -> finishLsnrsNotified[2], isTrue(), waitTimeout); + + resetListeners(startLsnrsNotified, finishLsnrsNotified); + } + + { + startLsnrShouldFail[1] = true; + finishLsnrShouldFail[2] = true; + + execSql(SCHEMA_SYS, "select * from caches"); + + assertWithTimeout(() -> startLsnrsNotified[0], isTrue(), waitTimeout); + assertWithTimeout(() -> startLsnrsNotified[1], isFalse(), waitTimeout); + assertWithTimeout(() -> startLsnrsNotified[2], isTrue(), waitTimeout); + + assertWithTimeout(() -> finishLsnrsNotified[0], isTrue(), waitTimeout); + assertWithTimeout(() -> finishLsnrsNotified[1], isTrue(), waitTimeout); + assertWithTimeout(() -> finishLsnrsNotified[2], isFalse(), waitTimeout); + + 
resetListeners(startLsnrsNotified, finishLsnrsNotified); + } + + { + startLsnrShouldFail[2] = true; + finishLsnrShouldFail[0] = true; + + execSql(SCHEMA_SYS, "select * from caches"); + + assertWithTimeout(() -> startLsnrsNotified[0], isTrue(), waitTimeout); + assertWithTimeout(() -> startLsnrsNotified[1], isTrue(), waitTimeout); + assertWithTimeout(() -> startLsnrsNotified[2], isFalse(), waitTimeout); + + assertWithTimeout(() -> finishLsnrsNotified[0], isFalse(), waitTimeout); + assertWithTimeout(() -> finishLsnrsNotified[1], isTrue(), waitTimeout); + assertWithTimeout(() -> finishLsnrsNotified[2], isTrue(), waitTimeout); + + resetListeners(startLsnrsNotified, finishLsnrsNotified); + } + } + + /** + * Sets all elements from each array to {@code false}. + */ + private void resetListeners(boolean[]... arrays) { + for (boolean[] arr : arrays) + Arrays.fill(arr, false); + } + + /** */ + private IgniteH2Indexing indexing() { + return (IgniteH2Indexing)grid(SERVER_NODE_NAME).context().query().getIndexing(); + } + + /** */ + private List<List<?>> execSql(String schema, String sql, Object... args) { + return grid(SERVER_NODE_NAME).cache(DEFAULT_CACHE_NAME).query( + new SqlFieldsQuery(sql).setSchema(schema).setArgs(args).setLocal(false) + ).getAll(); + } + + /** */ + private List<List<?>> execSqlLocal(String schema, String sql, Object... args) { + return grid(SERVER_NODE_NAME).cache(DEFAULT_CACHE_NAME).query( + new SqlFieldsQuery(sql).setSchema(schema).setArgs(args).setLocal(true) + ).getAll(); + } + + /** + * @param lsnr Listener. + */ + private Consumer<GridQueryStartedInfo> registerQueryStartedListener(Consumer<GridQueryStartedInfo> lsnr) { + lsnrs.add(lsnr); + + indexing().registerQueryStartedListener(lsnr); + + return lsnr; + } + + + /** + * @param lsnr Listener. 
+ */ + private Consumer<GridQueryFinishedInfo> registerQueryFinishedListener(Consumer<GridQueryFinishedInfo> lsnr) { + lsnrs.add(lsnr); + + indexing().registerQueryFinishedListener(lsnr); + + return lsnr; + } + + /** + * @param actualSupplier Supplier for value to check. + * @param matcher Matcher. + * @param timeout Timeout. + */ + private <T> void assertWithTimeout(Supplier<T> actualSupplier, Matcher<? super T> matcher, long timeout) + throws IgniteInterruptedCheckedException { + GridTestUtils.waitForCondition(() -> matcher.matches(actualSupplier.get()), timeout); + + assertThat(actualSupplier.get(), matcher); + } + + /** + * @param wanted Wanted. + */ + private static <T extends Comparable<? super T>> Matcher<T> greaterOrEqualTo(T wanted) { + return new CustomMatcher<T>("should be greater or equal to " + wanted) { + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override public boolean matches(Object item) { + return wanted != null && item instanceof Comparable && ((Comparable)item).compareTo(wanted) >= 0; + } + }; + } + + /** */ + private static Matcher<Boolean> isTrue() { + return new CustomMatcher<Boolean>("should be true ") { + @Override public boolean matches(Object item) { + return item instanceof Boolean && (Boolean)item; + } + }; + } + + /** */ + private static Matcher<Boolean> isFalse() { + return new CustomMatcher<Boolean>("should be true ") { + @Override public boolean matches(Object item) { + return item instanceof Boolean && !(Boolean)item; + } + }; + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java index c2722a15e0a..24dc627847a 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java @@ -137,6 +137,7 @@ import 
org.apache.ignite.internal.processors.query.h2.GridSubqueryJoinOptimizerS import org.apache.ignite.internal.processors.query.h2.H2ResultSetIteratorNullifyOnEndSelfTest; import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest; import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryMinMaxTest; +import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryStartFinishListenerTest; import org.apache.ignite.internal.processors.query.h2.QueryDataPageScanTest; import org.apache.ignite.internal.processors.query.h2.sql.ExplainSelfTest; import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest; @@ -302,6 +303,7 @@ import org.junit.runners.Suite; IgniteCacheMultipleIndexedTypesTest.class, IgniteSqlQueryMinMaxTest.class, + IgniteSqlQueryStartFinishListenerTest.class, GridCircularQueueTest.class, IndexingSpiQueryWithH2IndexingSelfTest.class,