This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2a84fc7e313 Add `pinot_server_mse_queries` server metrics (#18287)
2a84fc7e313 is described below
commit 2a84fc7e313383bd324b9c225a1f313221b0a145
Author: Timothy Elgersma <[email protected]>
AuthorDate: Thu Apr 23 16:38:06 2026 -0400
Add `pinot_server_mse_queries` server metrics (#18287)
---
.../apache/pinot/common/metrics/ServerMeter.java | 7 ++-
.../pinot/query/service/server/QueryServer.java | 2 +
.../query/service/server/QueryServerTest.java | 73 ++++++++++++++++++++++
3 files changed, 81 insertions(+), 1 deletion(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 800cd0c0054..2e40a399f98 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -272,7 +272,12 @@ public enum ServerMeter implements AbstractMetrics.Meter {
MSE_MEMORY_ALLOCATED_BYTES("bytes", true),
/// Total number of rows emitted by multi-stage execution.
/// This is equal to the sum of the emittedRows reported by the root of all
the opchains executed in the server.
- MSE_EMITTED_ROWS("rows", true);
+ MSE_EMITTED_ROWS("rows", true),
+
+ /// Number of MSE queries received by this server.
+ /// This metric is incremented once per query, even if the server is acting
as a leaf, intermediate, or both.
+ MSE_QUERIES("queries", true,
+ "Number of MSE queries received by this server");
private final String _meterName;
private final String _unit;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index 6ea144c9e04..51fa6ba6e42 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -234,6 +234,8 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
// starts in the query runner.
@Override
public void submit(Worker.QueryRequest request,
StreamObserver<Worker.QueryResponse> responseObserver) {
+ // Match the SSE QUERIES counter semantics by counting requests as soon as
they reach the handler.
+ ServerMetrics.get().addMeteredGlobalValue(ServerMeter.MSE_QUERIES, 1L);
Map<String, String> reqMetadata;
try {
reqMetadata =
QueryPlanSerDeUtils.fromProtoProperties(request.getMetadata());
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
index bf1a4ddaf51..af84711fadf 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
@@ -23,6 +23,7 @@ import com.google.protobuf.ByteString;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
+import io.grpc.stub.StreamObserver;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,6 +33,8 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.proto.Worker;
@@ -53,6 +56,7 @@ import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.accounting.ThreadAccountantUtils;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.query.QueryExecutionContext;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.LoggerConstants;
@@ -87,6 +91,8 @@ public class QueryServerTest extends QueryTestSet {
@BeforeClass
public void setUp()
throws Exception {
+ ServerMetrics.deregister();
+ ServerMetrics.register(new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()));
for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
int availablePort = QueryTestUtils.getAvailablePort();
QueryRunner queryRunner = mock(QueryRunner.class);
@@ -109,6 +115,7 @@ public class QueryServerTest extends QueryTestSet {
for (QueryServer worker : _queryServerMap.values()) {
worker.shutdown();
}
+ ServerMetrics.deregister();
}
@AfterMethod
@@ -134,6 +141,72 @@ public class QueryServerTest extends QueryTestSet {
assertTrue(errorMessage.contains("foo"), "Error message should contain
'foo' but it is: " + errorMessage);
}
+ @Test
+ public void testMseQueriesMetricIncrementedOnSuccessfulSubmit()
+ throws Exception {
+ long mseBefore =
ServerMetrics.get().getMeteredValue(ServerMeter.MSE_QUERIES).count();
+ long grpcBefore =
ServerMetrics.get().getMeteredValue(ServerMeter.GRPC_QUERIES).count();
+ DispatchableSubPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM
a");
+ Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, 1);
+ Map<String, String> requestMetadata =
QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
+ Worker.QueryResponse resp = submitRequest(queryRequest, requestMetadata);
+
assertTrue(resp.getMetadataMap().containsKey(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK));
+ TestUtils.waitForCondition(
+ aVoid ->
ServerMetrics.get().getMeteredValue(ServerMeter.MSE_QUERIES).count() ==
mseBefore + 1,
+ 5000L, "MSE_QUERIES was not incremented");
+
assertEquals(ServerMetrics.get().getMeteredValue(ServerMeter.GRPC_QUERIES).count(),
grpcBefore,
+ "GRPC_QUERIES should NOT be incremented by the MSE path");
+ }
+
+ @Test
+ public void testMseQueriesMetricIncrementedOnFailedSubmit()
+ throws Exception {
+ long mseBefore =
ServerMetrics.get().getMeteredValue(ServerMeter.MSE_QUERIES).count();
+ DispatchableSubPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM
a");
+ Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, 1);
+ Map<String, String> requestMetadata =
QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
+ QueryRunner mockRunner =
_queryRunnerMap.get(Integer.parseInt(requestMetadata.get(KEY_OF_SERVER_INSTANCE_PORT)));
+ doThrow(new RuntimeException("foo")).when(mockRunner).processQuery(any(),
any(), any());
+
+ Worker.QueryResponse resp = submitRequest(queryRequest, requestMetadata);
+ reset(mockRunner);
+
+ String errorMessage =
resp.getMetadataMap().get(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR);
+ assertTrue(errorMessage.contains("foo"), "Error message should contain
'foo' but it is: " + errorMessage);
+ TestUtils.waitForCondition(
+ aVoid ->
ServerMetrics.get().getMeteredValue(ServerMeter.MSE_QUERIES).count() ==
mseBefore + 1,
+ 5000L, "MSE_QUERIES was not incremented");
+ }
+
+ @Test
+ public void testMseQueriesMetricIncrementedBeforeMetadataDeserialization() {
+ long mseBefore =
ServerMetrics.get().getMeteredValue(ServerMeter.MSE_QUERIES).count();
+ QueryServer queryServer = _queryServerMap.values().iterator().next();
+ Worker.QueryResponse[] responseHolder = new Worker.QueryResponse[1];
+
+
queryServer.submit(Worker.QueryRequest.newBuilder().setMetadata(ByteString.copyFromUtf8("invalid")).build(),
+ new StreamObserver<>() {
+ @Override
+ public void onNext(Worker.QueryResponse value) {
+ responseHolder[0] = value;
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ });
+
+
assertEquals(ServerMetrics.get().getMeteredValue(ServerMeter.MSE_QUERIES).count(),
mseBefore + 1,
+ "MSE_QUERIES should be incremented before metadata deserialization");
+ assertTrue(responseHolder[0].getMetadataMap()
+
.containsKey(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR),
+ "Malformed metadata should still return an error response");
+ }
+
@Test(dataProvider = "testSql")
public void testWorkerAcceptsWorkerRequestCorrect(String sql)
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]