This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f92742b45 Multi stage metrics (#13035)
0f92742b45 is described below

commit 0f92742b45aa9ac57439b68a63e4a9ef8d9d9d8e
Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com>
AuthorDate: Fri Jun 7 09:37:34 2024 +0200

    Multi stage metrics (#13035)
---
 .../apache/pinot/common/metrics/ServerMeter.java   | 41 ++++++++++++-
 .../apache/pinot/common/metrics/ServerTimer.java   | 31 +++++++++-
 .../runtime/operator/MailboxSendOperator.java      | 23 ++++++++
 .../query/runtime/operator/MultiStageOperator.java | 69 ++++++++++++++++++++++
 .../query/runtime/plan/MultiStageQueryStats.java   | 24 ++++----
 5 files changed, 176 insertions(+), 12 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 516584950d..49d1c0c7e9 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -118,7 +118,46 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   LARGE_QUERY_RESPONSES_SENT("largeResponses", false),
   TOTAL_THREAD_CPU_TIME_MILLIS("millis", false),
   LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false),
-  STREAM_DATA_LOSS("streamDataLoss", false);
+  STREAM_DATA_LOSS("streamDataLoss", false),
+
+  // Multi-stage
+  /**
+   * Number of times the max number of rows in the hash table has been reached.
+   * It is increased by at most one for each join operator in a stage, however many workers the stage has.
+   * That means that if a stage has 10 workers and all of them reach the 
limit, this will be increased by 1.
+   * But if a single query has 2 different join operators and each one reaches 
the limit, this will be increased by 2.
+   */
+  HASH_JOIN_TIMES_MAX_ROWS_REACHED("times", true),
+  /**
+   * Number of times the max number of groups has been reached.
+   * It is increased by at most one for each aggregate operator in a stage, however many workers the stage has.
+   * That means that if a stage has 10 workers and all of them reach the 
limit, this will be increased by 1.
+   * But if a single query has 2 different aggregate operators and each one 
reaches the limit, this will be increased
+   * by 2.
+   */
+  AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED("times", true),
+  /**
+   * The number of blocks that have been sent to the next stage without being 
serialized.
+   * This is the sum of all blocks sent by all workers in the stage.
+   */
+  MULTI_STAGE_IN_MEMORY_MESSAGES("messages", true),
+  /**
+   * The number of blocks that have been sent to the next stage in serialized 
format.
+   * This is the sum of all blocks sent by all workers in the stage.
+   */
+  MULTI_STAGE_RAW_MESSAGES("messages", true),
+  /**
+   * The number of bytes that have been sent to the next stage in serialized 
format.
+   * This is the sum of all bytes sent by all workers in the stage.
+   */
+  MULTI_STAGE_RAW_BYTES("bytes", true),
+  /**
+   * Number of times the max number of rows in window has been reached.
+   * It is increased by at most one for each window operator in a stage, however many workers the stage has.
+   * That means that if a stage has 10 workers and all of them reach the 
limit, this will be increased by 1.
+   * But if a single query has 2 different window operators and each one 
reaches the limit, this will be increased by 2.
+   */
+  WINDOW_TIMES_MAX_ROWS_REACHED("times", true);
 
   private final String _meterName;
   private final String _unit;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 31ee69428e..79e1eff8e0 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -54,7 +54,36 @@ public enum ServerTimer implements AbstractMetrics.Timer {
   UPSERT_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false,
       "Total time taken to delete expired primary keys based on metadataTTL or 
deletedKeysTTL"),
   GRPC_QUERY_EXECUTION_MS("milliseconds", false, "Total execution time of a 
successful query over gRPC"),
-  UPSERT_SNAPSHOT_TIME_MS("milliseconds", false, "Total time taken to take 
upsert table snapshot");
+  UPSERT_SNAPSHOT_TIME_MS("milliseconds", false, "Total time taken to take 
upsert table snapshot"),
+
+  // Multi-stage
+  /**
+   * Time spent building the hash table for the join.
+   * This is the sum of all time spent by all workers in the stage.
+   */
+  HASH_JOIN_BUILD_TABLE_CPU_TIME_MS("millis", true),
+  /**
+   * Time spent serializing blocks into bytes to be sent to the next stage.
+   * This is the sum of all time spent by all workers in the stage.
+   */
+  MULTI_STAGE_SERIALIZATION_CPU_TIME_MS("millis", true),
+  /**
+   * Time spent deserializing bytes into blocks to be processed by the stage.
+   * This is the sum of all time spent by all workers in the stage.
+   */
+  MULTI_STAGE_DESERIALIZATION_CPU_TIME_MS("millis", true),
+  /**
+   * Time spent on the receive mailbox waiting for its parent operator to consume the data.
+   * Remember that each stage may have several workers and each one will have 
a receive mailbox for each worker it is
+   * reading from. This is the sum of all time waiting.
+   */
+  RECEIVE_DOWNSTREAM_WAIT_CPU_TIME_MS("millis", true),
+  /**
+   * Time spent on the receive mailbox waiting for the child operator to produce the data.
+   * Remember that each stage may have several workers and each one will have 
a receive mailbox for each worker it is
+   * reading from. This is the sum of all time waiting.
+   */
+  RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS("millis", true);
 
   private final String _timerName;
   private final boolean _global;
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index fce214e7aa..1d638585ed 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.planner.logical.RexExpression;
@@ -146,6 +147,11 @@ public class MailboxSendOperator extends 
MultiStageOperator {
         updateEosBlock(block, _statMap);
         // no need to check early terminate signal b/c the current block is 
already EOS
         sendTransferableBlock(block);
+        // After sending its own stats, the sending operator of stage 1 has the complete view of all stats.
+        // Therefore this is the only place we can update some of the metrics 
like total seen rows or time spent.
+        if (_context.getStageId() == 1) {
+          updateMetrics(block);
+        }
       } else {
         if (sendTransferableBlock(block)) {
           earlyTerminate();
@@ -196,6 +202,23 @@ public class MailboxSendOperator extends 
MultiStageOperator {
     _exchange.cancel(t);
   }
 
+  private void updateMetrics(TransferableBlock block) {
+    ServerMetrics serverMetrics = ServerMetrics.get();
+    MultiStageQueryStats queryStats = block.getQueryStats();
+    if (queryStats == null) {
+      LOGGER.info("Query stats not found in the EOS block.");
+    } else {
+      for (MultiStageQueryStats.StageStats.Closed closed : 
queryStats.getClosedStats()) {
+        closed.forEach((type, stats) -> {
+          type.updateServerMetrics(stats, serverMetrics);
+        });
+      }
+      queryStats.getCurrentStats().forEach((type, stats) -> {
+        type.updateServerMetrics(stats, serverMetrics);
+      });
+    }
+  }
+
   public enum StatKey implements StatMap.Key {
     EXECUTION_TIME_MS(StatMap.Type.LONG) {
       @Override
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 6d7ea779cc..50e68a47a6 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -24,6 +24,9 @@ import com.google.common.base.Stopwatch;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -174,6 +177,17 @@ public abstract class MultiStageOperator
         
response.mergeNumGroupsLimitReached(stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED));
         
response.mergeMaxRowsInOperator(stats.getLong(AggregateOperator.StatKey.EMITTED_ROWS));
       }
+
+      @Override
+      public void updateServerMetrics(StatMap<?> map, ServerMetrics 
serverMetrics) {
+        super.updateServerMetrics(map, serverMetrics);
+        @SuppressWarnings("unchecked")
+        StatMap<AggregateOperator.StatKey> stats = 
(StatMap<AggregateOperator.StatKey>) map;
+        boolean limitReached = 
stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED);
+        if (limitReached) {
+          
serverMetrics.addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_NUM_GROUPS_LIMIT_REACHED,
 1);
+        }
+      }
     },
     FILTER(FilterOperator.StatKey.class) {
       @Override
@@ -191,6 +205,19 @@ public abstract class MultiStageOperator
         
response.mergeMaxRowsInOperator(stats.getLong(HashJoinOperator.StatKey.EMITTED_ROWS));
         
response.mergeMaxRowsInJoinReached(stats.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED));
       }
+
+      @Override
+      public void updateServerMetrics(StatMap<?> map, ServerMetrics 
serverMetrics) {
+        super.updateServerMetrics(map, serverMetrics);
+        @SuppressWarnings("unchecked")
+        StatMap<HashJoinOperator.StatKey> stats = 
(StatMap<HashJoinOperator.StatKey>) map;
+        boolean maxRowsInJoinReached = 
stats.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED);
+        if (maxRowsInJoinReached) {
+          
serverMetrics.addMeteredGlobalValue(ServerMeter.HASH_JOIN_TIMES_MAX_ROWS_REACHED,
 1);
+        }
+        
serverMetrics.addTimedValue(ServerTimer.HASH_JOIN_BUILD_TABLE_CPU_TIME_MS,
+            
stats.getLong(HashJoinOperator.StatKey.TIME_BUILDING_HASH_TABLE_MS), 
TimeUnit.MILLISECONDS);
+      }
     },
     INTERSECT(SetOperator.StatKey.class) {
       @Override
@@ -228,6 +255,27 @@ public abstract class MultiStageOperator
         StatMap<BaseMailboxReceiveOperator.StatKey> stats = 
(StatMap<BaseMailboxReceiveOperator.StatKey>) map;
         
response.mergeMaxRowsInOperator(stats.getLong(BaseMailboxReceiveOperator.StatKey.EMITTED_ROWS));
       }
+
+      @Override
+      public void updateServerMetrics(StatMap<?> map, ServerMetrics 
serverMetrics) {
+        super.updateServerMetrics(map, serverMetrics);
+        @SuppressWarnings("unchecked")
+        StatMap<BaseMailboxReceiveOperator.StatKey> stats = 
(StatMap<BaseMailboxReceiveOperator.StatKey>) map;
+
+        
serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_IN_MEMORY_MESSAGES,
+            
stats.getInt(BaseMailboxReceiveOperator.StatKey.IN_MEMORY_MESSAGES));
+        
serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_RAW_MESSAGES,
+            stats.getInt(BaseMailboxReceiveOperator.StatKey.RAW_MESSAGES));
+        serverMetrics.addMeteredGlobalValue(ServerMeter.MULTI_STAGE_RAW_BYTES,
+            
stats.getLong(BaseMailboxReceiveOperator.StatKey.DESERIALIZED_BYTES));
+
+        
serverMetrics.addTimedValue(ServerTimer.MULTI_STAGE_DESERIALIZATION_CPU_TIME_MS,
+            
stats.getLong(BaseMailboxReceiveOperator.StatKey.DESERIALIZATION_TIME_MS), 
TimeUnit.MILLISECONDS);
+        
serverMetrics.addTimedValue(ServerTimer.RECEIVE_DOWNSTREAM_WAIT_CPU_TIME_MS,
+            
stats.getLong(BaseMailboxReceiveOperator.StatKey.DOWNSTREAM_WAIT_MS), 
TimeUnit.MILLISECONDS);
+        
serverMetrics.addTimedValue(ServerTimer.RECEIVE_UPSTREAM_WAIT_CPU_TIME_MS,
+            
stats.getLong(BaseMailboxReceiveOperator.StatKey.UPSTREAM_WAIT_MS), 
TimeUnit.MILLISECONDS);
+      }
     },
     MAILBOX_SEND(MailboxSendOperator.StatKey.class) {
       @Override
@@ -236,6 +284,14 @@ public abstract class MultiStageOperator
         StatMap<MailboxSendOperator.StatKey> stats = 
(StatMap<MailboxSendOperator.StatKey>) map;
         
response.mergeMaxRowsInOperator(stats.getLong(MailboxSendOperator.StatKey.EMITTED_ROWS));
       }
