This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 405ad466081 Surface per-server stats on BrokerResponse for SSE queries (#18602)
405ad466081 is described below

commit 405ad466081968f2037b8ab25e97bf592ea6f0d2
Author: st-omarkhalid <[email protected]>
AuthorDate: Thu May 28 10:52:11 2026 -0500

    Surface per-server stats on BrokerResponse for SSE queries (#18602)
    
    Per-server scatter stats (submit delay, response delay, response size,
    deserialization timing) are computed by the Netty single-connection
    transport during scatter and stashed on a local ServerStats holder
    inside BaseSingleStageBrokerRequestHandler.handleRequest. Today the
    formatted string only reaches the SLF4J QueryLogger that consumes the
    holder before handleRequest returns; subscribers of the
    onQueryCompletion(RequestContext, BrokerResponse) hook and JSON-API
    clients have no way to read it because neither RequestContext nor
    BrokerResponse exposes it.
    
    This commit exposes the string on BrokerResponse, mirroring the existing
    materializedViewQueried pattern: default getter on the BrokerResponse
    interface (returns null so external implementations need no override),
    concrete field + setter + Jackson-annotated getter on
    BrokerResponseNative, and @JsonInclude(NON_NULL) so existing clients
    that do not expect the field see no behavioral change.
    BaseSingleStageBrokerRequestHandler#handleRequest writes the value onto
    the response after scatter completes, both on the regular path and the
    materialized-view split path. Only the Netty SSE transport produces a
    non-null value today; gRPC SSE and MSE leave it null because OSS does
    not currently compute a per-server stats string on those paths.
    
    Tests in BrokerResponseNativeTest pin (1) JSON round-trip, (2)
    suppression of the field when null, and (3) the null default.
---
 .../BaseSingleStageBrokerRequestHandler.java       |  5 ++++
 .../response/broker/BrokerResponseNative.java      | 17 ++++++++++++-
 .../response/broker/BrokerResponseNativeTest.java  | 28 ++++++++++++++++++++++
 3 files changed, 49 insertions(+), 1 deletion(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index e343f89a49c..a0469a4bc25 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -1022,6 +1022,9 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
 
     brokerResponse.setRLSFiltersApplied(rlsFiltersApplied.get());
 
+    // Record per-server stats on the SSE BrokerResponse so downstream consumers can read it.
+    brokerResponse.setServerStats(serverStats.getServerStats());
+
     // Log query and stats
     _queryLogger.logQueryCompleted(
         new QueryLogger.QueryLogParams(requestContext, tableName, 
brokerResponse,
@@ -2397,6 +2400,8 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     viewSplitResponse.setTimeUsedMs(totalTimeMs);
     augmentStatistics(requestContext, viewSplitResponse);
     viewSplitResponse.setRLSFiltersApplied(rlsFiltersApplied);
+    // Record per-server stats on the SSE BrokerResponse so downstream consumers can read it.
+    viewSplitResponse.setServerStats(serverStats.getServerStats());
     _queryLogger.logQueryCompleted(
         new QueryLogger.QueryLogParams(requestContext, tableName, 
viewSplitResponse,
             QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE, 
requesterIdentity, serverStats),
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 513dfdfce69..b0ab89deaf2 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -58,7 +58,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
     "explainPlanNumEmptyFilterSegments", 
"explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
     "offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", 
"offlineResponseSerMemAllocatedBytes",
     "realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", 
"realtimeTotalMemAllocatedBytes",
-    "pools", "rlsFiltersApplied", "groupsTrimmed", "materializedViewQueried"
+    "pools", "rlsFiltersApplied", "groupsTrimmed", "materializedViewQueried", 
"serverStats"
 })
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class BrokerResponseNative implements BrokerResponse {
@@ -126,6 +126,9 @@ public class BrokerResponseNative implements BrokerResponse 
{
   @Nullable
   private String _materializedViewQueried;
 
+  @Nullable
+  private String _serverStats;
+
   public BrokerResponseNative() {
   }
 
@@ -649,4 +652,16 @@ public class BrokerResponseNative implements 
BrokerResponse {
   public String getMaterializedViewQueried() {
     return _materializedViewQueried;
   }
+
+  @JsonProperty("serverStats")
+  public void setServerStats(@Nullable String serverStats) {
+    _serverStats = serverStats;
+  }
+
+  @Nullable
+  @JsonProperty("serverStats")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getServerStats() {
+    return _serverStats;
+  }
 }
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeTest.java
index e25008636bf..5a5f3f2cc5d 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.common.response.broker;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.testng.Assert;
@@ -85,4 +87,30 @@ public class BrokerResponseNativeTest {
     Assert.assertFalse(json.contains("materializedViewQueried"),
         "Null materializedViewQueried should be suppressed by 
@JsonInclude(NON_NULL)");
   }
+
+  @Test
+  public void testServerStatsRoundTrip()
+      throws IOException {
+    BrokerResponseNative expected = new BrokerResponseNative();
+    String stats =
+        
"Server=SubmitDelayMs,ResponseDelayMs,ResponseSize,DeserializationTimeMs,RequestSentDelayMs;"
+            + "pinot-server-0_O=0,1,7571,0,0;pinot-server-1_O=0,1,7574,0,0";
+    expected.setServerStats(stats);
+
+    BrokerResponseNative actual = 
BrokerResponseNative.fromJsonString(expected.toJsonString());
+    Assert.assertEquals(actual.getServerStats(), stats);
+  }
+
+  @Test
+  public void testServerStatsAbsentWhenNull()
+      throws IOException {
+    BrokerResponseNative response = new BrokerResponseNative();
+    JsonNode tree = new ObjectMapper().readTree(response.toJsonString());
+    Assert.assertFalse(tree.has("serverStats"));
+  }
+
+  @Test
+  public void testServerStatsDefaultsToNull() {
+    Assert.assertNull(new BrokerResponseNative().getServerStats());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to