This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3ffa7c3e32 [multistage][stats] clean up some stats (#10390)
3ffa7c3e32 is described below
commit 3ffa7c3e3237277c6a60fb3379d0a5c44e8240e0
Author: Rong Rong <[email protected]>
AuthorDate: Wed Mar 8 14:49:34 2023 -0800
[multistage][stats] clean up some stats (#10390)
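* add stage-level wall-time stats (operator execution start/end times and a stage execution-unit count)
* enable trace support for multi-stage leaf-stage server requests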
---------
Co-authored-by: Rong Rong <[email protected]>
---
.../apache/pinot/common/datatable/DataTable.java | 6 ++--
.../response/broker/BrokerResponseNativeV2.java | 3 +-
.../response/broker/BrokerResponseStats.java | 32 ++++++++++++++---
.../query/reduce/ExecutionStatsAggregator.java | 40 +++++++++++++++++++---
.../apache/pinot/core/util/trace/TraceContext.java | 7 ++--
.../apache/pinot/query/runtime/QueryRunner.java | 3 +-
.../query/runtime/operator/MultiStageOperator.java | 1 -
.../query/runtime/operator/OperatorStats.java | 7 ++++
.../runtime/plan/ServerRequestPlanVisitor.java | 6 ++--
9 files changed, 85 insertions(+), 20 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index ea996cba6d..928834359f 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -132,11 +132,13 @@ public interface DataTable {
NUM_BLOCKS(28, "numBlocks", MetadataValueType.INT),
NUM_ROWS(29, "numRows", MetadataValueType.INT),
OPERATOR_EXECUTION_TIME_MS(30, "operatorExecutionTimeMs",
MetadataValueType.LONG),
- OPERATOR_ID(31, "operatorId", MetadataValueType.STRING);
+ OPERATOR_ID(31, "operatorId", MetadataValueType.STRING),
+ OPERATOR_EXEC_START_TIME_MS(32, "operatorExecStartTimeMs",
MetadataValueType.LONG),
+ OPERATOR_EXEC_END_TIME_MS(33, "operatorExecEndTimeMs",
MetadataValueType.LONG);
// We keep this constant to track the max id added so far for backward
compatibility.
// Increase it when adding new keys, but NEVER DECREASE IT!!!
- private static final int MAX_ID = 31;
+ private static final int MAX_ID = 33;
private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new
MetadataKey[MAX_ID + 1];
private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new
HashMap<>();
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 5bf631e129..2eacbc23aa 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -81,7 +81,8 @@ public class BrokerResponseNativeV2 extends
BrokerResponseNative {
}
public void addStageStat(Integer stageId, BrokerResponseStats
brokerResponseStats) {
- if (!brokerResponseStats.getOperatorStats().isEmpty()) {
+ // StageExecutionWallTime will always be there, other stats are optional
such as OperatorStats
+ if (brokerResponseStats.getStageExecWallTimeMs() != -1) {
_stageIdStats.put(stageId, brokerResponseStats);
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
index 83cbd16f3c..23482e905c 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
@@ -33,11 +33,11 @@ import org.apache.pinot.spi.utils.JsonUtils;
// same metadataKey
// TODO: Replace member fields with a simple map of <MetadataKey, Object>
// TODO: Add a subStat field, stage level subStats will contain each operator
stats
-@JsonPropertyOrder({"exceptions", "numBlocks", "numRows",
"stageExecutionTimeMs", "numServersQueried",
- "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed",
"numSegmentsMatched",
- "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
"numConsumingSegmentsMatched",
- "numDocsScanned", "numEntriesScannedInFilter",
"numEntriesScannedPostFilter", "numGroupsLimitReached",
- "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
"realtimeThreadCpuTimeNs",
+@JsonPropertyOrder({"exceptions", "numBlocks", "numRows",
"stageExecutionTimeMs", "stageExecutionUnit",
+ "stageExecWallTimeMs", "stageExecEndTimeMs", "numServersQueried",
"numServersResponded", "numSegmentsQueried",
+ "numSegmentsProcessed", "numSegmentsMatched",
"numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
+ "numConsumingSegmentsMatched", "numDocsScanned",
"numEntriesScannedInFilter", "numEntriesScannedPostFilter",
+ "numGroupsLimitReached", "totalDocs", "timeUsedMs",
"offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
"offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs",
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
"realtimeTotalCpuTimeNs",
"traceInfo", "operatorStats", "tableNames"})
@@ -47,6 +47,8 @@ public class BrokerResponseStats extends BrokerResponseNative
{
private int _numBlocks = 0;
private int _numRows = 0;
private long _stageExecutionTimeMs = 0;
+ private int _stageExecutionUnit = 0;
+ private long _stageExecWallTimeMs = -1;
private Map<String, Map<String, String>> _operatorStats = new HashMap<>();
private List<String> _tableNames = new ArrayList<>();
@@ -85,6 +87,26 @@ public class BrokerResponseStats extends
BrokerResponseNative {
_stageExecutionTimeMs = stageExecutionTimeMs;
}
+ @JsonProperty("stageExecWallTimeMs")
+ public long getStageExecWallTimeMs() {
+ return _stageExecWallTimeMs;
+ }
+
+ @JsonProperty("stageExecWallTimeMs")
+ public void setStageExecWallTimeMs(long stageExecWallTimeMs) {
+ _stageExecWallTimeMs = stageExecWallTimeMs;
+ }
+
+ @JsonProperty("stageExecutionUnit")
+ public long getStageExecutionUnit() {
+ return _stageExecutionUnit;
+ }
+
+ @JsonProperty("stageExecutionUnit")
+ public void setStageExecutionUnit(int stageExecutionUnit) {
+ _stageExecutionUnit = stageExecutionUnit;
+ }
+
public String toJsonString()
throws IOException {
return JsonUtils.objectToString(this);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index 2faefee584..c8ef48af4a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.reduce;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -77,6 +78,9 @@ public class ExecutionStatsAggregator {
private int _numBlocks = 0;
private int _numRows = 0;
private long _stageExecutionTimeMs = 0;
+ private long _stageExecStartTimeMs = -1;
+ private long _stageExecEndTimeMs = -1;
+ private int _stageExecutionUnit = 0;
public ExecutionStatsAggregator(boolean enableTrace) {
_enableTrace = enableTrace;
@@ -88,10 +92,6 @@ public class ExecutionStatsAggregator {
public synchronized void aggregate(@Nullable ServerRoutingInstance
routingInstance, Map<String, String> metadata,
Map<Integer, String> exceptions) {
- // Reduce on trace info.
- if (_enableTrace &&
metadata.containsKey(DataTable.MetadataKey.TRACE_INFO.getName())) {
- _traceInfo.put(routingInstance.getShortName(),
metadata.get(DataTable.MetadataKey.TRACE_INFO.getName()));
- }
String tableNamesStr = metadata.get(DataTable.MetadataKey.TABLE.getName());
String tableName = null;
@@ -106,12 +106,21 @@ public class ExecutionStatsAggregator {
}
TableType tableType = null;
+ String instanceName = null;
if (routingInstance != null) {
tableType = routingInstance.getTableType();
+ instanceName = routingInstance.getShortName();;
} else if (tableName != null) {
tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+ instanceName = tableName;
} else {
tableType = null;
+ instanceName = null;
+ }
+
+ // Reduce on trace info.
+ if (_enableTrace &&
metadata.containsKey(DataTable.MetadataKey.TRACE_INFO.getName()) &&
instanceName != null) {
+ _traceInfo.put(instanceName,
metadata.get(DataTable.MetadataKey.TRACE_INFO.getName()));
}
String operatorId =
metadata.get(DataTable.MetadataKey.OPERATOR_ID.getName());
@@ -256,6 +265,21 @@ public class ExecutionStatsAggregator {
String operatorExecutionTimeString =
metadata.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName());
if (operatorExecutionTimeString != null) {
_stageExecutionTimeMs += Long.parseLong(operatorExecutionTimeString);
+ _stageExecutionUnit += 1;
+ }
+
+ String operatorExecStartTimeString =
metadata.get(DataTable.MetadataKey.OPERATOR_EXEC_START_TIME_MS.getName());
+ if (operatorExecStartTimeString != null) {
+ long operatorExecStartTime = Long.parseLong(operatorExecStartTimeString);
+ _stageExecStartTimeMs = _stageExecStartTimeMs == -1 ?
operatorExecStartTime
+ : Math.min(operatorExecStartTime, _stageExecStartTimeMs);
+ }
+
+ String operatorExecEndTimeString =
metadata.get(DataTable.MetadataKey.OPERATOR_EXEC_END_TIME_MS.getName());
+ if (operatorExecEndTimeString != null) {
+ long operatorExecEndTime = Long.parseLong(operatorExecEndTimeString);
+ _stageExecEndTimeMs = _stageExecEndTimeMs == -1 ? operatorExecEndTime
+ : Math.max(operatorExecEndTime, _stageExecEndTimeMs);
}
}
@@ -344,7 +368,13 @@ public class ExecutionStatsAggregator {
brokerResponseStats.setNumBlocks(_numBlocks);
brokerResponseStats.setNumRows(_numRows);
brokerResponseStats.setStageExecutionTimeMs(_stageExecutionTimeMs);
- brokerResponseStats.setOperatorStats(_operatorStats);
+ brokerResponseStats.setStageExecWallTimeMs(_stageExecEndTimeMs -
_stageExecStartTimeMs);
+ brokerResponseStats.setStageExecutionUnit(_stageExecutionUnit);
+ if (_enableTrace) {
+ brokerResponseStats.setOperatorStats(_operatorStats);
+ } else {
+ brokerResponseStats.setOperatorStats(Collections.emptyMap());
+ }
brokerResponseStats.setTableNames(new ArrayList<>(_tableNames));
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
index 4d430434e5..0bdaf0da62 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/TraceContext.java
@@ -189,8 +189,11 @@ public final class TraceContext {
*/
public static String getTraceInfo() {
ArrayNode jsonTraces = JsonUtils.newArrayNode();
- for (Trace trace :
REQUEST_TO_TRACES_MAP.get(TRACE_ENTRY_THREAD_LOCAL.get()._requestId)) {
- jsonTraces.add(trace.toJson());
+ Queue<Trace> traces =
REQUEST_TO_TRACES_MAP.get(TRACE_ENTRY_THREAD_LOCAL.get()._requestId);
+ if (traces != null && !traces.isEmpty()) {
+ for (Trace trace : traces) {
+ jsonTraces.add(trace.toJson());
+ }
}
return jsonTraces.toString();
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 4c7380943e..a1d22c678f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -184,8 +184,7 @@ public class QueryRunner {
MailboxSendOperator mailboxSendOperator = new
MailboxSendOperator(_mailboxService,
new LeafStageTransferableBlockOperator(serverQueryResults,
sendNode.getDataSchema(), requestId,
sendNode.getStageId(), _rootServer),
receivingStageMetadata.getServerInstances(),
- sendNode.getExchangeType(), sendNode.getPartitionKeySelector(),
_rootServer,
- serverQueryRequests.get(0).getRequestId(),
+ sendNode.getExchangeType(), sendNode.getPartitionKeySelector(),
_rootServer, requestId,
sendNode.getStageId(), sendNode.getReceiverStageId());
int blockCounter = 0;
while
(!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index a6bc863457..55fbc8830d 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -73,7 +73,6 @@ public abstract class MultiStageOperator implements
Operator<TransferableBlock>,
for (MultiStageOperator op : getChildOperators()) {
_operatorStatsMap.putAll(op.getOperatorStatsMap());
}
-
if (!_operatorStats.getExecutionStats().isEmpty()) {
_operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(),
_operatorId);
_operatorStatsMap.put(_operatorId, _operatorStats);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index 74cefbbc90..715be163bb 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -39,6 +39,7 @@ public class OperatorStats {
private int _numBlock = 0;
private int _numRows = 0;
+ private long _startTimeMs = -1;
private final Map<String, String> _executionStats;
public OperatorStats(long requestId, int stageId, VirtualServerAddress
serverAddress, String operatorType) {
@@ -50,6 +51,7 @@ public class OperatorStats {
}
public void startTimer() {
+ _startTimeMs = _startTimeMs == -1 ? System.currentTimeMillis() :
_startTimeMs;
if (!_executeStopwatch.isRunning()) {
_executeStopwatch.start();
}
@@ -79,6 +81,11 @@ public class OperatorStats {
_executionStats.putIfAbsent(DataTable.MetadataKey.NUM_ROWS.getName(),
String.valueOf(_numRows));
_executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName(),
String.valueOf(_executeStopwatch.elapsed(TimeUnit.MILLISECONDS)));
+ // wall time are recorded slightly longer than actual execution but it is
OK.
+
_executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXEC_START_TIME_MS.getName(),
+ String.valueOf(_startTimeMs));
+
_executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXEC_END_TIME_MS.getName(),
+ String.valueOf(System.currentTimeMillis()));
return _executionStats;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index 97924586b1..55141cc739 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -93,7 +93,9 @@ public class ServerRequestPlanVisitor implements
StageNodeVisitor<Void, ServerPl
DistributedStagePlan stagePlan, Map<String, String> requestMetadataMap,
TableConfig tableConfig, Schema schema,
TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String>
segmentList) {
// Before-visit: construct the ServerPlanRequestContext baseline
- long requestId =
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
+ // Making a unique requestId for leaf stages otherwise it causes problem
on stats/metrics/tracing.
+ long requestId =
(Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID))
<< 16)
+ + (stagePlan.getStageId() << 8) + (tableType == TableType.REALTIME ? 1
: 0);
long timeoutMs =
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
PinotQuery pinotQuery = new PinotQuery();
Integer leafNodeLimit =
QueryOptionsUtils.getMultiStageLeafLimit(requestMetadataMap);
@@ -140,7 +142,7 @@ public class ServerRequestPlanVisitor implements
StageNodeVisitor<Void, ServerPl
InstanceRequest instanceRequest = new InstanceRequest();
instanceRequest.setRequestId(requestId);
instanceRequest.setBrokerId("unknown");
- instanceRequest.setEnableTrace(false);
+
instanceRequest.setEnableTrace(Boolean.parseBoolean(requestMetadataMap.get(CommonConstants.Broker.Request.TRACE)));
instanceRequest.setSearchSegments(segmentList);
instanceRequest.setQuery(brokerRequest);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]