This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a84fc7e313 Add `pinot_server_mse_queries` server metrics (#18287)
2a84fc7e313 is described below

commit 2a84fc7e313383bd324b9c225a1f313221b0a145
Author: Timothy Elgersma <[email protected]>
AuthorDate: Thu Apr 23 16:38:06 2026 -0400

    Add `pinot_server_mse_queries` server metrics (#18287)
---
 .../apache/pinot/common/metrics/ServerMeter.java   |  7 ++-
 .../pinot/query/service/server/QueryServer.java    |  2 +
 .../query/service/server/QueryServerTest.java      | 73 ++++++++++++++++++++++
 3 files changed, 81 insertions(+), 1 deletion(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 800cd0c0054..2e40a399f98 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -272,7 +272,12 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   MSE_MEMORY_ALLOCATED_BYTES("bytes", true),
   /// Total number of rows emitted by multi-stage execution.
   /// This is equal to the sum of the emittedRows reported by the root of all the opchains executed in the server.
-  MSE_EMITTED_ROWS("rows", true);
+  MSE_EMITTED_ROWS("rows", true),
+
+  /// Number of MSE queries received by this server.
+  /// This metric is incremented once per query, even if the server is acting as a leaf, intermediate, or both.
+  MSE_QUERIES("queries", true,
+      "Number of MSE queries received by this server");
 
   private final String _meterName;
   private final String _unit;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index 6ea144c9e04..51fa6ba6e42 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -234,6 +234,8 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
   //   starts in the query runner.
   @Override
   public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryResponse> responseObserver) {
+    // Match the SSE QUERIES counter semantics by counting requests as soon as they reach the handler.
+    ServerMetrics.get().addMeteredGlobalValue(ServerMeter.MSE_QUERIES, 1L);
     Map<String, String> reqMetadata;
     try {
       reqMetadata = QueryPlanSerDeUtils.fromProtoProperties(request.getMetadata());
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
index bf1a4ddaf51..af84711fadf 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
@@ -23,6 +23,7 @@ import com.google.protobuf.ByteString;
 import io.grpc.Deadline;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +33,8 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Plan;
 import org.apache.pinot.common.proto.Worker;
@@ -53,6 +56,7 @@ import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.spi.accounting.ThreadAccountantUtils;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.query.QueryExecutionContext;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.trace.LoggerConstants;
@@ -87,6 +91,8 @@ public class QueryServerTest extends QueryTestSet {
   @BeforeClass
   public void setUp()
       throws Exception {
+    ServerMetrics.deregister();
+    ServerMetrics.register(new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()));
     for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
       int availablePort = QueryTestUtils.getAvailablePort();
       QueryRunner queryRunner = mock(QueryRunner.class);
@@ -109,6 +115,7 @@ public class QueryServerTest extends QueryTestSet {
     for (QueryServer worker : _queryServerMap.values()) {
       worker.shutdown();
     }
+    ServerMetrics.deregister();
   }
 
   @AfterMethod
@@ -134,6 +141,72 @@ public class QueryServerTest extends QueryTestSet {
     assertTrue(errorMessage.contains("foo"), "Error message should contain 'foo' but it is: " + errorMessage);
   }
 
+  @Test
+  public void testMseQueriesMetricIncrementedOnSuccessfulSubmit()
+      throws Exception {
+    long mseBefore = ServerMetrics.get().getMeteredValue(ServerMeter.MSE_QUERIES).count();
+    long grpcBefore = ServerMetrics.get().getMeteredValue(ServerMeter.GRPC_QUERIES).count();
+    DispatchableSubPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM a");
+    Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, 1);
+    Map<String, String> requestMetadata = QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
+    Worker.QueryResponse resp = submitRequest(queryRequest, requestMetadata);
+    assertTrue(resp.getMetadataMap().containsKey(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK));
+    TestUtils.waitForCondition(
+        aVoid -> ServerMetrics.get().getMeteredValue(ServerMeter.MSE_QUERIES).count() == mseBefore + 1,
+        5000L, "MSE_QUERIES was not incremented");
+    assertEquals(ServerMetrics.get().getMeteredValue(ServerMeter.GRPC_QUERIES).count(), grpcBefore,
+        "GRPC_QUERIES should NOT be incremented by the MSE path");
+  }
+
+  @Test
+  public void testMseQueriesMetricIncrementedOnFailedSubmit()
+      throws Exception {
+    long mseBefore = ServerMetrics.get().getMeteredValue(ServerMeter.MSE_QUERIES).count();
+    DispatchableSubPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM a");
+    Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, 1);
+    Map<String, String> requestMetadata = QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
+    QueryRunner mockRunner = _queryRunnerMap.get(Integer.parseInt(requestMetadata.get(KEY_OF_SERVER_INSTANCE_PORT)));
+    doThrow(new RuntimeException("foo")).when(mockRunner).processQuery(any(), any(), any());
+
+    Worker.QueryResponse resp = submitRequest(queryRequest, requestMetadata);
+    reset(mockRunner);
+
+    String errorMessage = resp.getMetadataMap().get(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR);
+    assertTrue(errorMessage.contains("foo"), "Error message should contain 'foo' but it is: " + errorMessage);
+    TestUtils.waitForCondition(
+        aVoid -> ServerMetrics.get().getMeteredValue(ServerMeter.MSE_QUERIES).count() == mseBefore + 1,
+        5000L, "MSE_QUERIES was not incremented");
+  }
+
+  @Test
+  public void testMseQueriesMetricIncrementedBeforeMetadataDeserialization() {
+    long mseBefore = ServerMetrics.get().getMeteredValue(ServerMeter.MSE_QUERIES).count();
+    QueryServer queryServer = _queryServerMap.values().iterator().next();
+    Worker.QueryResponse[] responseHolder = new Worker.QueryResponse[1];
+
+    queryServer.submit(Worker.QueryRequest.newBuilder().setMetadata(ByteString.copyFromUtf8("invalid")).build(),
+        new StreamObserver<>() {
+          @Override
+          public void onNext(Worker.QueryResponse value) {
+            responseHolder[0] = value;
+          }
+
+          @Override
+          public void onError(Throwable t) {
+          }
+
+          @Override
+          public void onCompleted() {
+          }
+        });
+
+    assertEquals(ServerMetrics.get().getMeteredValue(ServerMeter.MSE_QUERIES).count(), mseBefore + 1,
+        "MSE_QUERIES should be incremented before metadata deserialization");
+    assertTrue(responseHolder[0].getMetadataMap()
+            .containsKey(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR),
+        "Malformed metadata should still return an error response");
+  }
+
   @Test(dataProvider = "testSql")
   public void testWorkerAcceptsWorkerRequestCorrect(String sql)
       throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to