This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/write_opt
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/write_opt by this push:
new a20bc0fe5a add stepTracker
a20bc0fe5a is described below
commit a20bc0fe5a39e6c1928162c45f53d69131c6d4a0
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Jun 20 15:48:55 2022 +0800
add stepTracker
---
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 6 +-
.../org/apache/iotdb/db/mpp/plan/StepTracker.java | 73 ++++++++++++++++++++
.../db/mpp/plan/execution/QueryExecution.java | 77 ++++++++++++----------
.../planner/distribution/DistributionPlanner.java | 3 -
.../db/mpp/plan/scheduler/ClusterScheduler.java | 4 +-
5 files changed, 120 insertions(+), 43 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 111a9222a6..610b586ceb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -112,9 +112,9 @@ public class Coordinator {
QueryId globalQueryId = queryIdGenerator.createNextQueryId();
try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) {
- if (sql != null) {
- LOGGER.info("start executing sql: {}", sql);
- }
+ // if (sql != null) {
+ // LOGGER.info("start executing sql: {}", sql);
+ // }
IQueryExecution execution =
createQueryExecution(
statement,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
new file mode 100644
index 0000000000..1fe7df7be0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class Metric {
+ private static final Logger logger = LoggerFactory.getLogger(Metric.class);
+ public String stepName;
+ public long invokeCount;
+ public long totalTime;
+ public long last100Time;
+
+ public Metric(String stepName) {
+ this.stepName = stepName;
+ this.invokeCount = 0;
+ this.totalTime = 0;
+ this.last100Time = 0;
+ }
+
+ public void trace(long startTime, long endTime) {
+ this.invokeCount++;
+ this.totalTime += (endTime - startTime);
+ }
+
+ public void tryPrint() {
+ if (invokeCount % 100 == 0) {
+ logger.info(
+ String.format(
+ "step metrics [%d]-[%s] - Total: %d, SUM: %.2fms, AVG: %fms, Last100AVG: %fms",
+ Thread.currentThread().getId(),
+ stepName,
+ invokeCount,
+ totalTime * 1.0 / 1000000,
+ totalTime * 1.0 / 1000000 / invokeCount,
+ (totalTime * 1.0 - last100Time) / 1000000 / 100));
+ last100Time = totalTime;
+ }
+ }
+}
+
+public class StepTracker {
+ private static final ThreadLocal<Map<String, Metric>> metrics = new ThreadLocal<>();
+
+ public static void trace(String stepName, long startTime, long endTime) {
+ if (metrics.get() == null) {
+ metrics.set(new HashMap<>());
+ }
+ metrics.get().computeIfAbsent(stepName, Metric::new).trace(startTime, endTime);
+ metrics.get().get(stepName).tryPrint();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 5f4ce1ae3f..48f018b19c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.execution.QueryState;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockService;
import org.apache.iotdb.db.mpp.execution.datatransfer.ISourceHandle;
+import org.apache.iotdb.db.mpp.plan.StepTracker;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
@@ -46,7 +47,6 @@ import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler;
@@ -189,52 +189,59 @@ public class QueryExecution implements IQueryExecution {
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher) {
// initialize the variable `analysis`
- logger.info("start to analyze query");
+ // logger.info("start to analyze query");
return new Analyzer(context, partitionFetcher, schemaFetcher).analyze(statement);
}
private void schedule() {
- // TODO: (xingtanzjr) initialize the query scheduler according to configuration
- this.scheduler =
- config.isClusterMode()
- ? new ClusterScheduler(
- context,
- stateMachine,
- distributedPlan.getInstances(),
- context.getQueryType(),
- executor,
- writeOperationExecutor,
- scheduledExecutor,
- internalServiceClientManager)
- : new StandaloneScheduler(
- context,
- stateMachine,
- distributedPlan.getInstances(),
- context.getQueryType(),
- executor,
- scheduledExecutor,
- internalServiceClientManager);
- this.scheduler.start();
+ long startTime = System.nanoTime();
+ try {
+ // TODO: (xingtanzjr) initialize the query scheduler according to configuration
+ this.scheduler =
+ config.isClusterMode()
+ ? new ClusterScheduler(
+ context,
+ stateMachine,
+ distributedPlan.getInstances(),
+ context.getQueryType(),
+ executor,
+ writeOperationExecutor,
+ scheduledExecutor,
+ internalServiceClientManager)
+ : new StandaloneScheduler(
+ context,
+ stateMachine,
+ distributedPlan.getInstances(),
+ context.getQueryType(),
+ executor,
+ scheduledExecutor,
+ internalServiceClientManager);
+ this.scheduler.start();
+ } finally {
+ StepTracker.trace("dispatch", startTime, System.nanoTime());
+ }
}
// Use LogicalPlanner to do the logical query plan and logical optimization
public void doLogicalPlan() {
- logger.info("do logical plan...");
- LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers);
- this.logicalPlan = planner.plan(this.analysis);
- logger.info(
- "logical plan is: \n {}", PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode()));
+ long startTime = System.nanoTime();
+ try {
+ LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers);
+ this.logicalPlan = planner.plan(this.analysis);
+ } finally {
+ StepTracker.trace("doLogicalPlan", startTime, System.nanoTime());
+ }
}
// Generate the distributed plan and split it into fragments
public void doDistributedPlan() {
- logger.info("do distribution plan...");
- DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan);
- this.distributedPlan = planner.planFragments();
- logger.info(
- "distribution plan done. Fragment instance count is {}, details is: \n {}",
- distributedPlan.getInstances().size(),
- printFragmentInstances(distributedPlan.getInstances()));
+ long startTime = System.nanoTime();
+ try {
+ DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan);
+ this.distributedPlan = planner.planFragments();
+ } finally {
+ StepTracker.trace("doDistributionPlan", startTime, System.nanoTime());
+ }
}
private String printFragmentInstances(List<FragmentInstance> instances) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 421de275da..4dfe1b968f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
@@ -67,9 +66,7 @@ public class DistributionPlanner {
public DistributedQueryPlan planFragments() {
PlanNode rootAfterRewrite = rewriteSource();
- System.out.println(PlanNodeUtil.nodeToString(rootAfterRewrite));
PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
- System.out.println(PlanNodeUtil.nodeToString(rootWithExchange));
if (analysis.getStatement() instanceof QueryStatement) {
analysis
.getRespDatasetHeader()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 9437c22536..8390ac6fe5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -86,7 +86,7 @@ public class ClusterScheduler implements IScheduler {
@Override
public void start() {
stateMachine.transitionToDispatching();
- logger.info("transit to DISPATCHING");
+ // logger.info("transit to DISPATCHING");
Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
// NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect.
@@ -116,7 +116,7 @@ public class ClusterScheduler implements IScheduler {
// The FragmentInstances has been dispatched successfully to corresponding host, we mark the
// QueryState to Running
stateMachine.transitionToRunning();
- logger.info("transit to RUNNING");
+ // logger.info("transit to RUNNING");
instances.forEach(
instance -> {
stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING);