This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 405ad466081 Surface per-server stats on BrokerResponse for SSE queries
(#18602)
405ad466081 is described below
commit 405ad466081968f2037b8ab25e97bf592ea6f0d2
Author: st-omarkhalid <[email protected]>
AuthorDate: Thu May 28 10:52:11 2026 -0500
Surface per-server stats on BrokerResponse for SSE queries (#18602)
Per-server scatter stats (submit delay, response delay, response size,
deserialization timing) are computed by the Netty single-connection
transport during scatter and stashed on a local ServerStats holder
inside BaseSingleStageBrokerRequestHandler.handleRequest. Today the
formatted string only reaches the SLF4J QueryLogger that consumes the
holder before handleRequest returns; subscribers of the
onQueryCompletion(RequestContext, BrokerResponse) hook and JSON-API
clients have no way to read it because neither RequestContext nor
BrokerResponse exposes it.
This commit exposes the string on BrokerResponse, mirroring the existing
materializedViewQueried pattern: default getter on the BrokerResponse
interface (returns null so external implementations need no override),
concrete field + setter + Jackson-annotated getter on
BrokerResponseNative, and @JsonInclude(NON_NULL) so existing clients
that do not expect the field see no behavioral change.
BaseSingleStageBrokerRequestHandler#handleRequest writes the value onto
the response after scatter completes, both on the regular path and the
materialized-view split path. Only the Netty transport of the single-stage
engine (SSE) produces a non-null value today; the gRPC SSE transport and
the multi-stage engine (MSE) leave it null because OSS does not currently
compute a per-server stats string on those paths.
Tests in BrokerResponseNativeTest pin (1) JSON round-trip, (2)
suppression of the field when null, and (3) the null default.
---
.../BaseSingleStageBrokerRequestHandler.java | 5 ++++
.../response/broker/BrokerResponseNative.java | 17 ++++++++++++-
.../response/broker/BrokerResponseNativeTest.java | 28 ++++++++++++++++++++++
3 files changed, 49 insertions(+), 1 deletion(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index e343f89a49c..a0469a4bc25 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -1022,6 +1022,9 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
brokerResponse.setRLSFiltersApplied(rlsFiltersApplied.get());
+ // Record per-server stats on the SSE BrokerResponse so downstream consumers can read it.
+ brokerResponse.setServerStats(serverStats.getServerStats());
+
// Log query and stats
_queryLogger.logQueryCompleted(
new QueryLogger.QueryLogParams(requestContext, tableName,
brokerResponse,
@@ -2397,6 +2400,8 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
viewSplitResponse.setTimeUsedMs(totalTimeMs);
augmentStatistics(requestContext, viewSplitResponse);
viewSplitResponse.setRLSFiltersApplied(rlsFiltersApplied);
+ // Record per-server stats on the SSE BrokerResponse so downstream consumers can read it.
+ viewSplitResponse.setServerStats(serverStats.getServerStats());
_queryLogger.logQueryCompleted(
new QueryLogger.QueryLogParams(requestContext, tableName,
viewSplitResponse,
QueryLogger.QueryLogParams.QueryEngine.SINGLE_STAGE,
requesterIdentity, serverStats),
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 513dfdfce69..b0ab89deaf2 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -58,7 +58,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
"explainPlanNumEmptyFilterSegments",
"explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
"offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes",
"offlineResponseSerMemAllocatedBytes",
"realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes",
"realtimeTotalMemAllocatedBytes",
- "pools", "rlsFiltersApplied", "groupsTrimmed", "materializedViewQueried"
+ "pools", "rlsFiltersApplied", "groupsTrimmed", "materializedViewQueried",
"serverStats"
})
@JsonIgnoreProperties(ignoreUnknown = true)
public class BrokerResponseNative implements BrokerResponse {
@@ -126,6 +126,9 @@ public class BrokerResponseNative implements BrokerResponse
{
@Nullable
private String _materializedViewQueried;
+ @Nullable
+ private String _serverStats;
+
public BrokerResponseNative() {
}
@@ -649,4 +652,16 @@ public class BrokerResponseNative implements
BrokerResponse {
public String getMaterializedViewQueried() {
return _materializedViewQueried;
}
+
+ @JsonProperty("serverStats")
+ public void setServerStats(@Nullable String serverStats) {
+ _serverStats = serverStats;
+ }
+
+ @Nullable
+ @JsonProperty("serverStats")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public String getServerStats() {
+ return _serverStats;
+ }
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeTest.java
index e25008636bf..5a5f3f2cc5d 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.common.response.broker;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.testng.Assert;
@@ -85,4 +87,30 @@ public class BrokerResponseNativeTest {
Assert.assertFalse(json.contains("materializedViewQueried"),
"Null materializedViewQueried should be suppressed by
@JsonInclude(NON_NULL)");
}
+
+ @Test
+ public void testServerStatsRoundTrip()
+ throws IOException {
+ BrokerResponseNative expected = new BrokerResponseNative();
+ String stats =
+
"Server=SubmitDelayMs,ResponseDelayMs,ResponseSize,DeserializationTimeMs,RequestSentDelayMs;"
+ + "pinot-server-0_O=0,1,7571,0,0;pinot-server-1_O=0,1,7574,0,0";
+ expected.setServerStats(stats);
+
+ BrokerResponseNative actual =
BrokerResponseNative.fromJsonString(expected.toJsonString());
+ Assert.assertEquals(actual.getServerStats(), stats);
+ }
+
+ @Test
+ public void testServerStatsAbsentWhenNull()
+ throws IOException {
+ BrokerResponseNative response = new BrokerResponseNative();
+ JsonNode tree = new ObjectMapper().readTree(response.toJsonString());
+ Assert.assertFalse(tree.has("serverStats"));
+ }
+
+ @Test
+ public void testServerStatsDefaultsToNull() {
+ Assert.assertNull(new BrokerResponseNative().getServerStats());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]