This is an automated email from the ASF dual-hosted git repository. alexpl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new c85131866d0 IGNITE-19584 SQL Calcite: Fix SQL metrics c85131866d0 is described below commit c85131866d08aa3b4047f1c21fb1262f46003bc6 Author: Aleksey Plekhanov <plehanov.a...@gmail.com> AuthorDate: Mon Jun 19 11:51:59 2023 +0500 IGNITE-19584 SQL Calcite: Fix SQL metrics IGNITE-19586 SQL Calcite: Fix SQL metrics - Fixes #10777. Signed-off-by: Aleksey Plekhanov <plehanov.a...@gmail.com> --- docs/_docs/monitoring-metrics/new-metrics.adoc | 26 ++++ .../query/calcite/CalciteQueryProcessor.java | 39 ++++- .../internal/processors/query/calcite/Query.java | 24 +-- .../processors/query/calcite/RootQuery.java | 30 ++-- .../calcite/exec/ExecutionCancelledException.java | 24 --- .../query/calcite/exec/ExecutionServiceImpl.java | 3 +- .../query/calcite/exec/rel/AbstractNode.java | 6 +- .../query/calcite/exec/rel/RootNode.java | 13 +- .../query/calcite/prepare/QueryPlanCacheImpl.java | 16 +- .../integration/RunningQueriesIntegrationTest.java | 41 ++++-- .../integration/SqlDiagnosticIntegrationTest.java | 163 +++++++++++++++++++++ .../query}/QueryParserMetricsHolder.java | 4 +- .../internal/processors/query/h2/QueryParser.java | 1 + .../query/h2/QueryParserMetricsHolderSelfTest.java | 3 +- 14 files changed, 297 insertions(+), 96 deletions(-) diff --git a/docs/_docs/monitoring-metrics/new-metrics.adoc b/docs/_docs/monitoring-metrics/new-metrics.adoc index 7eae8cd24da..1b260d435e0 100644 --- a/docs/_docs/monitoring-metrics/new-metrics.adoc +++ b/docs/_docs/monitoring-metrics/new-metrics.adoc @@ -471,3 +471,29 @@ Register name: `cache` |LastDataVer| long | The latest data version on the node. |DataVersionClusterId| integer | Data version cluster id. |=== + +== SQL parser metrics + +Register name: `sql.parser.cache` + +[cols="2,1,3",opts="header"] +|=== +|Name| Type| Description +|hits| long | The number of SQL queries that were found in the parsers cache (doesn't require to be parsed and planned before execution). +|misses| long | The number of SQL queries that were parsed and planned. +|=== + +== SQL executor metrics + +Register name: `sql.queries.user` + +[cols="2,1,3",opts="header"] +|=== +|Name| Type| Description +|success| long | The number of succesfully executed SQL queries. +|failed| long | The number of failed SQL queries (including canceled). +|canceled| long | The number of canceled SQL queries. +|=== + + + diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index c52c8f4180e..6bc9cccf5fa 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.function.Function; import org.apache.calcite.DataContexts; @@ -49,6 +50,7 @@ import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; import org.apache.ignite.SystemProperty; import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.calcite.CalciteQueryEngineConfiguration; import org.apache.ignite.configuration.QueryEngineConfiguration; import org.apache.ignite.events.SqlQueryExecutionEvent; @@ -60,6 +62,7 @@ import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryContext; import org.apache.ignite.internal.processors.query.QueryEngine; +import org.apache.ignite.internal.processors.query.QueryParserMetricsHolder; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler; import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService; @@ -177,6 +180,9 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query /** */ private final QueryPlanCache qryPlanCache; + /** */ + private final QueryParserMetricsHolder parserMetrics; + /** */ private final QueryTaskExecutor taskExecutor; @@ -225,6 +231,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query failureProcessor = ctx.failure(); schemaHolder = new SchemaHolderImpl(ctx); qryPlanCache = new QueryPlanCacheImpl(ctx); + parserMetrics = new QueryParserMetricsHolder(ctx.metric()); mailboxRegistry = new MailboxRegistryImpl(ctx); taskExecutor = new QueryTaskExecutorImpl(ctx); executionSvc = new ExecutionServiceImpl<>(ctx, ArrayRowHandler.INSTANCE); @@ -422,9 +429,18 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query @Override public QueryPlan apply(RootQuery<Object[]> qry, Object[] params) { if (plan == null) { - plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql, null, params), () -> - prepareSvc.prepareSingle(qryNode, qry.planningContext()) - ); + AtomicBoolean miss = new AtomicBoolean(); + + plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql, null, params), () -> { + miss.set(true); + + return prepareSvc.prepareSingle(qryNode, qry.planningContext()); + }); + + if (miss.get()) + parserMetrics.countCacheMiss(); + else + parserMetrics.countCacheHit(); } return plan; @@ -457,11 +473,15 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query QueryPlan plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql, null, params)); if (plan != null) { + parserMetrics.countCacheHit(); + return Collections.singletonList( processQuery(qryCtx, qry -> action.apply(qry, plan), schema.getName(), plan.query(), null, params) ); } + parserMetrics.countCacheMiss(); + SqlNodeList qryList = Commons.parse(sql, FRAMEWORK_CONFIG.getParserConfig()); List<T> res = new ArrayList<>(qryList.size()); @@ -555,12 +575,17 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query if (qrys != null) qrys.forEach(RootQuery::cancel); - qryReg.unregister(qry.id(), e); + if (isCanceled) { + qryReg.unregister(qry.id(), new QueryCancelledException()); + + throw new IgniteSQLException("The query was cancelled while planning", + IgniteQueryErrorCode.QUERY_CANCELED, e); + } + else { + qryReg.unregister(qry.id(), e); - if (isCanceled) - throw new IgniteSQLException("The query was cancelled while planning", IgniteQueryErrorCode.QUERY_CANCELED, e); - else throw e; + } } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java index 9d42e051ac6..38f67f59d1f 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java @@ -28,11 +28,11 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService; -import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.QueryMemoryTracker; @@ -165,9 +165,9 @@ public class Query<RowT> { } for (RunningFragment<RowT> frag : fragments) - frag.context().execute(() -> frag.root().onError(new ExecutionCancelledException()), frag.root()::onError); + frag.context().execute(() -> frag.root().onError(new QueryCancelledException()), frag.root()::onError); - tryClose(new ExecutionCancelledException()); + tryClose(queryCanceledException()); } /** */ @@ -176,13 +176,8 @@ public class Query<RowT> { if (state == QueryState.INITED) state = QueryState.EXECUTING; - if (state == QueryState.CLOSING || state == QueryState.CLOSED) { - throw new IgniteSQLException( - "The query was cancelled", - IgniteQueryErrorCode.QUERY_CANCELED, - new ExecutionCancelledException() - ); - } + if (state == QueryState.CLOSING || state == QueryState.CLOSED) + throw queryCanceledException(); fragments.add(f); } @@ -193,6 +188,15 @@ public class Query<RowT> { return cancel.isCanceled(); } + /** */ + protected IgniteSQLException queryCanceledException() { + return new IgniteSQLException( + "The query was cancelled", + IgniteQueryErrorCode.QUERY_CANCELED, + new QueryCancelledException() + ); + } + /** */ public void onNodeLeft(UUID nodeId) { if (initNodeId.equals(nodeId)) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java index c1f5b2d16df..22c1764a77b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java @@ -42,7 +42,6 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryContext; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService; -import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node; import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode; @@ -174,12 +173,8 @@ public class RootQuery<RowT> extends Query<RowT> implements TrackableQuery { */ public void mapping() { synchronized (mux) { - if (state == QueryState.CLOSED) { - throw new IgniteSQLException( - "The query was cancelled while executing.", - IgniteQueryErrorCode.QUERY_CANCELED - ); - } + if (state == QueryState.CLOSED) + throw queryCanceledException(); state = QueryState.MAPPING; } @@ -190,12 +185,8 @@ public class RootQuery<RowT> extends Query<RowT> implements TrackableQuery { */ public void run(ExecutionContext<RowT> ctx, MultiStepPlan plan, Node<RowT> root) { synchronized (mux) { - if (state == QueryState.CLOSED) { - throw new IgniteSQLException( - "The query was cancelled while executing.", - IgniteQueryErrorCode.QUERY_CANCELED - ); - } + if (state == QueryState.CLOSED) + throw queryCanceledException(); planningTime = U.currentTimeMillis() - startTs; @@ -278,7 +269,7 @@ public class RootQuery<RowT> extends Query<RowT> implements TrackableQuery { log.warning("An exception occures during the query cancel", wrpEx); } finally { - super.tryClose(failure); + super.tryClose(failure == null && root != null ? root.failure() : failure); } } } @@ -287,18 +278,15 @@ public class RootQuery<RowT> extends Query<RowT> implements TrackableQuery { @Override public void cancel() { cancel.cancel(); - tryClose(new ExecutionCancelledException()); + U.closeQuiet(root); + tryClose(queryCanceledException()); } /** */ public PlanningContext planningContext() { synchronized (mux) { - if (state == QueryState.CLOSED || state == QueryState.CLOSING) { - throw new IgniteSQLException( - "The query was cancelled while executing.", - IgniteQueryErrorCode.QUERY_CANCELED - ); - } + if (state == QueryState.CLOSED || state == QueryState.CLOSING) + throw queryCanceledException(); if (state == QueryState.EXECUTING || state == QueryState.MAPPING) { throw new IgniteSQLException( diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionCancelledException.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionCancelledException.java deleted file mode 100644 index fb4a8d8a9c4..00000000000 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionCancelledException.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.calcite.exec; - -import org.apache.ignite.IgniteCheckedException; - -/** */ -public class ExecutionCancelledException extends IgniteCheckedException { -} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java index 4d31c32ef66..92c0b70a10b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java @@ -34,6 +34,7 @@ import org.apache.calcite.tools.Frameworks; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.calcite.CalciteQueryEngineConfiguration; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheQueryReadEvent; @@ -833,7 +834,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut Exception e = new RemoteException(nodeId, msg.queryId(), msg.fragmentId(), msg.error()); - if (X.hasCause(msg.error(), ExecutionCancelledException.class)) { + if (X.hasCause(msg.error(), QueryCancelledException.class)) { e = new IgniteSQLException( "The query was cancelled while executing.", IgniteQueryErrorCode.QUERY_CANCELED, diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java index 5a4006f42c5..19c2927cbd1 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java @@ -22,8 +22,8 @@ import java.util.Comparator; import java.util.List; import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; @@ -146,7 +146,7 @@ public abstract class AbstractNode<Row> implements Node<Row> { * @param e Exception. */ public void onError(Throwable e) { - if (e instanceof ExecutionCancelledException) + if (e instanceof QueryCancelledException) U.warn(context().logger(), "Execution is cancelled.", e); else onErrorInternal(e); @@ -184,7 +184,7 @@ public abstract class AbstractNode<Row> implements Node<Row> { /** */ protected void checkState() throws Exception { if (context().isCancelled()) - throw new ExecutionCancelledException(); + throw new QueryCancelledException(); if (Thread.interrupted()) throw new IgniteInterruptedCheckedException("Thread was interrupted."); if (!U.assertionsEnabled()) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java index 0ee802873bb..5449e19eb38 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java @@ -31,12 +31,14 @@ import java.util.function.Function; import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.query.QueryCancelledException.ERR_MSG; @@ -114,8 +116,10 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, lock.lock(); try { - if (waiting != -1) - ex.compareAndSet(null, new IgniteSQLException(ERR_MSG, IgniteQueryErrorCode.QUERY_CANCELED)); + if (waiting != -1) { + ex.compareAndSet(null, new IgniteSQLException(ERR_MSG, IgniteQueryErrorCode.QUERY_CANCELED, + new QueryCancelledException())); + } closed = true; // an exception has to be set first to get right check order @@ -128,6 +132,11 @@ public class RootNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, onClose.accept(ex.get()); } + /** */ + public @Nullable Throwable failure() { + return ex.get(); + } + /** {@inheritDoc} */ @Override protected boolean isClosed() { return closed; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java index 178faf8cd5e..a1b038abfff 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java @@ -38,7 +38,7 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach private static final int CACHE_SIZE = 1024; /** */ - private GridInternalSubscriptionProcessor subscriptionProcessor; + private final GridInternalSubscriptionProcessor subscriptionProc; /** */ private volatile Map<CacheKey, QueryPlan> cache; @@ -50,21 +50,14 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach super(ctx); cache = new GridBoundedConcurrentLinkedHashMap<>(CACHE_SIZE); - subscriptionProcessor(ctx.internalSubscriptionProcessor()); + subscriptionProc = ctx.internalSubscriptionProcessor(); init(); } - /** - * @param subscriptionProcessor Subscription processor. - */ - public void subscriptionProcessor(GridInternalSubscriptionProcessor subscriptionProcessor) { - this.subscriptionProcessor = subscriptionProcessor; - } - /** {@inheritDoc} */ @Override public void init() { - subscriptionProcessor.registerSchemaChangeListener(new SchemaListener()); + subscriptionProc.registerSchemaChangeListener(new SchemaListener()); } /** {@inheritDoc} */ @@ -74,8 +67,6 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach /** {@inheritDoc} */ @Override public QueryPlan queryPlan(CacheKey key, Supplier<QueryPlan> planSupplier) { - Map<CacheKey, QueryPlan> cache = this.cache; - QueryPlan plan = cache.computeIfAbsent(key, k -> planSupplier.get()); return plan.copy(); @@ -83,7 +74,6 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach /** {@inheritDoc} */ @Override public QueryPlan queryPlan(CacheKey key) { - Map<CacheKey, QueryPlan> cache = this.cache; QueryPlan plan = cache.get(key); return plan != null ? plan.copy() : null; } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java index bd841718a15..b069eea3622 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java @@ -40,6 +40,7 @@ import org.apache.ignite.calcite.CalciteQueryEngineConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor; @@ -51,19 +52,20 @@ import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableImpl import org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable; import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.spi.metric.LongMetric; import org.apache.ignite.spi.systemview.view.SqlQueryView; import org.apache.ignite.spi.systemview.view.SystemView; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.junit.Assert; import org.junit.Test; import static java.util.stream.Collectors.joining; import static org.apache.ignite.IgniteSystemProperties.getLong; import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_PLANNER_TIMEOUT; import static org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_QRY_VIEW; +import static org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_USER_QUERIES_REG_NAME; /** * @@ -100,6 +102,9 @@ public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest */ @Test public void testCancelAtPlanningPhase() throws IgniteCheckedException { + MetricRegistry mreg = client.context().metric().registry(SQL_USER_QUERIES_REG_NAME); + mreg.reset(); + CalciteQueryProcessor engine = queryProcessor(client); int cnt = 9; @@ -111,7 +116,7 @@ public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql)); - Assert.assertTrue(GridTestUtils.waitForCondition( + assertTrue(GridTestUtils.waitForCondition( () -> !engine.runningQueries().isEmpty() || fut.isDone(), TIMEOUT_IN_MS)); Collection<? extends Query<?>> running = engine.runningQueries(); @@ -123,15 +128,17 @@ public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest assertSame(qry, engine.runningQuery(qry.id())); // Waits for planning. - Assert.assertTrue(GridTestUtils.waitForCondition( + assertTrue(GridTestUtils.waitForCondition( () -> qry.state() == QueryState.PLANNING, TIMEOUT_IN_MS)); qry.cancel(); - Assert.assertTrue(GridTestUtils.waitForCondition( + assertTrue(GridTestUtils.waitForCondition( () -> engine.runningQueries().isEmpty(), TIMEOUT_IN_MS)); GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(0), IgniteSQLException.class, "The query was cancelled while planning"); + + assertEquals(1, ((LongMetric)mreg.findMetric("canceled")).value()); } /** @@ -140,6 +147,9 @@ public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest */ @Test public void testCancelAtExecutionPhase() throws Exception { + MetricRegistry mreg = client.context().metric().registry(SQL_USER_QUERIES_REG_NAME); + mreg.reset(); + CalciteQueryProcessor cliEngine = queryProcessor(client); CalciteQueryProcessor srvEngine = queryProcessor(srv); @@ -199,10 +209,10 @@ public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest qry.cancel(); - Assert.assertTrue(GridTestUtils.waitForCondition( + assertTrue(GridTestUtils.waitForCondition( () -> srvEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS)); - Assert.assertTrue(GridTestUtils.waitForCondition( + assertTrue(GridTestUtils.waitForCondition( () -> cliEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS)); } finally { @@ -210,7 +220,9 @@ public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest } GridTestUtils.assertThrowsAnyCause(log, - () -> fut.get(100), IgniteSQLException.class, "The query was cancelled while executing."); + () -> fut.get(100), IgniteSQLException.class, "The query was cancelled"); + + assertEquals(1, ((LongMetric)mreg.findMetric("canceled")).value()); } /** @@ -220,6 +232,9 @@ public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest */ @Test public void testCancelByRemoteFragment() throws IgniteCheckedException { + MetricRegistry mreg = client.context().metric().registry(SQL_USER_QUERIES_REG_NAME); + mreg.reset(); + CalciteQueryProcessor clientEngine = queryProcessor(client); CalciteQueryProcessor serverEngine = queryProcessor(srv); int cnt = 6; @@ -236,7 +251,7 @@ public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql(sql)); - Assert.assertTrue(GridTestUtils.waitForCondition( + assertTrue(GridTestUtils.waitForCondition( () -> { Collection<? extends Query<?>> queries = clientEngine.runningQueries(); @@ -244,7 +259,7 @@ public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest }, TIMEOUT_IN_MS)); - Assert.assertTrue(GridTestUtils.waitForCondition(() -> !serverEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS)); + assertTrue(GridTestUtils.waitForCondition(() -> !serverEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS)); Collection<? extends Query<?>> running = serverEngine.runningQueries(); Query<?> qry = F.first(running); @@ -253,13 +268,15 @@ public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest qry.cancel(); - Assert.assertTrue(GridTestUtils.waitForCondition( + assertTrue(GridTestUtils.waitForCondition( () -> clientEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS)); - Assert.assertTrue(GridTestUtils.waitForCondition( + assertTrue(GridTestUtils.waitForCondition( () -> serverEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS)); - GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100), IgniteSQLException.class, "The query was cancelled while executing."); + GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100), IgniteSQLException.class, "The query was cancelled"); + + assertEquals(1, ((LongMetric)mreg.findMetric("canceled")).value()); } /** */ diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java index 63294238a3b..2bb661e01d1 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java @@ -18,6 +18,10 @@ package org.apache.ignite.internal.processors.query.calcite.integration; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.Statement; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -29,10 +33,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.calcite.CalciteQueryEngineConfiguration; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.ClientConnectorConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.SqlConfiguration; import org.apache.ignite.events.CacheQueryExecutedEvent; @@ -40,12 +46,15 @@ import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.events.SqlQueryExecutionEvent; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; +import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest; +import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.calcite.Query; import org.apache.ignite.internal.processors.query.calcite.QueryRegistry; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.metric.LongMetric; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.LogListener; @@ -58,15 +67,20 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.cleanPerformanceStatisticsDir; import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.startCollectStatistics; import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.stopCollectStatisticsAndRead; +import static org.apache.ignite.internal.processors.query.QueryParserMetricsHolder.QUERY_PARSER_METRIC_GROUP_NAME; import static org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.BIG_RESULT_SET_MSG; import static org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_ERROR_MSG; import static org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG; import static org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_FINISHED_MSG; +import static org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_USER_QUERIES_REG_NAME; /** * Test SQL diagnostic tools. */ public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest { + /** */ + private static final String jdbcUrl = "jdbc:ignite:thin://127.0.0.1:" + ClientConnectorConfiguration.DFLT_PORT; + /** */ private static final long LONG_QRY_TIMEOUT = 1_000L; @@ -114,6 +128,155 @@ public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest { return 2; } + /** */ + @Test + public void testParserMetrics() { + MetricRegistry mreg0 = grid(0).context().metric().registry(QUERY_PARSER_METRIC_GROUP_NAME); + MetricRegistry mreg1 = grid(1).context().metric().registry(QUERY_PARSER_METRIC_GROUP_NAME); + mreg0.reset(); + mreg1.reset(); + + LongMetric hits0 = mreg0.findMetric("hits"); + LongMetric hits1 = mreg1.findMetric("hits"); + LongMetric misses0 = mreg0.findMetric("misses"); + LongMetric misses1 = mreg1.findMetric("misses"); + + // Parse and plan on client. + sql("CREATE TABLE test_parse(a INT)"); + + assertEquals(0, hits0.value()); + assertEquals(0, hits1.value()); + assertEquals(0, misses0.value()); + assertEquals(0, misses1.value()); + + for (int i = 0; i < 10; i++) + sql(grid(0), "INSERT INTO test_parse VALUES (?)", i); + + assertEquals(9, hits0.value()); + assertEquals(0, hits1.value()); + assertEquals(1, misses0.value()); + assertEquals(0, misses1.value()); + + for (int i = 0; i < 10; i++) + sql(grid(1), "SELECT * FROM test_parse WHERE a = ?", i); + + assertEquals(9, hits0.value()); + assertEquals(9, hits1.value()); + assertEquals(1, misses0.value()); + assertEquals(1, misses1.value()); + } + + /** */ + @Test + public void testBatchParserMetrics() throws Exception { + MetricRegistry mreg0 = grid(0).context().metric().registry(QUERY_PARSER_METRIC_GROUP_NAME); + MetricRegistry mreg1 = grid(1).context().metric().registry(QUERY_PARSER_METRIC_GROUP_NAME); + mreg0.reset(); + mreg1.reset(); + + LongMetric hits0 = mreg0.findMetric("hits"); + LongMetric hits1 = mreg1.findMetric("hits"); + LongMetric misses0 = mreg0.findMetric("misses"); + LongMetric misses1 = mreg1.findMetric("misses"); + + sql("CREATE TABLE test_batch(a INT)"); + + assertEquals(0, hits0.value()); + assertEquals(0, hits1.value()); + assertEquals(0, misses0.value()); + assertEquals(0, misses1.value()); + + try (Connection conn = DriverManager.getConnection(jdbcUrl)) { + conn.setSchema("PUBLIC"); + + try (Statement stmt = conn.createStatement()) { + for (int i = 0; i < 10; i++) + stmt.addBatch(String.format("INSERT INTO test_batch VALUES (%d)", i)); + + stmt.executeBatch(); + + assertEquals(0, hits0.value()); + assertEquals(0, hits1.value()); + assertEquals(10, misses0.value()); + assertEquals(0, misses1.value()); + } + + String sql = "INSERT INTO test_batch VALUES (?)"; + + try (PreparedStatement stmt = conn.prepareStatement(sql)) { + for (int i = 10; i < 20; i++) { + stmt.setInt(1, i); + stmt.addBatch(); + } + + stmt.executeBatch(); + + assertEquals(0, hits0.value()); + assertEquals(0, hits1.value()); + assertEquals(11, misses0.value()); // Only one increment per batch. + assertEquals(0, misses1.value()); + } + + try (PreparedStatement stmt = conn.prepareStatement(sql)) { + for (int i = 20; i < 30; i++) { + stmt.setInt(1, i); + stmt.addBatch(); + } + + stmt.executeBatch(); + + assertEquals(1, hits0.value()); // Only one increment per batch. + assertEquals(0, hits1.value()); + assertEquals(11, misses0.value()); + assertEquals(0, misses1.value()); + } + } + } + + /** */ + @Test + public void testUserQueriesMetrics() throws Exception { + sql(grid(0), "CREATE TABLE test_metric (a INT)"); + + MetricRegistry mreg0 = grid(0).context().metric().registry(SQL_USER_QUERIES_REG_NAME); + MetricRegistry mreg1 = grid(1).context().metric().registry(SQL_USER_QUERIES_REG_NAME); + mreg0.reset(); + mreg1.reset(); + + AtomicInteger qryCnt = new AtomicInteger(); + grid(0).context().query().runningQueryManager().registerQueryFinishedListener(q -> qryCnt.incrementAndGet()); + + sql(grid(0), "INSERT INTO test_metric VALUES (?)", 0); + sql(grid(0), "SELECT * FROM test_metric WHERE a = ?", 0); + + try { + sql(grid(0), "SELECT * FROM test_fail"); + + fail(); + } + catch (IgniteSQLException ignored) { + // Expected. + } + + FieldsQueryCursor<?> cur = grid(0).getOrCreateCache("test_metric") + .query(new SqlFieldsQuery("SELECT * FROM table(system_range(1, 10000))")); + + assertTrue(cur.iterator().hasNext()); + + cur.close(); + + // Query unregistering is async process, wait for it before metrics check. + assertTrue(GridTestUtils.waitForCondition(() -> qryCnt.get() == 4, 1_000L)); + + assertEquals(2, ((LongMetric)mreg0.findMetric("success")).value()); + assertEquals(2, ((LongMetric)mreg0.findMetric("failed")).value()); // 1 error + 1 cancelled. + assertEquals(1, ((LongMetric)mreg0.findMetric("canceled")).value()); + + assertEquals(0, ((LongMetric)mreg1.findMetric("success")).value()); + assertEquals(0, ((LongMetric)mreg1.findMetric("failed")).value()); + assertEquals(0, ((LongMetric)mreg1.findMetric("canceled")).value()); + } + /** */ @Test public void testPerformanceStatistics() throws Exception { diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserMetricsHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryParserMetricsHolder.java similarity index 94% rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserMetricsHolder.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryParserMetricsHolder.java index 73efd43d618..157620c3af7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserMetricsHolder.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryParserMetricsHolder.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.h2; +package org.apache.ignite.internal.processors.query; import org.apache.ignite.internal.processors.metric.GridMetricManager; import org.apache.ignite.internal.processors.metric.MetricRegistry; @@ -26,7 +26,7 @@ import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; */ public class QueryParserMetricsHolder { /** Query parser metric group name. */ - static final String QUERY_PARSER_METRIC_GROUP_NAME = "sql.parser.cache"; + public static final String QUERY_PARSER_METRIC_GROUP_NAME = "sql.parser.cache"; /** Query cache hits counter. */ private final LongAdderMetric qryCacheHits; diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java index 952e33c58bf..e53ea65b477 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.odbc.jdbc.JdbcParameterMeta; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.NestedTxMode; +import org.apache.ignite.internal.processors.query.QueryParserMetricsHolder; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.h2.dml.DmlAstUtils; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryParserMetricsHolderSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryParserMetricsHolderSelfTest.java index 3695cec0594..a6f53e686f8 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryParserMetricsHolderSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryParserMetricsHolderSelfTest.java @@ -21,12 +21,13 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.query.QueryParserMetricsHolder; import org.apache.ignite.spi.metric.LongMetric; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assert; import org.junit.Test; -import static org.apache.ignite.internal.processors.query.h2.QueryParserMetricsHolder.QUERY_PARSER_METRIC_GROUP_NAME; +import static org.apache.ignite.internal.processors.query.QueryParserMetricsHolder.QUERY_PARSER_METRIC_GROUP_NAME; /** * Test to check {@link QueryParserMetricsHolder}