+
+      @Override
+      public void updateServerMetrics(StatMap<?> map, ServerMetrics 
serverMetrics) {
+        @SuppressWarnings("unchecked")
+        StatMap<MailboxSendOperator.StatKey> stats = 
(StatMap<MailboxSendOperator.StatKey>) map;
+        
serverMetrics.addTimedValue(ServerTimer.MULTI_STAGE_SERIALIZATION_CPU_TIME_MS,
+            stats.getLong(MailboxSendOperator.StatKey.SERIALIZATION_TIME_MS), 
TimeUnit.MILLISECONDS);
+      }
     },
     MINUS(SetOperator.StatKey.class) {
       @Override
@@ -286,6 +342,15 @@ public abstract class MultiStageOperator
         response.mergeMaxRowsInWindowReached(
             
stats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED));
       }
+
+      @Override
+      public void updateServerMetrics(StatMap<?> map, ServerMetrics 
serverMetrics) {
+        @SuppressWarnings("unchecked")
+        StatMap<WindowAggregateOperator.StatKey> stats = 
(StatMap<WindowAggregateOperator.StatKey>) map;
+        if 
(stats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED)) {
+          
serverMetrics.addMeteredGlobalValue(ServerMeter.WINDOW_TIMES_MAX_ROWS_REACHED, 
1);
+        }
+      }
     },;
 
     private final Class _statKeyClass;
