This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/query_log in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e2e19a7f057c53d7bb65bc1acba07c1f23b928ec Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed May 4 21:10:30 2022 +0800 tmp saved --- .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 11 ++++ .../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 5 +- .../db/mpp/plan/execution/QueryExecution.java | 35 ++++++++++--- .../plan/node/metedata/read/SchemaFetchNode.java | 4 ++ .../node/metedata/read/SeriesSchemaMergeNode.java | 4 ++ .../db/mpp/plan/scheduler/ClusterScheduler.java | 11 +++- .../scheduler/SimpleFragInstanceDispatcher.java | 58 ++++++++++------------ 7 files changed, 88 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java index 0b282f5df3..5e20cd729d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java @@ -68,6 +68,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -80,6 +83,8 @@ import java.util.stream.Collectors; /** Analyze the statement and generate Analysis. */ public class Analyzer { + private static final Logger logger = LoggerFactory.getLogger(Analyzer.class); + private final MPPQueryContext context; private final IPartitionFetcher partitionFetcher; @@ -96,6 +101,10 @@ public class Analyzer { return new AnalyzeVisitor().process(statement, context); } + private String getLogHeader() { + return String.format("Query[%s]:", context.getQueryId()); + } + /** This visitor is used to analyze each type of Statement and returns the {@link Analysis}. */ private final class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> { @@ -118,7 +127,9 @@ public class Analyzer { (QueryStatement) new ConcatPathRewriter().rewrite(queryStatement, patternTree); // request schema fetch API + logger.info("{} fetch query schema...", getLogHeader()); SchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree); + logger.info("{} fetch schema done", getLogHeader()); // (xingtanzjr) If there is no leaf node in the schema tree, the query should be completed // immediately if (schemaTree.isEmpty()) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java index 256b53779e..d0c86da11b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java @@ -75,7 +75,10 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { coordinator.execute(schemaFetchStatement, queryId, null, "", partitionFetcher, this); // TODO: (xingtanzjr) throw exception if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - throw new RuntimeException("cannot fetch schema, status is: " + executionResult.status); + throw new RuntimeException( + String.format( + "cannot fetch schema, status is: %s, msg is: %s", + executionResult.status.getCode(), executionResult.status.getMessage())); } SchemaTree result = new SchemaTree(); while (coordinator.getQueryExecution(queryId).hasNextResult()) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index dce5dccc71..975b7d4e16 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -29,7 +29,6 @@ import org.apache.iotdb.db.mpp.execution.QueryState; import org.apache.iotdb.db.mpp.execution.QueryStateMachine; import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockService; import org.apache.iotdb.db.mpp.execution.datatransfer.ISourceHandle; -import org.apache.iotdb.db.mpp.plan.Coordinator; import org.apache.iotdb.db.mpp.plan.analyze.Analysis; import org.apache.iotdb.db.mpp.plan.analyze.Analyzer; import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher; @@ -40,6 +39,7 @@ import org.apache.iotdb.db.mpp.plan.planner.DistributionPlanner; import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner; import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil; import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler; import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler; import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler; @@ -70,7 +70,7 @@ import static com.google.common.base.Throwables.throwIfUnchecked; * corresponding physical nodes. 3. Collect and monitor the progress/states of this query. */ public class QueryExecution implements IQueryExecution { - private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class); + private static final Logger logger = LoggerFactory.getLogger(QueryExecution.class); private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -136,6 +136,9 @@ public class QueryExecution implements IQueryExecution { public void start() { if (skipExecute()) { + logger.info( + "{} execution of query will be skipped. Transit to FINISHED immediately.", + getLogHeader()); stateMachine.transitionToFinished(); return; } @@ -152,12 +155,13 @@ public class QueryExecution implements IQueryExecution { } // Analyze the statement in QueryContext. Generate the analysis this query need - private static Analysis analyze( + private Analysis analyze( Statement statement, MPPQueryContext context, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) { // initialize the variable `analysis` + logger.info("{} start to analyze query", getLogHeader()); return new Analyzer(context, partitionFetcher, schemaFetcher).analyze(statement); } @@ -186,14 +190,25 @@ public class QueryExecution implements IQueryExecution { // Use LogicalPlanner to do the logical query plan and logical optimization public void doLogicalPlan() { + logger.info("{} do logical plan...", getLogHeader()); LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers); this.logicalPlan = planner.plan(this.analysis); + logger.info( + "{} logical plan is: \n {}", + getLogHeader(), + PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode())); } // Generate the distributed plan and split it into fragments public void doDistributedPlan() { + logger.info("{} do distribution plan...", getLogHeader()); DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan); this.distributedPlan = planner.planFragments(); + logger.info( + "{} distribution plan done. Fragment instance count is {}, details is: \n {}", + getLogHeader(), + distributedPlan.getInstances().size(), + distributedPlan.getInstances()); } // Stop the workers for this query @@ -294,12 +309,16 @@ public class QueryExecution implements IQueryExecution { state == QueryState.FINISHED || state == QueryState.RUNNING ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR; - return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode)); + return new ExecutionResult( + context.getQueryId(), RpcUtils.getStatus(statusCode, stateMachine.getFailureMessage())); } catch (InterruptedException | ExecutionException e) { // TODO: (xingtanzjr) use more accurate error handling - Thread.currentThread().interrupt(); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } return new ExecutionResult( - context.getQueryId(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + context.getQueryId(), + RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage())); } } @@ -333,4 +352,8 @@ public class QueryExecution implements IQueryExecution { public String toString() { return String.format("QueryExecution[%s]", context.getQueryId()); } + + private String getLogHeader() { + return String.format("Query[%s]:", context.getQueryId()); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchNode.java index a43058adc4..fa62745926 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchNode.java @@ -75,4 +75,8 @@ public class SchemaFetchNode extends SchemaScanNode { public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { return visitor.visitSchemaFetch(this, context); } + + public String toString() { + return String.format("SchemaFetchNode-%s", getPlanNodeId()); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java index a7c4f4052e..c501dc0384 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java @@ -57,4 +57,8 @@ public class SeriesSchemaMergeNode extends AbstractSchemaMergeNode { public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { return visitor.visitSchemaMerge(this, context); } + + public String toString() { + return String.format("SchemaMergeNode-%s", getPlanNodeId()); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java index 48c4268034..db4bcfdb84 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java @@ -48,7 +48,7 @@ import java.util.concurrent.ScheduledExecutorService; * this scheduler. */ public class ClusterScheduler implements IScheduler { - private static final Logger LOGGER = LoggerFactory.getLogger(ClusterScheduler.class); + private static final Logger logger = LoggerFactory.getLogger(ClusterScheduler.class); private MPPQueryContext queryContext; // The stateMachine of the QueryExecution owned by this QueryScheduler @@ -90,6 +90,7 @@ public class ClusterScheduler implements IScheduler { @Override public void start() { stateMachine.transitionToDispatching(); + logger.info("{} transit to DISPATCHING", getLogHeader()); Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances); // NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect. @@ -102,7 +103,9 @@ public class ClusterScheduler implements IScheduler { } } catch (InterruptedException | ExecutionException e) { // If the dispatch failed, we make the QueryState as failed, and return. - Thread.currentThread().interrupt(); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } stateMachine.transitionToFailed(e); return; } @@ -156,4 +159,8 @@ public class ClusterScheduler implements IScheduler { // After sending, start to collect the states of these fragment instances private void startMonitorInstances() {} + + private String getLogHeader() { + return String.format("Query[%s]", queryContext.getQueryId()); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java index fec10957bf..4c64c3b7b6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java @@ -19,20 +19,14 @@ package org.apache.iotdb.db.mpp.plan.scheduler; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; -import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance; -import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq; -import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -55,31 +49,33 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher { public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> instances) { return executor.submit( () -> { - TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false); - for (FragmentInstance instance : instances) { - TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint(); - // TODO: (jackie tien) change the port - try (SyncDataNodeInternalServiceClient client = - internalServiceClientManager.borrowClient(endPoint)) { - // TODO: (xingtanzjr) consider how to handle the buffer here - ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); - instance.serializeRequest(buffer); - buffer.flip(); - TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId(); - TSendFragmentInstanceReq req = - new TSendFragmentInstanceReq( - new TFragmentInstance(buffer), groupId, instance.getType().toString()); - resp = client.sendFragmentInstance(req); - } catch (IOException e) { - LOGGER.error("can't connect to node {}", endPoint, e); - throw e; - } - - if (!resp.accepted) { - break; - } - } - return new FragInstanceDispatchResult(resp.accepted); + throw new RuntimeException("Dispatch Error"); + // TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false); + // for (FragmentInstance instance : instances) { + // TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint(); + // // TODO: (jackie tien) change the port + // try (SyncDataNodeInternalServiceClient client = + // internalServiceClientManager.borrowClient(endPoint)) { + // // TODO: (xingtanzjr) consider how to handle the buffer here + // ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); + // instance.serializeRequest(buffer); + // buffer.flip(); + // TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId(); + // TSendFragmentInstanceReq req = + // new TSendFragmentInstanceReq( + // new TFragmentInstance(buffer), groupId, + // instance.getType().toString()); + // resp = client.sendFragmentInstance(req); + // } catch (IOException e) { + // LOGGER.error("can't connect to node {}", endPoint, e); + // throw e; + // } + // + // if (!resp.accepted) { + // break; + // } + // } + // return new FragInstanceDispatchResult(resp.accepted); }); }
