This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch cost_analyze in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b77f71bdf0013f67fa5f542124063744169cb6aa Author: Beyyes <[email protected]> AuthorDate: Fri Nov 22 19:02:08 2024 +0800 perfect query statistics --- .../iotdb/db/queryengine/plan/Coordinator.java | 13 +++ .../db/queryengine/plan/analyze/Analyzer.java | 9 +- .../plan/execution/IQueryExecution.java | 6 ++ .../queryengine/plan/execution/QueryExecution.java | 31 ++++--- .../plan/execution/config/ConfigExecution.java | 11 +++ .../db/queryengine/plan/planner/IPlanner.java | 2 + .../queryengine/plan/planner/TreeModelPlanner.java | 8 +- .../plan/relational/analyzer/Analyzer.java | 7 +- .../relational/planner/TableLogicalPlanner.java | 15 +++- .../plan/relational/planner/TableModelPlanner.java | 8 +- .../distribute/TableDistributedPlanner.java | 5 +- .../optimizations/PushPredicateIntoTableScan.java | 16 ++-- .../plan/scheduler/ClusterScheduler.java | 5 -- .../scheduler/FragmentInstanceDispatcherImpl.java | 7 -- .../FixedScheduledOutputQueryPlanStatistics.java | 97 ++++++++++++++++++++++ .../operator/MergeTreeSortOperatorTest.java | 12 +++ 16 files changed, 207 insertions(+), 45 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 61be1471b15..0999c6831a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -85,6 +85,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager; import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.statistics.FixedScheduledOutputQueryPlanStatistics; import org.apache.iotdb.db.utils.SetThreadName; import org.slf4j.Logger; @@ -140,6 +141,9 @@ public class Coordinator { private final List<PlanOptimizer> logicalPlanOptimizers; private final List<PlanOptimizer> distributionPlanOptimizers; + FixedScheduledOutputQueryPlanStatistics fixedScheduledOutputQueryPlanStatistics = + new FixedScheduledOutputQueryPlanStatistics(); + private Coordinator() { this.queryExecutionMap = new ConcurrentHashMap<>(); this.executor = getQueryExecutor(); @@ -443,6 +447,15 @@ public class Coordinator { try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) { LOGGER.debug("[CleanUpQuery]]"); queryExecution.stopAndCleanup(t); + + // TODO(beyyes) add fe statistic output + IQueryExecution queryExecution1 = queryExecutionMap.get(queryId); + if (queryExecution1.getPlanner() != null + && queryExecution1.getPlanner().isQueryStatement()) { + MPPQueryContext queryContext = queryExecution1.getQueryContext(); + fixedScheduledOutputQueryPlanStatistics.recordCost(queryContext); + } + queryExecutionMap.remove(queryId); if (queryExecution.isQuery()) { long costTime = queryExecution.getTotalExecutionTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java index 238620b6f48..6b2739034d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java @@ -52,8 +52,13 @@ public class Analyzer { } if (statement.isQuery()) { - QueryPlanCostMetricSet.getInstance() - .recordPlanCost(TREE_TYPE, ANALYZER, System.nanoTime() - startTime); + long cost = + System.nanoTime() + - startTime + - context.getFetchSchemaCost() + - context.getFetchPartitionCost(); + QueryPlanCostMetricSet.getInstance().recordPlanCost(TREE_TYPE, ANALYZER, cost); + context.setAnalyzeCost(cost); } return analysis; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java index b35123e8f70..d2517c02b57 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java @@ -20,7 +20,9 @@ package org.apache.iotdb.db.queryengine.plan.execution; import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; +import org.apache.iotdb.db.queryengine.plan.planner.IPlanner; import org.apache.tsfile.read.common.block.TsBlock; @@ -67,4 +69,8 @@ public interface IQueryExecution { Optional<String> getExecuteSQL(); String getStatementType(); + + MPPQueryContext getQueryContext(); + + IPlanner getPlanner(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 4ecddf1a80e..c9fd577fad3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -267,16 +267,17 @@ public class QueryExecution implements IQueryExecution { // Analyze the statement in QueryContext. Generate the analysis this query need private IAnalysis analyze(MPPQueryContext context) { - final long startTime = System.nanoTime(); - IAnalysis result; - try { - result = planner.analyze(context); - } finally { - long analyzeCost = System.nanoTime() - startTime; - context.setAnalyzeCost(analyzeCost); - PERFORMANCE_OVERVIEW_METRICS.recordAnalyzeCost(analyzeCost); - } - return result; + return planner.analyze(context); + // final long startTime = System.nanoTime(); + // IAnalysis result; + // try { + // result = planner.analyze(context); + // } finally { + // long analyzeCost = System.nanoTime() - startTime; + // context.setAnalyzeCost(analyzeCost); + // PERFORMANCE_OVERVIEW_METRICS.recordAnalyzeCost(analyzeCost); + // } + // return result; } private void schedule() { @@ -693,6 +694,16 @@ public class QueryExecution implements IQueryExecution { return analysis.getStatementType(); } + @Override + public MPPQueryContext getQueryContext() { + return context; + } + + @Override + public IPlanner getPlanner() { + return planner; + } + public MPPQueryContext getContext() { return context; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java index c0886d91842..9cd94461117 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.db.queryengine.plan.planner.IPlanner; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.StatementExecutionException; @@ -248,4 +249,14 @@ public class ConfigExecution implements IQueryExecution { public String getStatementType() { return statementType.name(); } + + @Override + public MPPQueryContext getQueryContext() { + return context; + } + + @Override + public IPlanner getPlanner() { + return null; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java index 616cca3efb7..c53294b1a1d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java @@ -51,4 +51,6 @@ public interface IPlanner { void setRedirectInfo( IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus, TSStatusCode statusCode); + + boolean isQueryStatement(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java index a819fa950de..2e0289c0781 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java @@ -42,6 +42,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -134,8 +135,6 @@ public class TreeModelPlanner implements IPlanner { stateMachine, distributedPlan.getInstances(), context.getQueryType(), - executor, - writeOperationExecutor, scheduledExecutor, syncInternalServiceClientManager, asyncInternalServiceClientManager); @@ -201,4 +200,9 @@ public class TreeModelPlanner implements IPlanner { } } } + + @Override + public boolean isQueryStatement() { + return statement instanceof QueryStatement; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java index 8d25c7a1e85..978e2b0682d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java @@ -67,6 +67,7 @@ public class Analyzer { } public Analysis analyze(Statement statement) { + long startTime = System.nanoTime(); Analysis analysis = new Analysis(statement, parameterLookup); Statement innerStatement = statement instanceof PipeEnriched @@ -85,15 +86,15 @@ public class Analyzer { analysis.setDatabaseName(session.getDatabaseName().get()); } - long startTime = System.nanoTime(); StatementAnalyzer analyzer = statementAnalyzerFactory.createStatementAnalyzer( analysis, context, session, warningCollector, CorrelationSupport.ALLOWED); analyzer.analyze(statement); if (statement instanceof Query) { - QueryPlanCostMetricSet.getInstance() - .recordPlanCost(TABLE_TYPE, ANALYZER, System.nanoTime() - startTime); + long cost = System.nanoTime() - startTime; + QueryPlanCostMetricSet.getInstance().recordPlanCost(TABLE_TYPE, ANALYZER, cost); + context.setAnalyzeCost(cost); } // TODO access control diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java index 2692690c82f..bf7af33c341 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java @@ -124,8 +124,9 @@ public class TableLogicalPlanner { PlanNode planNode = planStatement(analysis, statement); if (statement instanceof Query) { - QueryPlanCostMetricSet.getInstance() - .recordPlanCost(TABLE_TYPE, LOGICAL_PLANNER, System.nanoTime() - startTime); + long cost = System.nanoTime() - startTime; + QueryPlanCostMetricSet.getInstance().recordPlanCost(TABLE_TYPE, LOGICAL_PLANNER, cost); + queryContext.setLogicalPlanCost(cost); startTime = System.nanoTime(); for (PlanOptimizer optimizer : planOptimizers) { @@ -142,8 +143,14 @@ public class TableLogicalPlanner { warningCollector, PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector())); } - QueryPlanCostMetricSet.getInstance() - .recordPlanCost(TABLE_TYPE, LOGICAL_PLAN_OPTIMIZE, System.nanoTime() - startTime); + + cost = + System.nanoTime() + - startTime + - queryContext.getFetchSchemaCost() + - queryContext.getFetchPartitionCost(); + QueryPlanCostMetricSet.getInstance().recordPlanCost(TABLE_TYPE, LOGICAL_PLAN_OPTIMIZE, cost); + queryContext.setLogicalOptimizationCost(cost); } return new LogicalQueryPlan(queryContext, planNode); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java index a7f0878041e..41d456d9ab5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java @@ -41,6 +41,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Pla import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement; import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; @@ -167,8 +168,6 @@ public class TableModelPlanner implements IPlanner { stateMachine, distributedPlan.getInstances(), context.getQueryType(), - executor, - writeOperationExecutor, scheduledExecutor, syncInternalServiceClientManager, asyncInternalServiceClientManager); @@ -228,5 +227,10 @@ public class TableModelPlanner implements IPlanner { } } + @Override + public boolean isQueryStatement() { + return statement instanceof Query; + } + public static class NopAccessControl implements AccessControl {} } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java index a485f3814f4..592b93d87fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java @@ -104,8 +104,9 @@ public class TableDistributedPlanner { DistributedQueryPlan resultDistributedPlan = generateDistributedPlan(outputNodeWithExchange); if (analysis.getStatement() instanceof Query) { - QueryPlanCostMetricSet.getInstance() - .recordPlanCost(TABLE_TYPE, DISTRIBUTION_PLANNER, System.nanoTime() - startTime); + long cost = System.nanoTime() - startTime; + QueryPlanCostMetricSet.getInstance().recordPlanCost(TABLE_TYPE, DISTRIBUTION_PLANNER, cost); + mppQueryContext.setDistributionPlanCost(cost); } return resultDistributedPlan; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index f9d2d5edb1a..4baa98c6796 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -303,7 +303,7 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { // no predicate, just scan all matched deviceEntries if (TRUE_LITERAL.equals(context.inheritedPredicate)) { - getDeviceEntriesWithDataPartitions(tableScanNode, Collections.emptyList(), null); + getDeviceEntriesWithDataPartitions(tableScanNode, Collections.emptyList()); return tableScanNode; } @@ -342,10 +342,7 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { } // do index scan after expressionCanPushDown is processed - getDeviceEntriesWithDataPartitions( - tableScanNode, - splitExpression.getMetadataExpressions(), - splitExpression.getTimeColumnName()); + getDeviceEntriesWithDataPartitions(tableScanNode, splitExpression.getMetadataExpressions()); // exist expressions can not push down to scan operator if (!splitExpression.getExpressionsCannotPushDown().isEmpty()) { @@ -413,7 +410,7 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { } private void getDeviceEntriesWithDataPartitions( - TableScanNode tableScanNode, List<Expression> metadataExpressions, String timeColumnName) { + TableScanNode tableScanNode, List<Expression> metadataExpressions) { List<String> attributeColumns = new ArrayList<>(); int attributeIndex = 0; @@ -439,8 +436,9 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { attributeColumns, queryContext); tableScanNode.setDeviceEntries(deviceEntries); - QueryPlanCostMetricSet.getInstance() - .recordPlanCost(TABLE_TYPE, SCHEMA_FETCHER, System.nanoTime() - startTime); + long cost = System.nanoTime() - startTime; + QueryPlanCostMetricSet.getInstance().recordPlanCost(TABLE_TYPE, SCHEMA_FETCHER, cost); + queryContext.setFetchSchemaCost(cost); if (deviceEntries.isEmpty()) { if (analysis.noAggregates()) { @@ -478,8 +476,10 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { analysis.upsertDataPartition(dataPartition); } + cost = System.nanoTime() - startTime; QueryPlanCostMetricSet.getInstance() .recordPlanCost(TABLE_TYPE, PARTITION_FETCHER, System.nanoTime() - startTime); + queryContext.setFetchPartitionCost(cost); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java index 448f0829b5a..4835321aeda 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java @@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -71,8 +70,6 @@ public class ClusterScheduler implements IScheduler { QueryStateMachine stateMachine, List<FragmentInstance> instances, QueryType queryType, - ExecutorService executor, - ExecutorService writeOperationExecutor, ScheduledExecutorService scheduledExecutor, IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager, IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> @@ -84,8 +81,6 @@ public class ClusterScheduler implements IScheduler { new FragmentInstanceDispatcherImpl( queryType, queryContext, - executor, - writeOperationExecutor, syncInternalServiceClientManager, asyncInternalServiceClientManager); if (queryType == QueryType.READ) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 6bdc5c20f64..0ce548351e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -63,7 +63,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -77,8 +76,6 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); - private final ExecutorService executor; - private final ExecutorService writeOperationExecutor; private final QueryType type; private final MPPQueryContext queryContext; private final String localhostIpAddr; @@ -100,15 +97,11 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { public FragmentInstanceDispatcherImpl( QueryType type, MPPQueryContext queryContext, - ExecutorService executor, - ExecutorService writeOperationExecutor, IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager, IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> asyncInternalServiceClientManager) { this.type = type; this.queryContext = queryContext; - this.executor = executor; - this.writeOperationExecutor = writeOperationExecutor; this.syncInternalServiceClientManager = syncInternalServiceClientManager; this.asyncInternalServiceClientManager = asyncInternalServiceClientManager; this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalAddress(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FixedScheduledOutputQueryPlanStatistics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FixedScheduledOutputQueryPlanStatistics.java new file mode 100644 index 00000000000..7188fc24a8b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FixedScheduledOutputQueryPlanStatistics.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.statistics; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class FixedScheduledOutputQueryPlanStatistics { + private static final Logger LOGGER = + LoggerFactory.getLogger(FixedScheduledOutputQueryPlanStatistics.class); + + AtomicLong analyzeCost = new AtomicLong(0); + AtomicLong fetchPartitionCost = new AtomicLong(0); + AtomicLong fetchSchemaCost = new AtomicLong(0); + AtomicLong logicalPlanCost = new AtomicLong(0); + AtomicLong logicalOptimizationCost = new AtomicLong(0); + AtomicLong distributionPlanCost = new AtomicLong(0); + AtomicLong dispatchCost = new AtomicLong(0); + AtomicLong num = new AtomicLong(0); + + public FixedScheduledOutputQueryPlanStatistics() { + ScheduledExecutorService scheduledExecutor = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + "FixedScheduledOutputQueryPlanStatistics"); + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + scheduledExecutor, this::output, 0, 60_000, TimeUnit.MILLISECONDS); + } + + private synchronized void output() { + long count = num.get(); + if (count == 0) { + return; + } + + LOGGER.info( + "\r\n======ScheduledOutputQueryPlanStatistics, num: {}, avg analyzeCost: {}, " + + "avg fetchPartitionCost: {}, avg fetchSchemaCost: {}, avg logicalPlanCost: {}, " + + "avg logicalOptimizationCost: {}, avg distributionPlanCost: {}, avg dispatchCost: {}", + num.get(), + format(analyzeCost, count), + format(fetchPartitionCost, count), + format(fetchPartitionCost, count), + format(logicalPlanCost, count), + format(logicalOptimizationCost, count), + format(distributionPlanCost, count), + format(dispatchCost, count)); + num.set(0); + analyzeCost.set(0); + fetchPartitionCost.set(0); + fetchSchemaCost.set(0); + logicalPlanCost.set(0); + logicalOptimizationCost.set(0); + distributionPlanCost.set(0); + dispatchCost.set(0); + } + + private String format(AtomicLong time, long count) { + return time.get() / count + "ns"; + } + + public synchronized void recordCost(MPPQueryContext queryContext) { + num.incrementAndGet(); + + analyzeCost.getAndAdd(queryContext.getAnalyzeCost()); + fetchPartitionCost.getAndAdd(queryContext.getFetchPartitionCost()); + fetchSchemaCost.getAndAdd(queryContext.getFetchSchemaCost()); + logicalPlanCost.getAndAdd(queryContext.getLogicalPlanCost()); + logicalOptimizationCost.getAndAdd(queryContext.getLogicalOptimizationCost()); + distributionPlanCost.getAndAdd(queryContext.getDistributionPlanCost()); + dispatchCost.getAndAdd(queryContext.getDispatchCost()); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java index 8f89b41b92a..be6cb2a5b88 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.NonAlignedFullPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; @@ -44,6 +45,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.ShowQueriesOper import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; +import org.apache.iotdb.db.queryengine.plan.planner.IPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; @@ -1838,6 +1840,16 @@ public class MergeTreeSortOperatorTest { return null; } + @Override + public MPPQueryContext getQueryContext() { + return null; + } + + @Override + public IPlanner getPlanner() { + return null; + } + @Override public void start() {}
