This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 0f92742b45 Multi stage metrics (#13035) 0f92742b45 is described below commit 0f92742b45aa9ac57439b68a63e4a9ef8d9d9d8e Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com> AuthorDate: Fri Jun 7 09:37:34 2024 +0200 Multi stage metrics (#13035) --- .../apache/pinot/common/metrics/ServerMeter.java | 41 ++++++++++++- .../apache/pinot/common/metrics/ServerTimer.java | 31 +++++++++- .../runtime/operator/MailboxSendOperator.java | 23 ++++++++ .../query/runtime/operator/MultiStageOperator.java | 69 ++++++++++++++++++++++ .../query/runtime/plan/MultiStageQueryStats.java | 24 ++++---- 5 files changed, 176 insertions(+), 12 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 516584950d..49d1c0c7e9 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -118,7 +118,46 @@ public enum ServerMeter implements AbstractMetrics.Meter { LARGE_QUERY_RESPONSES_SENT("largeResponses", false), TOTAL_THREAD_CPU_TIME_MILLIS("millis", false), LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false), - STREAM_DATA_LOSS("streamDataLoss", false); + STREAM_DATA_LOSS("streamDataLoss", false), + + // Multi-stage + /** + * Number of times the max number of rows in the hash table has been reached. + * It is increased at most one by one each time per stage. + * That means that if a stage has 10 workers and all of them reach the limit, this will be increased by 1. + * But if a single query has 2 different join operators and each one reaches the limit, this will be increased by 2. + */ + HASH_JOIN_TIMES_MAX_ROWS_REACHED("times", true), + /** + * Number of times the max number of groups has been reached. + * It is increased at most one by one each time per stage. + * That means that if a stage has 10 workers and all of them reach the limit, this will be increased by 1. + * But if a single query has 2 different aggregate operators and each one reaches the limit, this will be increased + * by 2. + */ + AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED("times", true), + /** + * The number of blocks that have been sent to the next stage without being serialized. + * This is the sum of all blocks sent by all workers in the stage. + */ + MULTI_STAGE_IN_MEMORY_MESSAGES("messages", true), + /** + * The number of blocks that have been sent to the next stage in serialized format. + * This is the sum of all blocks sent by all workers in the stage. + */ + MULTI_STAGE_RAW_MESSAGES("messages", true), + /** + * The number of bytes that have been sent to the next stage in serialized format. + * This is the sum of all bytes sent by all workers in the stage. + */ + MULTI_STAGE_RAW_BYTES("bytes", true), + /** + * Number of times the max number of rows in window has been reached. + * It is increased at most one by one each time per stage. + * That means that if a stage has 10 workers and all of them reach the limit, this will be increased by 1. + * But if a single query has 2 different window operators and each one reaches the limit, this will be increased by 2. + */ + WINDOW_TIMES_MAX_ROWS_REACHED("times", true); private final String _meterName; private final String _unit; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java index 31ee69428e..79e1eff8e0 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java @@ -54,7 +54,36 @@ public enum ServerTimer implements AbstractMetrics.Timer { UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false, "Total time taken to delete expired primary keys based on metadataTTL or deletedKeysTTL"), GRPC_QUERY_EXECUTION_MS("milliseconds", false, "Total execution time of a successful query over gRPC"), - UPSERT_SNAPSHOT_TIME_MS("milliseconds", false, "Total time taken to take upsert table snapshot"); + UPSERT_SNAPSHOT_TIME_MS("milliseconds", false, "Total time taken to take upsert table snapshot"), + + // Multi-stage + /** + * Time spent building the hash table for the join. + * This is the sum of all time spent by all workers in the stage. + */ + HASH_JOIN_BUILD_TABLE_CPU_TIME_MS("millis", true), + /** + * Time spent serializing blocks into bytes to be sent to the next stage. + * This is the sum of all time spent by all workers in the stage. + */ + MULTI_STAGE_SERIALIZATION_CPU_TIME_MS("millis", true), + /** + * Time spent deserializing bytes into blocks to be processed by the stage. + * This is the sum of all time spent by all workers in the stage. + */ + MULTI_STAGE_DESERIALIZATION_CPU_TIME_MS("millis", true), + /** + * Time waiting on the receive mailbox for its parent operator to consume the data. + * Remember that each stage may have several workers and each one will have a receive mailbox for each worker it is + * reading from. This is the sum of all time waiting. + */ + RECEIVE_DOWNSTREAM_WAIT_CPU_TIME_MS("millis", true), + /** + * Time waiting on the receive mailbox waiting for the child operator to produce the data. + * Remember that each stage may have several workers and each one will have a receive mailbox for each worker it is + * reading from. This is the sum of all time waiting. + */ + RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS("millis", true); private final String _timerName; private final boolean _global; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index fce214e7aa..1d638585ed 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelFieldCollation; import org.apache.pinot.common.datatable.StatMap; +import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.mailbox.SendingMailbox; import org.apache.pinot.query.planner.logical.RexExpression; @@ -146,6 +147,11 @@ public class MailboxSendOperator extends MultiStageOperator { updateEosBlock(block, _statMap); // no need to check early terminate signal b/c the current block is already EOS sendTransferableBlock(block); + // After sending its own stats, the sending operator of the stage 1 has the complete view of all stats + // Therefore this is the only place we can update some of the metrics like total seen rows or time spent. + if (_context.getStageId() == 1) { + updateMetrics(block); + } } else { if (sendTransferableBlock(block)) { earlyTerminate(); @@ -196,6 +202,23 @@ public class MailboxSendOperator extends MultiStageOperator { _exchange.cancel(t); } + private void updateMetrics(TransferableBlock block) { + ServerMetrics serverMetrics = ServerMetrics.get(); + MultiStageQueryStats queryStats = block.getQueryStats(); + if (queryStats == null) { + LOGGER.info("Query stats not found in the EOS block."); + } else { + for (MultiStageQueryStats.StageStats.Closed closed : queryStats.getClosedStats()) { + closed.forEach((type, stats) -> { + type.updateServerMetrics(stats, serverMetrics); + }); + } + queryStats.getCurrentStats().forEach((type, stats) -> { + type.updateServerMetrics(stats, serverMetrics); + }); + } + } + public enum StatKey implements StatMap.Key { EXECUTION_TIME_MS(StatMap.Type.LONG) { @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java index 6d7ea779cc..50e68a47a6 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java @@ -24,6 +24,9 @@ import com.google.common.base.Stopwatch; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.datatable.StatMap; +import org.apache.pinot.common.metrics.ServerMeter; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.metrics.ServerTimer; import org.apache.pinot.common.response.broker.BrokerResponseNativeV2; import org.apache.pinot.core.common.Operator; import org.apache.pinot.query.runtime.blocks.TransferableBlock; @@ -174,6 +177,17 @@ public abstract class MultiStageOperator response.mergeNumGroupsLimitReached(stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED)); response.mergeMaxRowsInOperator(stats.getLong(AggregateOperator.StatKey.EMITTED_ROWS)); } + + @Override + public void updateServerMetrics(StatMap<?> map, ServerMetrics serverMetrics) { + super.updateServerMetrics(map, serverMetrics); + @SuppressWarnings("unchecked") + StatMap<AggregateOperator.StatKey> stats = (StatMap<AggregateOperator.StatKey>) map; + boolean limitReached = stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED); + if (limitReached) { + serverMetrics.addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED, 1); + } + } }, FILTER(FilterOperator.StatKey.class) { @Override @@ -191,6 +205,19 @@ public abstract class MultiStageOperator response.mergeMaxRowsInOperator(stats.getLong(HashJoinOperator.StatKey.EMITTED_ROWS)); response.mergeMaxRowsInJoinReached(stats.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED)); } + + @Override + public void updateServerMetrics(StatMap<?> map, ServerMetrics serverMetrics) { + super.updateServerMetrics(map, serverMetrics); + @SuppressWarnings("unchecked") + StatMap<HashJoinOperator.StatKey> stats = (StatMap<HashJoinOperator.StatKey>) map; + boolean maxRowsInJoinReached = stats.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED); + if (maxRowsInJoinReached) { + serverMetrics.addMeteredGlobalValue(ServerMeter.HASH_JOIN_TIMES_MAX_ROWS_REACHED, 1); + } + serverMetrics.addTimedValue(ServerTimer.HASH_JOIN_BUILD_TABLE_CPU_TIME_MS, + stats.getLong(HashJoinOperator.StatKey.TIME_BUILDING_HASH_TABLE_MS), TimeUnit.MILLISECONDS); + } }, INTERSECT(SetOperator.StatKey.class) { @Override @@ -228,6 +255,27 @@ public abstract class MultiStageOperator StatMap<BaseMailboxReceiveOperator.StatKey> stats = (StatMap<BaseMailboxReceiveOperator.StatKey>) map; response.mergeMaxRowsInOperator(stats.getLong(BaseMailboxReceiveOperator.StatKey.EMITTED_ROWS)); } + + @Override + public void updateServerMetrics(StatMap<?> map, ServerMetrics serverMetrics) { + super.updateServerMetrics(map, serverMetrics); + @SuppressWarnings("unchecked") + StatMap<BaseMailboxReceiveOperator.StatKey> stats = (StatMap<BaseMailboxReceiveOperator.StatKey>) map; + + serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_IN_MEMORY_MESSAGES, + stats.getInt(BaseMailboxReceiveOperator.StatKey.IN_MEMORY_MESSAGES)); + serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_RAW_MESSAGES, + stats.getInt(BaseMailboxReceiveOperator.StatKey.RAW_MESSAGES)); + serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_RAW_BYTES, + stats.getLong(BaseMailboxReceiveOperator.StatKey.DESERIALIZED_BYTES)); + + serverMetrics.addTimedValue(ServerTimer.MULTI_STAGE_DESERIALIZATION_CPU_TIME_MS, + stats.getLong(BaseMailboxReceiveOperator.StatKey.DESERIALIZATION_TIME_MS), TimeUnit.MILLISECONDS); + serverMetrics.addTimedValue(ServerTimer.RECEIVE_DOWNSTREAM_WAIT_CPU_TIME_MS, + stats.getLong(BaseMailboxReceiveOperator.StatKey.DOWNSTREAM_WAIT_MS), TimeUnit.MILLISECONDS); + serverMetrics.addTimedValue(ServerTimer.RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS, + stats.getLong(BaseMailboxReceiveOperator.StatKey.UPSTREAM_WAIT_MS), TimeUnit.MILLISECONDS); + } }, MAILBOX_SEND(MailboxSendOperator.StatKey.class) { @Override @@ -236,6 +284,14 @@ public abstract class MultiStageOperator StatMap<MailboxSendOperator.StatKey> stats = (StatMap<MailboxSendOperator.StatKey>) map; response.mergeMaxRowsInOperator(stats.getLong(MailboxSendOperator.StatKey.EMITTED_ROWS)); } + + @Override + public void updateServerMetrics(StatMap<?> map, ServerMetrics serverMetrics) { + @SuppressWarnings("unchecked") + StatMap<MailboxSendOperator.StatKey> stats = (StatMap<MailboxSendOperator.StatKey>) map; + serverMetrics.addTimedValue(ServerTimer.MULTI_STAGE_SERIALIZATION_CPU_TIME_MS, + stats.getLong(MailboxSendOperator.StatKey.SERIALIZATION_TIME_MS), TimeUnit.MILLISECONDS); + } }, MINUS(SetOperator.StatKey.class) { @Override @@ -286,6 +342,15 @@ public abstract class MultiStageOperator response.mergeMaxRowsInWindowReached( stats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED)); } + + @Override + public void updateServerMetrics(StatMap<?> map, ServerMetrics serverMetrics) { + @SuppressWarnings("unchecked") + StatMap<WindowAggregateOperator.StatKey> stats = (StatMap<WindowAggregateOperator.StatKey>) map; + if (stats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED)) { + serverMetrics.addMeteredGlobalValue(ServerMeter.WINDOW_TIMES_MAX_ROWS_REACHED, 1); + } + } },; private final Class _statKeyClass; @@ -311,5 +376,9 @@ public abstract class MultiStageOperator * (compatible with {@link #getStatKeyClass()}). This is a way to avoid casting in the caller. */ public abstract void mergeInto(BrokerResponseNativeV2 response, StatMap<?> map); + + public void updateServerMetrics(StatMap<?> map, ServerMetrics serverMetrics) { + // Do nothing by default + } } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java index 712a0ab43a..b87b2a4085 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java @@ -284,14 +284,18 @@ public class MultiStageQueryStats { myStats.merge(dis); } } catch (IOException ex) { - LOGGER.warn("Error deserializing stats on stage {}. Considering the new stats empty", i, ex); + LOGGER.warn("Error deserializing stats on stage " + i + ". Considering the new stats empty", ex); } catch (IllegalArgumentException | IllegalStateException ex) { - LOGGER.warn("Error merging stats on stage {}. Ignoring the new stats", i, ex); + LOGGER.warn("Error merging stats on stage " + i + ". Ignoring the new stats", ex); } } } } + public List<StageStats.Closed> getClosedStats() { + return Collections.unmodifiableList(_closedStats); + } + public JsonNode asJson() { ObjectNode node = JsonUtils.newObjectNode(); node.put("stage", _currentStageId); @@ -418,6 +422,14 @@ public class MultiStageQueryStats { return _operatorStats.size() - 1; } + public void forEach(BiConsumer<MultiStageOperator.Type, StatMap<?>> consumer) { + Iterator<MultiStageOperator.Type> typeIterator = _operatorTypes.iterator(); + Iterator<StatMap<?>> statIterator = _operatorStats.iterator(); + while (typeIterator.hasNext()) { + consumer.accept(typeIterator.next(), statIterator.next()); + } + } + public JsonNode asJson() { ArrayNode json = JsonUtils.newArrayNode(); @@ -539,14 +551,6 @@ public class MultiStageQueryStats { throws IOException { return deserialize(input, input.readInt()); } - - public void forEach(BiConsumer<MultiStageOperator.Type, StatMap<?>> consumer) { - Iterator<MultiStageOperator.Type> typeIterator = _operatorTypes.iterator(); - Iterator<StatMap<?>> statIterator = _operatorStats.iterator(); - while (typeIterator.hasNext()) { - consumer.accept(typeIterator.next(), statIterator.next()); - } - } } public static Closed deserialize(DataInput input, int numOperators) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org