@@ -311,5 +376,9 @@ public abstract class MultiStageOperator
      * (compatible with {@link #getStatKeyClass()}). This is a way to avoid 
casting in the caller.
      */
     public abstract void mergeInto(BrokerResponseNativeV2 response, StatMap<?> 
map);
+
+    public void updateServerMetrics(StatMap<?> map, ServerMetrics 
serverMetrics) {
+      // Do nothing by default
+    }
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
index 712a0ab43a..b87b2a4085 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java
@@ -284,14 +284,18 @@ public class MultiStageQueryStats {
             myStats.merge(dis);
           }
         } catch (IOException ex) {
-          LOGGER.warn("Error deserializing stats on stage {}. Considering the 
new stats empty", i, ex);
+          LOGGER.warn("Error deserializing stats on stage " + i + ". 
Considering the new stats empty", ex);
         } catch (IllegalArgumentException | IllegalStateException ex) {
-          LOGGER.warn("Error merging stats on stage {}. Ignoring the new 
stats", i, ex);
+          LOGGER.warn("Error merging stats on stage " + i + ". Ignoring the 
new stats", ex);
         }
       }
     }
   }
 
+  public List<StageStats.Closed> getClosedStats() {
+    return Collections.unmodifiableList(_closedStats);
+  }
+
   public JsonNode asJson() {
     ObjectNode node = JsonUtils.newObjectNode();
     node.put("stage", _currentStageId);
@@ -418,6 +422,14 @@ public class MultiStageQueryStats {
       return _operatorStats.size() - 1;
     }
 
+    public void forEach(BiConsumer<MultiStageOperator.Type, StatMap<?>> 
consumer) {
+      Iterator<MultiStageOperator.Type> typeIterator = 
_operatorTypes.iterator();
+      Iterator<StatMap<?>> statIterator = _operatorStats.iterator();
+      while (typeIterator.hasNext()) {
+        consumer.accept(typeIterator.next(), statIterator.next());
+      }
+    }
+
     public JsonNode asJson() {
       ArrayNode json = JsonUtils.newArrayNode();
 
@@ -539,14 +551,6 @@ public class MultiStageQueryStats {
           throws IOException {
         return deserialize(input, input.readInt());
       }
-
-      public void forEach(BiConsumer<MultiStageOperator.Type, StatMap<?>> 
consumer) {
-        Iterator<MultiStageOperator.Type> typeIterator = 
_operatorTypes.iterator();
-        Iterator<StatMap<?>> statIterator = _operatorStats.iterator();
-        while (typeIterator.hasNext()) {
-          consumer.accept(typeIterator.next(), statIterator.next());
-        }
-      }
     }
 
     public static Closed deserialize(DataInput input, int numOperators)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to