This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c5dd6df27c Add Statistics grouped at Stage ID level in the V2 Engine
Response (#10337)
c5dd6df27c is described below
commit c5dd6df27c384b523c77adcb66956bf0e5c37251
Author: Kartik Khare <[email protected]>
AuthorDate: Tue Feb 28 15:49:46 2023 +0530
Add Statistics grouped at Stage ID level in the V2 Engine Response (#10337)
* WIP: aggregate stats on stage level for response
* Make response backward compatible
* Add new metadata keys to enum and replace hardcoded values; also add
table names to the stats
* Rename operatorExecutionTime to stageExecutionTime to better reflect its
meaning
* Remove sysout
* Remove duplicate code inside BrokerResponseStats class
* Remove unused constants from OperatorUtils and fix formatting
* Add test for stage level stats as well as BrokerResponseNativeV2
* Add followup TODOs and move method to utils class
---------
Co-authored-by: Kartik Khare <[email protected]>
---
.../MultiStageBrokerRequestHandler.java | 36 ++++++-
.../apache/pinot/common/datatable/DataTable.java | 15 ++-
.../response/broker/BrokerResponseNativeV2.java | 93 +++++++++++++++++
.../response/broker/BrokerResponseStats.java | 110 +++++++++++++++++++++
.../query/reduce/ExecutionStatsAggregator.java | 76 ++++++++++++--
.../query/runtime/operator/MultiStageOperator.java | 2 +
.../query/runtime/operator/OperatorStats.java | 15 ++-
.../runtime/operator/utils/OperatorUtils.java | 14 ++-
.../pinot/query/service/QueryDispatcher.java | 20 ++--
.../pinot/query/runtime/QueryRunnerTestBase.java | 7 +-
.../runtime/queries/ResourceBasedQueriesTest.java | 56 ++++++++---
11 files changed, 395 insertions(+), 49 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 63c1e8f9ad..e2dae3b75f 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -37,6 +38,8 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.response.broker.BrokerResponseStats;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -58,6 +61,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,7 +137,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
return handleRequest(requestId, query, sqlNodeAndOptions, request,
requesterIdentity, requestContext);
}
- private BrokerResponseNative handleRequest(long requestId, String query,
+ private BrokerResponse handleRequest(long requestId, String query,
@Nullable SqlNodeAndOptions sqlNodeAndOptions, JsonNode request,
@Nullable RequesterIdentity requesterIdentity,
RequestContext requestContext)
throws Exception {
@@ -166,16 +170,20 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
}
ResultTable queryResults;
- ExecutionStatsAggregator executionStatsAggregator = new
ExecutionStatsAggregator(false);
+ Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
+ for (Integer stageId: queryPlan.getStageMetadataMap().keySet()) {
+ stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(false));
+ }
+
try {
queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan,
_mailboxService, queryTimeoutMs,
- sqlNodeAndOptions.getOptions(), executionStatsAggregator);
+ sqlNodeAndOptions.getOptions(), stageIdStatsMap);
} catch (Exception e) {
LOGGER.info("query execution failed", e);
return new
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
}
- BrokerResponseNative brokerResponse = new BrokerResponseNative();
+ BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
long executionEndTimeNs = System.nanoTime();
// Set total query processing time
@@ -184,7 +192,25 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
brokerResponse.setTimeUsedMs(totalTimeMs);
brokerResponse.setResultTable(queryResults);
- executionStatsAggregator.setStats(brokerResponse);
+ for (Map.Entry<Integer, ExecutionStatsAggregator> entry :
stageIdStatsMap.entrySet()) {
+ if (entry.getKey() == 0) {
+ // Root stats are aggregated and added separately to broker response
for backward compatibility
+ entry.getValue().setStats(brokerResponse);
+ continue;
+ }
+
+ BrokerResponseStats brokerResponseStats = new BrokerResponseStats();
+ List<String> tableNames =
queryPlan.getStageMetadataMap().get(entry.getKey()).getScannedTables();
+ if (tableNames.size() > 0) {
+ //TODO: Only using first table to assign broker metrics
+ // find a way to split metrics in case of multiple tables
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNames.get(0));
+ entry.getValue().setStageLevelStats(rawTableName, brokerResponseStats,
_brokerMetrics);
+ } else {
+ entry.getValue().setStageLevelStats(null, brokerResponseStats, null);
+ }
+ brokerResponse.addStageStat(entry.getKey(), brokerResponseStats);
+ }
requestContext.setQueryProcessingTime(totalTimeMs);
augmentStatistics(requestContext, brokerResponse);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index 9bac9706a7..ea996cba6d 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -102,7 +102,7 @@ public interface DataTable {
*/
enum MetadataKey {
UNKNOWN(0, "unknown", MetadataValueType.STRING),
- TABLE(1, "table", MetadataValueType.STRING), // NOTE: this key is only
used in PrioritySchedulerTest
+ TABLE(1, "table", MetadataValueType.STRING),
NUM_DOCS_SCANNED(2, "numDocsScanned", MetadataValueType.LONG),
NUM_ENTRIES_SCANNED_IN_FILTER(3, "numEntriesScannedInFilter",
MetadataValueType.LONG),
NUM_ENTRIES_SCANNED_POST_FILTER(4, "numEntriesScannedPostFilter",
MetadataValueType.LONG),
@@ -110,8 +110,6 @@ public interface DataTable {
NUM_SEGMENTS_PROCESSED(6, "numSegmentsProcessed", MetadataValueType.INT),
NUM_SEGMENTS_MATCHED(7, "numSegmentsMatched", MetadataValueType.INT),
NUM_CONSUMING_SEGMENTS_QUERIED(8, "numConsumingSegmentsQueried",
MetadataValueType.INT),
- NUM_CONSUMING_SEGMENTS_PROCESSED(26, "numConsumingSegmentsProcessed",
MetadataValueType.INT),
- NUM_CONSUMING_SEGMENTS_MATCHED(27, "numConsumingSegmentsMatched",
MetadataValueType.INT),
MIN_CONSUMING_FRESHNESS_TIME_MS(9, "minConsumingFreshnessTimeMs",
MetadataValueType.LONG),
TOTAL_DOCS(10, "totalDocs", MetadataValueType.LONG),
NUM_GROUPS_LIMIT_REACHED(11, "numGroupsLimitReached",
MetadataValueType.STRING),
@@ -128,11 +126,17 @@ public interface DataTable {
NUM_SEGMENTS_PRUNED_BY_LIMIT(22, "numSegmentsPrunedByLimit",
MetadataValueType.INT),
NUM_SEGMENTS_PRUNED_BY_VALUE(23, "numSegmentsPrunedByValue",
MetadataValueType.INT),
EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS(24,
"explainPlanNumEmptyFilterSegments", MetadataValueType.INT),
- EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS(25,
"explainPlanNumMatchAllFilterSegments", MetadataValueType.INT);
+ EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS(25,
"explainPlanNumMatchAllFilterSegments", MetadataValueType.INT),
+ NUM_CONSUMING_SEGMENTS_PROCESSED(26, "numConsumingSegmentsProcessed",
MetadataValueType.INT),
+ NUM_CONSUMING_SEGMENTS_MATCHED(27, "numConsumingSegmentsMatched",
MetadataValueType.INT),
+ NUM_BLOCKS(28, "numBlocks", MetadataValueType.INT),
+ NUM_ROWS(29, "numRows", MetadataValueType.INT),
+ OPERATOR_EXECUTION_TIME_MS(30, "operatorExecutionTimeMs",
MetadataValueType.LONG),
+ OPERATOR_ID(31, "operatorId", MetadataValueType.STRING);
// We keep this constant to track the max id added so far for backward
compatibility.
// Increase it when adding new keys, but NEVER DECREASE IT!!!
- private static final int MAX_ID = 27;
+ private static final int MAX_ID = 31;
private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new
MetadataKey[MAX_ID + 1];
private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new
HashMap<>();
@@ -186,6 +190,7 @@ public interface DataTable {
int id = key.getId();
Preconditions.checkArgument(id >= 0 && id <= MAX_ID,
"Invalid id: %s for MetadataKey: %s, must be in the range of [0,
MAX_ID(%s)]", id, key, MAX_ID);
+
Preconditions.checkArgument(ID_TO_ENUM_KEY_MAP[id] == null,
"Duplicate id: %s defined for MetadataKey: %s and %s", id,
ID_TO_ENUM_KEY_MAP[id], key);
ID_TO_ENUM_KEY_MAP[id] = key;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
new file mode 100644
index 0000000000..79605773d7
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * This class implements pinot-broker's response format for any given query.
+ * All fields either primitive data types, or native objects (as opposed to
JSONObjects).
+ *
+ * Supports serialization via JSON.
+ */
+@JsonPropertyOrder({
+ "resultTable", "stageStats", "exceptions", "numServersQueried",
"numServersResponded", "numSegmentsQueried",
+ "numSegmentsProcessed", "numSegmentsMatched",
"numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
+ "numConsumingSegmentsMatched", "numDocsScanned",
"numEntriesScannedInFilter", "numEntriesScannedPostFilter",
+ "numGroupsLimitReached", "totalDocs", "timeUsedMs",
"offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs",
+ "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs",
+ "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
"realtimeTotalCpuTimeNs", "segmentStatistics",
+ "traceInfo"
+})
+public class BrokerResponseNativeV2 extends BrokerResponseNative {
+ private final Map<Integer, BrokerResponseStats> _stageIdStats = new
HashMap<>();
+
+ public BrokerResponseNativeV2() {
+ }
+
+ public BrokerResponseNativeV2(ProcessingException exception) {
+ super(exception);
+ }
+
+ public BrokerResponseNativeV2(List<ProcessingException> exceptions) {
+ super(exceptions);
+ }
+
+ /** Generate EXPLAIN PLAN output when queries are evaluated by Broker
without going to the Server. */
+ private static BrokerResponseNativeV2 getBrokerResponseExplainPlanOutput() {
+ BrokerResponseNativeV2 brokerResponse = BrokerResponseNativeV2.empty();
+ List<Object[]> rows = new ArrayList<>();
+ rows.add(new Object[]{"BROKER_EVALUATE", 0, -1});
+ brokerResponse.setResultTable(new
ResultTable(DataSchema.EXPLAIN_RESULT_SCHEMA, rows));
+ return brokerResponse;
+ }
+
+ /**
+ * Get a new empty {@link BrokerResponseNativeV2}.
+ */
+ public static BrokerResponseNativeV2 empty() {
+ return new BrokerResponseNativeV2();
+ }
+
+ public static BrokerResponseNativeV2 fromJsonString(String jsonString)
+ throws IOException {
+ return JsonUtils.stringToObject(jsonString, BrokerResponseNativeV2.class);
+ }
+
+ public void addStageStat(Integer stageId, BrokerResponseStats
brokerResponseStats) {
+ if (!brokerResponseStats.getOperatorIds().isEmpty()) {
+ _stageIdStats.put(stageId, brokerResponseStats);
+ }
+ }
+
+ @JsonProperty("stageStats")
+ public Map<Integer, BrokerResponseStats> getStageIdStats() {
+ return _stageIdStats;
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
new file mode 100644
index 0000000000..60d7ab4813
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseStats.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+// TODO: Decouple the execution stats aggregator logic and make it into a util
that can aggregate 2 values with the
+// same metadataKey
+// TODO: Replace member fields with a simple map of <MetadataKey, Object>
+// TODO: Add a subStat field, stage level subStats will contain each operator
stats
+@JsonPropertyOrder({"exceptions", "numBlocks", "numRows",
"stageExecutionTimeMs", "numServersQueried",
+ "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed",
"numSegmentsMatched",
+ "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
"numConsumingSegmentsMatched",
+ "numDocsScanned", "numEntriesScannedInFilter",
"numEntriesScannedPostFilter", "numGroupsLimitReached",
+ "totalDocs", "timeUsedMs", "offlineThreadCpuTimeNs",
"realtimeThreadCpuTimeNs",
+ "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs",
"offlineResponseSerializationCpuTimeNs",
+ "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs",
"realtimeTotalCpuTimeNs",
+ "traceInfo", "operatorIds", "tableNames"})
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class BrokerResponseStats extends BrokerResponseNative {
+
+ private int _numBlocks = 0;
+ private int _numRows = 0;
+ private long _stageExecutionTimeMs = 0;
+ private List<String> _operatorIds = new ArrayList<>();
+ private List<String> _tableNames = new ArrayList<>();
+
+ @Override
+ public ResultTable getResultTable() {
+ return null;
+ }
+
+ @JsonProperty("numBlocks")
+ public int getNumBlocks() {
+ return _numBlocks;
+ }
+
+ @JsonProperty("numBlocks")
+ public void setNumBlocks(int numBlocks) {
+ _numBlocks = numBlocks;
+ }
+
+ @JsonProperty("numRows")
+ public int getNumRows() {
+ return _numRows;
+ }
+
+ @JsonProperty("numRows")
+ public void setNumRows(int numRows) {
+ _numRows = numRows;
+ }
+
+ @JsonProperty("stageExecutionTimeMs")
+ public long getStageExecutionTimeMs() {
+ return _stageExecutionTimeMs;
+ }
+
+ @JsonProperty("stageExecutionTimeMs")
+ public void setStageExecutionTimeMs(long stageExecutionTimeMs) {
+ _stageExecutionTimeMs = stageExecutionTimeMs;
+ }
+
+ public String toJsonString()
+ throws IOException {
+ return JsonUtils.objectToString(this);
+ }
+
+ @JsonProperty("operatorIds")
+ public List<String> getOperatorIds() {
+ return _operatorIds;
+ }
+
+ @JsonProperty("operatorIds")
+ public void setOperatorIds(List<String> operatorIds) {
+ _operatorIds = operatorIds;
+ }
+
+ @JsonProperty("tableNames")
+ public List<String> getTableNames() {
+ return _tableNames;
+ }
+
+ @JsonProperty("tableNames")
+ public void setTableNames(List<String> tableNames) {
+ _tableNames = tableNames;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index 0c93252c51..889b859c29 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -19,24 +19,32 @@
package org.apache.pinot.core.query.reduce;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerTimer;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.BrokerResponseStats;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
public class ExecutionStatsAggregator {
private final List<QueryProcessingException> _processingExceptions = new
ArrayList<>();
+ private final List<String> _operatorIds = new ArrayList<>();
+ private final Set<String> _tableNames = new HashSet<>();
private final Map<String, String> _traceInfo = new HashMap<>();
private final boolean _enableTrace;
@@ -66,6 +74,9 @@ public class ExecutionStatsAggregator {
private long _explainPlanNumEmptyFilterSegments = 0L;
private long _explainPlanNumMatchAllFilterSegments = 0L;
private boolean _numGroupsLimitReached = false;
+ private int _numBlocks = 0;
+ private int _numRows = 0;
+ private long _stageExecutionTimeMs = 0;
public ExecutionStatsAggregator(boolean enableTrace) {
_enableTrace = enableTrace;
@@ -82,6 +93,32 @@ public class ExecutionStatsAggregator {
_traceInfo.put(routingInstance.getShortName(),
metadata.get(DataTable.MetadataKey.TRACE_INFO.getName()));
}
+ String tableNamesStr = metadata.get(DataTable.MetadataKey.TABLE.getName());
+ String tableName = null;
+
+ if (tableNamesStr != null) {
+ List<String> tableNames =
Arrays.stream(tableNamesStr.split("::")).collect(Collectors.toList());
+ _tableNames.addAll(tableNames);
+
+ //TODO: Decide a strategy to split stageLevel stats across tables for
brokerMetrics
+ // assigning everything to the first table only for now
+ tableName = tableNames.get(0);
+ }
+
+ TableType tableType = null;
+ if (routingInstance != null) {
+ tableType = routingInstance.getTableType();
+ } else if (tableName != null) {
+ tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
+ } else {
+ tableType = null;
+ }
+
+ String operatorId =
metadata.get(DataTable.MetadataKey.OPERATOR_ID.getName());
+ if (operatorId != null) {
+ _operatorIds.add(operatorId);
+ }
+
// Reduce on exceptions.
for (int key : exceptions.keySet()) {
_processingExceptions.add(new QueryProcessingException(key,
exceptions.get(key)));
@@ -141,8 +178,8 @@ public class ExecutionStatsAggregator {
}
String threadCpuTimeNsString =
metadata.get(DataTable.MetadataKey.THREAD_CPU_TIME_NS.getName());
- if (routingInstance != null && threadCpuTimeNsString != null) {
- if (routingInstance.getTableType() == TableType.OFFLINE) {
+ if (tableType != null && threadCpuTimeNsString != null) {
+ if (tableType == TableType.OFFLINE) {
_offlineThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
} else {
_realtimeThreadCpuTimeNs += Long.parseLong(threadCpuTimeNsString);
@@ -151,8 +188,8 @@ public class ExecutionStatsAggregator {
String systemActivitiesCpuTimeNsString =
metadata.get(DataTable.MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName());
- if (routingInstance != null && systemActivitiesCpuTimeNsString != null) {
- if (routingInstance.getTableType() == TableType.OFFLINE) {
+ if (tableType != null && systemActivitiesCpuTimeNsString != null) {
+ if (tableType == TableType.OFFLINE) {
_offlineSystemActivitiesCpuTimeNs +=
Long.parseLong(systemActivitiesCpuTimeNsString);
} else {
_realtimeSystemActivitiesCpuTimeNs +=
Long.parseLong(systemActivitiesCpuTimeNsString);
@@ -161,8 +198,8 @@ public class ExecutionStatsAggregator {
String responseSerializationCpuTimeNsString =
metadata.get(DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
- if (routingInstance != null && responseSerializationCpuTimeNsString !=
null) {
- if (routingInstance.getTableType() == TableType.OFFLINE) {
+ if (tableType != null && responseSerializationCpuTimeNsString != null) {
+ if (tableType == TableType.OFFLINE) {
_offlineResponseSerializationCpuTimeNs +=
Long.parseLong(responseSerializationCpuTimeNsString);
} else {
_realtimeResponseSerializationCpuTimeNs +=
Long.parseLong(responseSerializationCpuTimeNsString);
@@ -200,6 +237,22 @@ public class ExecutionStatsAggregator {
}
_numGroupsLimitReached |=
Boolean.parseBoolean(metadata.get(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()));
+
+
+ String numBlocksString =
metadata.get(DataTable.MetadataKey.NUM_BLOCKS.getName());
+ if (numBlocksString != null) {
+ _numBlocks += Long.parseLong(numBlocksString);
+ }
+
+ String numRowsString =
metadata.get(DataTable.MetadataKey.NUM_ROWS.getName());
+ if (numBlocksString != null) {
+ _numRows += Long.parseLong(numRowsString);
+ }
+
+ String operatorExecutionTimeString =
metadata.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName());
+ if (operatorExecutionTimeString != null) {
+ _stageExecutionTimeMs += Long.parseLong(operatorExecutionTimeString);
+ }
}
public void setStats(BrokerResponseNative brokerResponseNative) {
@@ -280,6 +333,17 @@ public class ExecutionStatsAggregator {
}
}
+ public void setStageLevelStats(@Nullable String rawTableName,
BrokerResponseStats brokerResponseStats,
+ @Nullable BrokerMetrics brokerMetrics) {
+ setStats(rawTableName, brokerResponseStats, brokerMetrics);
+
+ brokerResponseStats.setNumBlocks(_numBlocks);
+ brokerResponseStats.setNumRows(_numRows);
+ brokerResponseStats.setStageExecutionTimeMs(_stageExecutionTimeMs);
+ brokerResponseStats.setOperatorIds(_operatorIds);
+ brokerResponseStats.setTableNames(new ArrayList<>(_tableNames));
+ }
+
private void withNotNullLongMetadata(Map<String, String> metadata,
DataTable.MetadataKey key, LongConsumer consumer) {
String strValue = metadata.get(key.getName());
if (strValue != null) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index d9c0104d91..a6bc863457 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -22,6 +22,7 @@ import com.google.common.base.Joiner;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -74,6 +75,7 @@ public abstract class MultiStageOperator implements
Operator<TransferableBlock>,
}
if (!_operatorStats.getExecutionStats().isEmpty()) {
+
_operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(),
_operatorId);
_operatorStatsMap.put(_operatorId, _operatorStats);
}
return TransferableBlockUtils.getEndOfStreamTransferableBlock(
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index 09a8d7ac60..74cefbbc90 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -22,6 +22,7 @@ import com.google.common.base.Stopwatch;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
@@ -38,7 +39,7 @@ public class OperatorStats {
private int _numBlock = 0;
private int _numRows = 0;
- private Map<String, String> _executionStats;
+ private final Map<String, String> _executionStats;
public OperatorStats(long requestId, int stageId, VirtualServerAddress
serverAddress, String operatorType) {
_stageId = stageId;
@@ -65,14 +66,18 @@ public class OperatorStats {
_numRows += numRows;
}
+ public void recordSingleStat(String key, String stat) {
+ _executionStats.put(key, stat);
+ }
+
public void recordExecutionStats(Map<String, String> executionStats) {
- _executionStats = executionStats;
+ _executionStats.putAll(executionStats);
}
public Map<String, String> getExecutionStats() {
- _executionStats.put(OperatorUtils.NUM_BLOCKS, String.valueOf(_numBlock));
- _executionStats.put(OperatorUtils.NUM_ROWS, String.valueOf(_numRows));
- _executionStats.put(OperatorUtils.THREAD_EXECUTION_TIME,
+ _executionStats.putIfAbsent(DataTable.MetadataKey.NUM_BLOCKS.getName(),
String.valueOf(_numBlock));
+ _executionStats.putIfAbsent(DataTable.MetadataKey.NUM_ROWS.getName(),
String.valueOf(_numRows));
+
_executionStats.putIfAbsent(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName(),
String.valueOf(_executeStopwatch.elapsed(TimeUnit.MILLISECONDS)));
return _executionStats;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
index 532befd702..ab79339891 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
@@ -20,10 +20,13 @@ package org.apache.pinot.query.runtime.operator.utils;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Joiner;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.operator.OperatorStats;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -32,10 +35,6 @@ import org.slf4j.LoggerFactory;
public class OperatorUtils {
- public static final String NUM_BLOCKS = "numBlocks";
- public static final String NUM_ROWS = "numRows";
- public static final String THREAD_EXECUTION_TIME = "threadExecutionTime";
-
private static final Logger LOGGER =
LoggerFactory.getLogger(OperatorUtils.class);
private static final Map<String, String> OPERATOR_TOKEN_MAPPING = new
HashMap<>();
@@ -69,6 +68,13 @@ public class OperatorUtils {
return functionName;
}
+ public static void recordTableName(OperatorStats operatorStats,
StageMetadata operatorStageMetadata) {
+ if (!operatorStageMetadata.getScannedTables().isEmpty()) {
+ operatorStats.recordSingleStat(DataTable.MetadataKey.TABLE.getName(),
+ Joiner.on("::").join(operatorStageMetadata.getScannedTables()));
+ }
+ }
+
public static String operatorStatsToJson(OperatorStats operatorStats) {
try {
Map<String, Object> jsonOut = new HashMap<>();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 9c7b73a9e0..18511d9ee7 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -69,7 +69,7 @@ public class QueryDispatcher {
public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
MailboxService<TransferableBlock> mailboxService, long timeoutMs,
Map<String, String> queryOptions,
- ExecutionStatsAggregator executionStatsAggregator)
+ Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
throws Exception {
// submit all the distributed stages.
int reduceStageId = submit(requestId, queryPlan, timeoutMs, queryOptions);
@@ -80,7 +80,7 @@ public class QueryDispatcher {
reduceNode.getSenderStageId(), reduceStageId,
reduceNode.getDataSchema(),
new VirtualServerAddress(mailboxService.getHostname(),
mailboxService.getMailboxPort(), 0), timeoutMs);
List<DataBlock> resultDataBlocks =
- reduceMailboxReceive(mailboxReceiveOperator, timeoutMs,
executionStatsAggregator);
+ reduceMailboxReceive(mailboxReceiveOperator, timeoutMs,
executionStatsAggregator, queryPlan);
return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
queryPlan.getQueryStageMap().get(0).getDataSchema());
}
@@ -128,11 +128,11 @@ public class QueryDispatcher {
}
public static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator
mailboxReceiveOperator, long timeoutMs) {
- return reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, null);
+ return reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, null, null);
}
public static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator
mailboxReceiveOperator, long timeoutMs,
- @Nullable ExecutionStatsAggregator executionStatsAggregator) {
+ @Nullable Map<Integer, ExecutionStatsAggregator>
executionStatsAggregatorMap, QueryPlan queryPlan) {
List<DataBlock> resultDataBlocks = new ArrayList<>();
TransferableBlock transferableBlock;
long timeoutWatermark = System.nanoTime() + timeoutMs * 1_000_000L;
@@ -147,11 +147,19 @@ public class QueryDispatcher {
if (transferableBlock.isNoOpBlock()) {
continue;
} else if (transferableBlock.isEndOfStreamBlock()) {
- if (executionStatsAggregator != null) {
+ if (executionStatsAggregatorMap != null) {
for (Map.Entry<String, OperatorStats> entry :
transferableBlock.getResultMetadata().entrySet()) {
LOGGER.info("Broker Query Execution Stats - OperatorId: {},
OperatorStats: {}", entry.getKey(),
OperatorUtils.operatorStatsToJson(entry.getValue()));
- executionStatsAggregator.aggregate(null,
entry.getValue().getExecutionStats(), new HashMap<>());
+ OperatorStats operatorStats = entry.getValue();
+ ExecutionStatsAggregator rootStatsAggregator =
executionStatsAggregatorMap.get(0);
+ ExecutionStatsAggregator stageStatsAggregator =
executionStatsAggregatorMap.get(operatorStats.getStageId());
+ if (queryPlan != null) {
+ StageMetadata operatorStageMetadata =
queryPlan.getStageMetadataMap().get(operatorStats.getStageId());
+ OperatorUtils.recordTableName(operatorStats,
operatorStageMetadata);
+ }
+ rootStatsAggregator.aggregate(null,
entry.getValue().getExecutionStats(), new HashMap<>());
+ stageStatsAggregator.aggregate(null,
entry.getValue().getExecutionStats(), new HashMap<>());
}
}
return resultDataBlocks;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 6ebc8cdb1c..53be2be4a5 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -82,7 +82,7 @@ public abstract class QueryRunnerTestBase extends
QueryTestSet {
// --------------------------------------------------------------------------
// QUERY UTILS
// --------------------------------------------------------------------------
- protected List<Object[]> queryRunner(String sql, ExecutionStatsAggregator
executionStatsAggregator) {
+ protected List<Object[]> queryRunner(String sql, Map<Integer,
ExecutionStatsAggregator> executionStatsAggregatorMap) {
QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
Map<String, String> requestMetadataMap =
ImmutableMap.of(QueryConfig.KEY_OF_BROKER_REQUEST_ID,
String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()),
@@ -105,11 +105,14 @@ public abstract class QueryRunnerTestBase extends
QueryTestSet {
_servers.get(serverInstance.getServer()).processQuery(distributedStagePlan,
requestMetadataMap);
}
}
+ if (executionStatsAggregatorMap != null) {
+ executionStatsAggregatorMap.put(stageId, new
ExecutionStatsAggregator(false));
+ }
}
Preconditions.checkNotNull(mailboxReceiveOperator);
return QueryDispatcher.toResultTable(
QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator,
CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS,
- executionStatsAggregator),
+ executionStatsAggregatorMap, null),
queryPlan.getQueryResultFields(),
queryPlan.getQueryStageMap().get(0).getDataSchema()).getRows();
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index d311e9f467..5b57f59ab0 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -38,7 +38,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pinot.common.datatable.DataTableFactory;
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
+import org.apache.pinot.common.response.broker.BrokerResponseStats;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
import org.apache.pinot.query.QueryEnvironmentTestBase;
@@ -228,19 +229,40 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
@Test(dataProvider = "testResourceQueryTestCaseProviderWithMetadata")
public void testQueryTestCasesWithMetadata(String testCaseName, String sql,
String expect, int numSegments)
throws Exception {
- ExecutionStatsAggregator executionStatsAggregator = new
ExecutionStatsAggregator(false);
- runQuery(sql, expect, executionStatsAggregator).ifPresent(rows -> {
- BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
- executionStatsAggregator.setStats(brokerResponseNative);
+ Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap = new
HashMap<>();
+ runQuery(sql, expect, executionStatsAggregatorMap).ifPresent(rows -> {
+ BrokerResponseNativeV2 brokerResponseNative = new
BrokerResponseNativeV2();
+ executionStatsAggregatorMap.get(0).setStats(brokerResponseNative);
+ Assert.assertFalse(executionStatsAggregatorMap.isEmpty());
+ for (Integer stageId : executionStatsAggregatorMap.keySet()) {
+ if (stageId > 0) {
+ BrokerResponseStats brokerResponseStats = new BrokerResponseStats();
+ executionStatsAggregatorMap.get(stageId).setStageLevelStats(null,
brokerResponseStats, null);
+ brokerResponseNative.addStageStat(stageId, brokerResponseStats);
+ }
+ }
+
Assert.assertEquals(brokerResponseNative.getNumSegmentsQueried(),
numSegments);
+
+ Map<Integer, BrokerResponseStats> stageIdStats =
brokerResponseNative.getStageIdStats();
+ for (Integer stageId : stageIdStats.keySet()) {
+ // check stats only for leaf stage
+ BrokerResponseStats brokerResponseStats = stageIdStats.get(stageId);
+ if (!brokerResponseStats.getTableNames().isEmpty()) {
+ Assert.assertEquals(brokerResponseStats.getTableNames().size(), 1);
+ String tableName = brokerResponseStats.getTableNames().get(0);
+ Assert.assertNotNull(_tableToSegmentMap.get(tableName));
+ Assert.assertEquals(brokerResponseStats.getNumSegmentsQueried(),
_tableToSegmentMap.get(tableName).size());
+ }
+ }
});
}
private Optional<List<Object[]>> runQuery(String sql, final String except,
- ExecutionStatsAggregator executionStatsAggregator) {
+ Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) {
try {
// query pinot
- List<Object[]> resultRows = queryRunner(sql, executionStatsAggregator);
+ List<Object[]> resultRows = queryRunner(sql,
executionStatsAggregatorMap);
Assert.assertNull(except,
"Expected error with message '" + except + "'. But instead rows were
returned: " + resultRows.stream()
@@ -299,29 +321,30 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
throws Exception {
Map<String, QueryTestCase> testCaseMap = getTestCases();
List<Object[]> providerContent = new ArrayList<>();
+ Set<String> validTestCases = new HashSet<>();
+ validTestCases.add("basic_test");
+ validTestCases.add("framework_test");
+
for (Map.Entry<String, QueryTestCase> testCaseEntry :
testCaseMap.entrySet()) {
String testCaseName = testCaseEntry.getKey();
+ if (!validTestCases.contains(testCaseName)) {
+ continue;
+ }
+
if (testCaseEntry.getValue()._ignored) {
continue;
}
List<QueryTestCase.Query> queryCases = testCaseEntry.getValue()._queries;
for (QueryTestCase.Query queryCase : queryCases) {
+
if (queryCase._ignored) {
continue;
}
- if (queryCase._outputs != null) {
+ if (queryCase._outputs == null) {
String sql = replaceTableName(testCaseName, queryCase._sql);
- if (!sql.contains("basic_test")) {
- continue;
- }
- List<List<Object>> orgRows = queryCase._outputs;
- List<Object[]> expectedRows = new ArrayList<>(orgRows.size());
- for (List<Object> objs : orgRows) {
- expectedRows.add(objs.toArray());
- }
int segmentCount = 0;
for (String tableName : testCaseEntry.getValue()._tables.keySet()) {
segmentCount +=
@@ -401,6 +424,7 @@ public class ResourceBasedQueriesTest extends
QueryRunnerTestBase {
{
HashSet<String> hashSet = new HashSet<>(testCaseMap.keySet());
hashSet.retainAll(testCases.keySet());
+
if (!hashSet.isEmpty()) {
throw new IllegalArgumentException("testCase already exist for the
following names: " + hashSet);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]