gortiz commented on code in PR #12704: URL: https://github.com/apache/pinot/pull/12704#discussion_r1580608897
########## pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java: ########## @@ -77,20 +100,301 @@ public static BrokerResponseNativeV2 empty() { return new BrokerResponseNativeV2(); } - public static BrokerResponseNativeV2 fromJsonString(String jsonString) - throws IOException { - return JsonUtils.stringToObject(jsonString, BrokerResponseNativeV2.class); + public void addStageStats(JsonNode stageStats) { + ObjectNode node = JsonUtils.newObjectNode(); + node.put("stage", _stageIdStats.size()); + node.set("stats", stageStats); + _stageIdStats.add(node); + } + + @JsonProperty + public List<JsonNode> getStageStats() { + return _stageIdStats; + } + + @JsonProperty + public long getMaxRows() { + return _maxRows; + } + + public void mergeMaxRows(long maxRows) { + _maxRows = Math.max(_maxRows, maxRows); + } + + @Override + public void setTimeUsedMs(long timeUsedMs) { + _serverStats.merge(DataTable.MetadataKey.TIME_USED_MS, timeUsedMs); + } + + @Override + public long getNumDocsScanned() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_DOCS_SCANNED); + } + + @Override + public long getNumEntriesScannedInFilter() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER); + } + + @Override + public long getNumEntriesScannedPostFilter() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER); + } + + @Override + public long getNumSegmentsQueried() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_QUERIED); + } + + @Override + public long getNumSegmentsProcessed() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED); + } + + @Override + public long getNumSegmentsMatched() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_MATCHED); + } + + @Override + public long getNumConsumingSegmentsQueried() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED); + } + + @Override + public long 
getNumConsumingSegmentsProcessed() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED); + } + + @Override + public long getNumConsumingSegmentsMatched() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED); + } + + @Override + public long getMinConsumingFreshnessTimeMs() { + return _serverStats.getLong(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS); + } + + @Override + public long getTotalDocs() { + return _serverStats.getLong(DataTable.MetadataKey.TOTAL_DOCS); + } + + @Override + public boolean isNumGroupsLimitReached() { + return _serverStats.getBoolean(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED); + } + + public void mergeNumGroupsLimitReached(boolean numGroupsLimitReached) { + _serverStats.merge(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED, numGroupsLimitReached); + } + + @Override + public long getNumSegmentsPrunedByServer() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER); + } + + @Override + public long getNumSegmentsPrunedInvalid() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID); } - public void addStageStat(Integer stageId, BrokerResponseStats brokerResponseStats) { - // StageExecutionWallTime will always be there, other stats are optional such as OperatorStats - if (brokerResponseStats.getStageExecWallTimeMs() != -1) { - _stageIdStats.put(stageId, brokerResponseStats); + @Override + public long getNumSegmentsPrunedByLimit() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT); + } + + @Override + public long getNumSegmentsPrunedByValue() { + return _serverStats.getLong(DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE); + } + + @Override + public long getExplainPlanNumEmptyFilterSegments() { + return _serverStats.getLong(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS); + } + + @Override + public long getExplainPlanNumMatchAllFilterSegments() { + return 
_serverStats.getLong(DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS); + } + + @Override + public long getOfflineTotalCpuTimeNs() { + return getOfflineThreadCpuTimeNs() + getOfflineSystemActivitiesCpuTimeNs() + + getOfflineResponseSerializationCpuTimeNs(); + } + + @Override + public long getRealtimeTotalCpuTimeNs() { + return getRealtimeThreadCpuTimeNs() + getRealtimeSystemActivitiesCpuTimeNs() + + getRealtimeResponseSerializationCpuTimeNs(); + } + + @Override + public void setExceptions(List<ProcessingException> exceptions) { + for (ProcessingException exception : exceptions) { + _processingExceptions.add(new QueryProcessingException(exception.getErrorCode(), exception.getMessage())); } } - @JsonProperty("stageStats") - public Map<Integer, BrokerResponseStats> getStageIdStats() { - return _stageIdStats; + public void addToExceptions(QueryProcessingException processingException) { + _processingExceptions.add(processingException); + } + + @Override + public int getNumServersQueried() { + return _numServersQueried; + } + + @Override + public void setNumServersQueried(int numServersQueried) { + _numServersQueried = numServersQueried; + } + + @Override + public int getNumServersResponded() { + return _numServersResponded; + } + + @Override + public void setNumServersResponded(int numServersResponded) { + _numServersResponded = numServersResponded; + } + + @JsonProperty("maxRowsInJoinReached") + public boolean isMaxRowsInJoinReached() { + return _maxRowsInJoinReached; + } + + @JsonProperty("maxRowsInJoinReached") + public void mergeMaxRowsInJoinReached(boolean maxRowsInJoinReached) { + _maxRowsInJoinReached |= maxRowsInJoinReached; + } + + @Override + public int getExceptionsSize() { + return _processingExceptions.size(); + } + + @Override + public void setResultTable(ResultTable resultTable) { + _resultTable = resultTable; + } + + @Override + public ResultTable getResultTable() { + return _resultTable; + } + + @Override + public 
List<QueryProcessingException> getProcessingExceptions() { + return List.of(); + } + + @Override + public int getNumRowsResultSet() { + return 0; + } + + @Override + public long getOfflineThreadCpuTimeNs() { + return _offlineThreadCpuTimeNs; + } + + @Override + public long getRealtimeThreadCpuTimeNs() { + return _realtimeThreadCpuTimeNs; + } + + @Override + public long getOfflineSystemActivitiesCpuTimeNs() { + return _offlineSystemActivitiesCpuTimeNs; + } + + @Override + public long getRealtimeSystemActivitiesCpuTimeNs() { + return _realtimeSystemActivitiesCpuTimeNs; + } + + @Override + public long getOfflineResponseSerializationCpuTimeNs() { + return _offlineResponseSerializationCpuTimeNs; + } + + @Override + public long getRealtimeResponseSerializationCpuTimeNs() { + return _realtimeResponseSerializationCpuTimeNs; + } + + @Override + public long getNumSegmentsPrunedByBroker() { + return _numSegmentsPrunedByBroker; + } + + @Override + public void setNumSegmentsPrunedByBroker(long numSegmentsPrunedByBroker) { + _numSegmentsPrunedByBroker = numSegmentsPrunedByBroker; + } + + @Override + public String getRequestId() { + return _requestId; + } + + @Override + public void setRequestId(String requestId) { + _requestId = requestId; + } + + @Override + public String getBrokerId() { + return _brokerId; + } + + @Override + public void setBrokerId(String requestId) { + _brokerId = requestId; + } + + @Override + public long getBrokerReduceTimeMs() { + return _brokerReduceTimeMs; + } + + @Override + public void setBrokerReduceTimeMs(long brokerReduceTimeMs) { + _brokerReduceTimeMs = brokerReduceTimeMs; + } + + @JsonProperty(access = JsonProperty.Access.READ_ONLY) + @Override + public boolean isPartialResult() { + return isNumGroupsLimitReached() || getExceptionsSize() > 0 || isMaxRowsInJoinReached(); + } + + public void addServerStats(StatMap<DataTable.MetadataKey> serverStats) { + // Set execution statistics. 
+ _serverStats.merge(serverStats); + + long threadCpuTimeNs = serverStats.getLong(DataTable.MetadataKey.THREAD_CPU_TIME_NS); + long systemActivitiesCpuTimeNs = serverStats.getLong(DataTable.MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS); + long responseSerializationCpuTimeNs = serverStats.getLong(DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS); + + String tableName = serverStats.getString(DataTable.MetadataKey.TABLE); + if (tableName != null) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); + + if (tableType == TableType.OFFLINE) { + _offlineThreadCpuTimeNs += threadCpuTimeNs; + _offlineSystemActivitiesCpuTimeNs += systemActivitiesCpuTimeNs; + _offlineResponseSerializationCpuTimeNs += responseSerializationCpuTimeNs; + } else { + _realtimeThreadCpuTimeNs += threadCpuTimeNs; + _realtimeSystemActivitiesCpuTimeNs += systemActivitiesCpuTimeNs; + _realtimeResponseSerializationCpuTimeNs += responseSerializationCpuTimeNs; + } Review Comment: These are not V1 or V2 stats. They are _broker_ stats. They are populated in the broker code. V1 collects `threadCpuTimeNs`, `systemActivitiesCpuTimeNs` and `responseSerializationCpuTimeNs` but doesn't care whether they are realtime or offline. That information reaches the broker and depending on the table type, it stores the value in the realtime or offline version and that is what is returned to the customer. In the code we have in master right now these stats are not being populated when the query is executed in V2. Specifically in `ExecutionStatsAggregator`, the code: ```java TableType tableType = null; String instanceName = null; if (routingInstance != null) { tableType = routingInstance.getTableType(); instanceName = routingInstance.getShortName();; } else if (tableName != null) { tableType = TableNameBuilder.getTableTypeFromTableName(tableName); instanceName = tableName; } else { tableType = null; instanceName = null; } ``` ends up setting `tableType` to null, so these stats are never calculated. 
I guess we can remove these lines and keep the current behavior. We can try to add support for this in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org