This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 613274643aa [timeseries] Add support for query event listeners (#17464)
613274643aa is described below

commit 613274643aa6b2097bf7c5544cec188a02fd4efd
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Fri Jan 9 09:04:45 2026 -0800

    [timeseries] Add support for query event listeners (#17464)
    
    * [timeseries] Add support for query event listeners
    
    * Adding exceptions from TimeSeriesBlock to RequestContext
    
    ---------
    
    Co-authored-by: shauryachats <[email protected]>
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 28 ++++++++++++---------
 .../requesthandler/TimeSeriesRequestHandler.java   | 29 +++++++++++++++++++---
 .../response/mapper/TimeSeriesResponseMapper.java  | 27 ++++++++++++++++++++
 .../tsdb/planner/TimeSeriesQueryEnvironment.java   |  2 +-
 .../tsdb/planner/physical/TableScanVisitor.java    |  6 +++--
 5 files changed, 74 insertions(+), 18 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 86f56aa03f2..5b18088ccac 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -150,18 +150,7 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     requestContext.setBrokerId(_brokerId);
     long requestId = _requestIdGenerator.get();
     requestContext.setRequestId(requestId);
-
-    if (httpHeaders != null && !_trackedHeaders.isEmpty()) {
-      MultivaluedMap<String, String> requestHeaders = 
httpHeaders.getRequestHeaders();
-      Map<String, List<String>> trackedHeadersMap = 
Maps.newHashMapWithExpectedSize(_trackedHeaders.size());
-      for (Map.Entry<String, List<String>> entry : requestHeaders.entrySet()) {
-        String key = entry.getKey().toLowerCase();
-        if (_trackedHeaders.contains(key)) {
-          trackedHeadersMap.put(key, entry.getValue());
-        }
-      }
-      requestContext.setRequestHttpHeaders(trackedHeadersMap);
-    }
+    setTrackedHeadersInRequestContext(requestContext, httpHeaders, 
_trackedHeaders);
 
     // First-stage access control to prevent unauthenticated requests from 
using up resources. Secondary table-level
     // check comes later.
@@ -361,6 +350,21 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     statistics.setTraceInfo(response.getTraceInfo());
   }
 
+  protected static void setTrackedHeadersInRequestContext(RequestContext 
requestContext,
+      HttpHeaders httpHeaders, Set<String> trackedHeaders) {
+    if (httpHeaders != null && !trackedHeaders.isEmpty()) {
+      MultivaluedMap<String, String> requestHeaders = 
httpHeaders.getRequestHeaders();
+      Map<String, List<String>> trackedHeadersMap = 
Maps.newHashMapWithExpectedSize(trackedHeaders.size());
+      for (Map.Entry<String, List<String>> entry : requestHeaders.entrySet()) {
+        String key = entry.getKey().toLowerCase();
+        if (trackedHeaders.contains(key)) {
+          trackedHeadersMap.put(key, entry.getValue());
+        }
+      }
+      requestContext.setRequestHttpHeaders(trackedHeadersMap);
+    }
+  }
+
   @Override
   public Map<Long, String> getRunningQueries() {
     Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation 
is not enabled on broker");
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index 6f1a80b1256..fc6cd452776 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.Stack;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.HttpHeaders;
@@ -50,6 +51,7 @@ import 
org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.request.context.RequestContextUtils;
 import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.mapper.TimeSeriesResponseMapper;
 import org.apache.pinot.common.utils.HumanReadableDuration;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.TargetType;
@@ -126,6 +128,7 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
     _queryDispatcher.shutdown();
   }
 
+  // TODO: Consider returning BrokerResponse instead of TimeSeriesBlock for 
consistency with other handlers.
   @Override
   public TimeSeriesBlock handleTimeSeriesRequest(String lang, String 
rawQueryParamString,
       Map<String, String> queryParams, RequestContext requestContext, 
RequesterIdentity requesterIdentity,
@@ -136,13 +139,17 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES, 1);
       requestContext.setBrokerId(_brokerId);
       requestContext.setRequestId(_requestIdGenerator.get());
-      RangeTimeSeriesRequest timeSeriesRequest = null;
+      setTrackedHeadersInRequestContext(requestContext, httpHeaders, 
_trackedHeaders);
+
       firstStageAccessControlCheck(requesterIdentity);
+      RangeTimeSeriesRequest timeSeriesRequest;
       try {
         timeSeriesRequest = buildRangeTimeSeriesRequest(lang, 
rawQueryParamString, queryParams);
       } catch (URISyntaxException e) {
         throw new QueryException(QueryErrorCode.TIMESERIES_PARSING, "Error 
building RangeTimeSeriesRequest", e);
       }
+      requestContext.setQuery(timeSeriesRequest.getQuery());
+
       TimeSeriesLogicalPlanResult logicalPlanResult = 
_queryEnvironment.buildLogicalPlan(timeSeriesRequest);
       // If there are no buckets in the logical plan, return an empty response.
       if (logicalPlanResult.getTimeBuckets().getNumBuckets() == 0) {
@@ -154,17 +161,33 @@ public class TimeSeriesRequestHandler extends 
BaseBrokerRequestHandler {
 
       timeSeriesBlock = 
_queryDispatcher.submitAndGet(requestContext.getRequestId(), dispatchablePlan,
           timeSeriesRequest.getTimeout().toMillis(), requestContext);
+      TimeSeriesResponseMapper.setStatsInRequestContext(requestContext, 
timeSeriesBlock.getMetadata());
+      setExceptionsFromBlockToRequestContext(timeSeriesBlock, requestContext);
       return timeSeriesBlock;
     } catch (Exception e) {
+      QueryException qe;
       
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.TIME_SERIES_GLOBAL_QUERIES_FAILED,
 1);
       if (e instanceof QueryException) {
-        throw (QueryException) e;
+        qe = (QueryException) e;
       } else {
-        throw new QueryException(QueryErrorCode.UNKNOWN, "Error processing 
time-series query", e);
+        qe = new QueryException(QueryErrorCode.UNKNOWN, "Error processing 
time-series query", e);
       }
+      requestContext.setErrorCode(qe.getErrorCode());
+      throw qe;
     } finally {
       _brokerMetrics.addTimedValue(BrokerTimer.QUERY_TOTAL_TIME_MS, 
System.currentTimeMillis() - queryStartTime,
           TimeUnit.MILLISECONDS);
+      _brokerQueryEventListener.onQueryCompletion(requestContext);
+    }
+  }
+
+  private void setExceptionsFromBlockToRequestContext(TimeSeriesBlock 
timeSeriesBlock, RequestContext requestContext) {
+    List<QueryException> exceptions = timeSeriesBlock.getExceptions();
+    if (exceptions != null && !exceptions.isEmpty()) {
+      // Set the first exception's error code in the request context
+      requestContext.setErrorCode(exceptions.get(0).getErrorCode());
+      
requestContext.setProcessingExceptions(exceptions.stream().map(QueryException::getMessage)
+        .collect(Collectors.toList()));
     }
   }
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
index be681bc50b3..ae146ea8273 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/mapper/TimeSeriesResponseMapper.java
@@ -29,6 +29,7 @@ import 
org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.tsdb.spi.series.TimeSeries;
 import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
 
@@ -160,6 +161,32 @@ public class TimeSeriesResponseMapper {
     brokerResponse.addBrokerStats(map);
   }
 
+  public static void setStatsInRequestContext(RequestContext requestContext, 
Map<String, String> metadata) {
+    if (metadata == null || metadata.isEmpty() || requestContext == null) {
+      return;
+    }
+    requestContext.setNumDocsScanned(getLongMetadataValue(metadata,
+        DataTable.MetadataKey.NUM_DOCS_SCANNED));
+    requestContext.setNumEntriesScannedInFilter(getLongMetadataValue(metadata,
+        DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER));
+    
requestContext.setNumEntriesScannedPostFilter(getLongMetadataValue(metadata,
+        DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER));
+    requestContext.setTotalDocs(getLongMetadataValue(metadata,
+        DataTable.MetadataKey.TOTAL_DOCS));
+    requestContext.setNumSegmentsQueried(getIntMetadataValue(metadata,
+        DataTable.MetadataKey.NUM_SEGMENTS_QUERIED));
+    requestContext.setNumSegmentsProcessed(getIntMetadataValue(metadata,
+        DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED));
+    requestContext.setNumSegmentsMatched(getIntMetadataValue(metadata,
+        DataTable.MetadataKey.NUM_SEGMENTS_MATCHED));
+    requestContext.setNumConsumingSegmentsMatched(getIntMetadataValue(metadata,
+        DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED));
+    
requestContext.setNumConsumingSegmentsProcessed(getIntMetadataValue(metadata,
+        DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED));
+    requestContext.setNumConsumingSegmentsQueried(getIntMetadataValue(metadata,
+        DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED));
+  }
+
   private static long getLongMetadataValue(Map<String, String> metadata, 
DataTable.MetadataKey key) {
     return Long.parseLong(metadata.getOrDefault(key.getName(), "0"));
   }
diff --git 
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java
 
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java
index 157122eebfd..c62b931765a 100644
--- 
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java
+++ 
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java
@@ -99,7 +99,7 @@ public class TimeSeriesQueryEnvironment {
       RequestContext requestContext, TimeSeriesLogicalPlanResult logicalPlan) {
     // Step-0: Add table type info to the logical plan.
     logicalPlan = new 
TimeSeriesLogicalPlanResult(TableScanVisitor.INSTANCE.addTableTypeInfoToPlan(
-      logicalPlan.getPlanNode()), logicalPlan.getTimeBuckets());
+      logicalPlan.getPlanNode(), requestContext), 
logicalPlan.getTimeBuckets());
     // Step-1: Assign segments to servers for each leaf node.
     TableScanVisitor.Context scanVisitorContext = 
TableScanVisitor.createContext(requestContext.getRequestId());
     TableScanVisitor.INSTANCE.assignSegmentsToPlan(logicalPlan.getPlanNode(), 
logicalPlan.getTimeBuckets(),
diff --git 
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java
 
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java
index cfd7332d88f..cc5891de87c 100644
--- 
a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java
+++ 
b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java
@@ -36,6 +36,7 @@ import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.core.routing.TableRouteInfo;
 import org.apache.pinot.core.routing.TableRouteProvider;
 import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
 import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
@@ -86,19 +87,20 @@ public class TableScanVisitor {
    * @param planNode The {@link BaseTimeSeriesPlanNode} to process.
    * @return The updated {@link BaseTimeSeriesPlanNode} with table type 
information.
    */
-  public BaseTimeSeriesPlanNode addTableTypeInfoToPlan(BaseTimeSeriesPlanNode 
planNode) {
+  public BaseTimeSeriesPlanNode addTableTypeInfoToPlan(BaseTimeSeriesPlanNode 
planNode, RequestContext requestContext) {
     if (planNode instanceof LeafTimeSeriesPlanNode) {
       LeafTimeSeriesPlanNode sfpNode = (LeafTimeSeriesPlanNode) planNode;
       TableRouteInfo routeInfo = 
_tableRouteProvider.getTableRouteInfo(sfpNode.getTableName(), _tableCache,
         _routingManager);
       String tableNameWithType = getTableNameWithType(routeInfo);
       Preconditions.checkNotNull(tableNameWithType, "Table not found for table 
name: " + sfpNode.getTableName());
+      requestContext.setTableName(tableNameWithType);
       return sfpNode.withTableName(tableNameWithType);
     }
 
     List<BaseTimeSeriesPlanNode> newInputs = new ArrayList<>();
     for (BaseTimeSeriesPlanNode childNode : planNode.getInputs()) {
-      newInputs.add(addTableTypeInfoToPlan(childNode));
+      newInputs.add(addTableTypeInfoToPlan(childNode, requestContext));
     }
     return planNode.withInputs(newInputs);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to