This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 36be75f1dd Add more information in RequestContext class (#11708)
36be75f1dd is described below

commit 36be75f1dd9525e844a4a2bbd545ea84b54bd609
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Sun Oct 15 23:25:08 2023 +0530

    Add more information in RequestContext class (#11708)
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  13 ++
 .../MultiStageBrokerRequestHandler.java            |   2 +
 .../SingleConnectionBrokerRequestHandler.java      |   1 +
 .../pinot/spi/trace/DefaultRequestContext.java     | 168 ++++++++++++++++++++-
 .../org/apache/pinot/spi/trace/RequestContext.java |  60 ++++++++
 5 files changed, 240 insertions(+), 4 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 47ffeddedf..49ae0d9d94 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -1747,6 +1747,19 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     statistics.setOfflineTotalCpuTimeNs(response.getOfflineTotalCpuTimeNs());
     statistics.setRealtimeTotalCpuTimeNs(response.getRealtimeTotalCpuTimeNs());
     statistics.setNumRowsResultSet(response.getNumRowsResultSet());
+    statistics.setNumConsumingSegmentsQueried(response.getNumConsumingSegmentsQueried());
+    statistics.setNumConsumingSegmentsProcessed(response.getNumConsumingSegmentsProcessed());
+    statistics.setNumConsumingSegmentsMatched(response.getNumConsumingSegmentsMatched());
+    statistics.setMinConsumingFreshnessTimeMs(response.getMinConsumingFreshnessTimeMs());
+    statistics.setNumSegmentsPrunedByBroker(response.getNumSegmentsPrunedByBroker());
+    statistics.setNumSegmentsPrunedByServer(response.getNumSegmentsPrunedByServer());
+    statistics.setNumSegmentsPrunedInvalid(response.getNumSegmentsPrunedInvalid());
+    statistics.setNumSegmentsPrunedByLimit(response.getNumSegmentsPrunedByLimit());
+    statistics.setNumSegmentsPrunedByValue(response.getNumSegmentsPrunedByValue());
+    statistics.setExplainPlanNumEmptyFilterSegments(response.getExplainPlanNumEmptyFilterSegments());
+    statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments());
+    statistics.setProcessingExceptions(response.getProcessingExceptions().stream().map(Object::toString).collect(
+        Collectors.toList()));
   }
 
   private String getGlobalQueryId(long requestId) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 7fb9b24c14..5c4e86a7d4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -156,6 +156,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
 
     DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan();
     Set<String> tableNames = queryPlanResult.getTableNames();
+    requestContext.setTableNames(List.copyOf(tableNames));
 
     // Compilation Time. This includes the time taken for parsing, compiling, create stage plans and assigning workers.
     long compilationEndTimeNs = System.nanoTime();
@@ -235,6 +236,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
         sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs - compilationStartTimeNs));
     brokerResponse.setTimeUsedMs(totalTimeMs);
     requestContext.setQueryProcessingTime(totalTimeMs);
+    requestContext.setTraceInfo(brokerResponse.getTraceInfo());
     augmentStatistics(requestContext, brokerResponse);
 
     // Log query and stats
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 3ad8648442..0e0c35d318 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -144,6 +144,7 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
         _brokerReduceService.reduceOnDataTable(originalBrokerRequest, serverBrokerRequest, dataTableMap,
             reduceTimeOutMs, _brokerMetrics);
     final long reduceTimeNanos = System.nanoTime() - reduceStartTimeNs;
+    requestContext.setTraceInfo(brokerResponse.getTraceInfo());
     requestContext.setReduceTimeNanos(reduceTimeNanos);
     _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REDUCE, reduceTimeNanos);
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
index 60e1a1cca8..6ce063d253 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pinot.spi.trace;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 
@@ -32,7 +36,7 @@ public class DefaultRequestContext implements RequestScope {
 
   private int _errorCode = 0;
   private String _query;
-  private String _tableName = DEFAULT_TABLE_NAME;
+  private List<String> _tableNames = new ArrayList<>();
   private long _processingTimeMillis = -1;
   private long _totalDocs;
   private long _numDocsScanned;
@@ -63,6 +67,19 @@ public class DefaultRequestContext implements RequestScope {
 
   private FanoutType _fanoutType;
   private int _numUnavailableSegments;
+  private long _numConsumingSegmentsQueried;
+  private long _numConsumingSegmentsProcessed;
+  private long _numConsumingSegmentsMatched;
+  private long _minConsumingFreshnessTimeMs;
+  private long _numSegmentsPrunedByBroker;
+  private long _numSegmentsPrunedByServer;
+  private long _numSegmentsPrunedInvalid;
+  private long _numSegmentsPrunedByLimit;
+  private long _numSegmentsPrunedByValue;
+  private long _explainPlanNumEmptyFilterSegments;
+  private long _explainPlanNumMatchAllFilterSegments;
+  private Map<String, String> _traceInfo = new HashMap<>();
+  private List<String> _processingExceptions = new ArrayList<>();
 
   public DefaultRequestContext() {
   }
@@ -169,7 +186,12 @@ public class DefaultRequestContext implements RequestScope {
 
   @Override
   public void setTableName(String tableName) {
-    _tableName = tableName;
+    _tableNames.add(tableName);
+  }
+
+  @Override
+  public void setTableNames(List<String> tableNames) {
+    _tableNames.addAll(tableNames);
   }
 
   @Override
@@ -239,7 +261,15 @@ public class DefaultRequestContext implements RequestScope {
 
   @Override
   public String getTableName() {
-    return _tableName;
+    if (_tableNames.size() == 0) {
+      return DEFAULT_TABLE_NAME;
+    }
+    return _tableNames.get(0);
+  }
+
+  @Override
+  public List<String> getTableNames() {
+    return _tableNames;
   }
 
   @Override
@@ -314,7 +344,7 @@ public class DefaultRequestContext implements RequestScope {
 
   @Override
   public boolean hasValidTableName() {
-    return !DEFAULT_TABLE_NAME.equals(_tableName);
+    return !_tableNames.isEmpty();
   }
 
   @Override
@@ -402,6 +432,136 @@ public class DefaultRequestContext implements RequestScope {
     _reduceTimeMillis = reduceTimeMillis;
   }
 
+  @Override
+  public long getNumConsumingSegmentsQueried() {
+    return _numConsumingSegmentsQueried;
+  }
+
+  @Override
+  public void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried) {
+    _numConsumingSegmentsQueried = numConsumingSegmentsQueried;
+  }
+
+  @Override
+  public long getNumConsumingSegmentsProcessed() {
+    return _numConsumingSegmentsProcessed;
+  }
+
+  @Override
+  public void setNumConsumingSegmentsProcessed(long numConsumingSegmentsProcessed) {
+    _numConsumingSegmentsProcessed = numConsumingSegmentsProcessed;
+  }
+
+  @Override
+  public long getNumConsumingSegmentsMatched() {
+    return _numConsumingSegmentsMatched;
+  }
+
+  @Override
+  public void setNumConsumingSegmentsMatched(long numConsumingSegmentsMatched) {
+    _numConsumingSegmentsMatched = numConsumingSegmentsMatched;
+  }
+
+  @Override
+  public long getMinConsumingFreshnessTimeMs() {
+    return _minConsumingFreshnessTimeMs;
+  }
+
+  @Override
+  public void setMinConsumingFreshnessTimeMs(long minConsumingFreshnessTimeMs) {
+    _minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs;
+  }
+
+  @Override
+  public long getNumSegmentsPrunedByBroker() {
+    return _numSegmentsPrunedByBroker;
+  }
+
+  @Override
+  public void setNumSegmentsPrunedByBroker(long numSegmentsPrunedByBroker) {
+    _numSegmentsPrunedByBroker = numSegmentsPrunedByBroker;
+  }
+
+  @Override
+  public long getNumSegmentsPrunedByServer() {
+    return _numSegmentsPrunedByServer;
+  }
+
+  @Override
+  public void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer) {
+    _numSegmentsPrunedByServer = numSegmentsPrunedByServer;
+  }
+
+  @Override
+  public long getNumSegmentsPrunedInvalid() {
+    return _numSegmentsPrunedInvalid;
+  }
+
+  @Override
+  public void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid) {
+    _numSegmentsPrunedInvalid = numSegmentsPrunedInvalid;
+  }
+
+  @Override
+  public long getNumSegmentsPrunedByLimit() {
+    return _numSegmentsPrunedByLimit;
+  }
+
+  @Override
+  public void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit) {
+    _numSegmentsPrunedByLimit = numSegmentsPrunedByLimit;
+  }
+
+  @Override
+  public long getNumSegmentsPrunedByValue() {
+    return _numSegmentsPrunedByValue;
+  }
+
+  @Override
+  public void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue) {
+    _numSegmentsPrunedByValue = numSegmentsPrunedByValue;
+  }
+
+  @Override
+  public long getExplainPlanNumEmptyFilterSegments() {
+    return _explainPlanNumEmptyFilterSegments;
+  }
+
+  @Override
+  public void setExplainPlanNumEmptyFilterSegments(long explainPlanNumEmptyFilterSegments) {
+    _explainPlanNumEmptyFilterSegments = explainPlanNumEmptyFilterSegments;
+  }
+
+  @Override
+  public long getExplainPlanNumMatchAllFilterSegments() {
+    return _explainPlanNumMatchAllFilterSegments;
+  }
+
+  @Override
+  public void setExplainPlanNumMatchAllFilterSegments(long explainPlanNumMatchAllFilterSegments) {
+    _explainPlanNumMatchAllFilterSegments = explainPlanNumMatchAllFilterSegments;
+  }
+
+  @Override
+  public Map<String, String> getTraceInfo() {
+    return _traceInfo;
+  }
+
+  @Override
+  public void setTraceInfo(Map<String, String> traceInfo) {
+    _traceInfo.putAll(traceInfo);
+  }
+
+  @Override
+  public List<String> getProcessingExceptions() {
+    return _processingExceptions;
+  }
+
+  @Override
+  public void setProcessingExceptions(List<String> processingExceptions) {
+    _processingExceptions.addAll(processingExceptions);
+  }
+
   @Override
   public void close() {
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
index 89f768608b..f8ec35a921 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pinot.spi.trace;
 
+import java.util.List;
+import java.util.Map;
+
+
 public interface RequestContext {
   long getOfflineSystemActivitiesCpuTimeNs();
 
@@ -65,6 +69,8 @@ public interface RequestContext {
 
   void setTableName(String tableName);
 
+  void setTableNames(List<String> tableNames);
+
   void setQueryProcessingTime(long processingTimeMillis);
 
   void setBrokerId(String brokerId);
@@ -93,6 +99,8 @@ public interface RequestContext {
 
   String getTableName();
 
+  List<String> getTableNames();
+
   long getProcessingTimeMillis();
 
   long getTotalDocs();
@@ -157,6 +165,58 @@ public interface RequestContext {
 
   void setReduceTimeMillis(long reduceTimeMillis);
 
+  long getNumConsumingSegmentsQueried();
+
+  void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried);
+
+  long getNumConsumingSegmentsProcessed();
+
+  void setNumConsumingSegmentsProcessed(long numConsumingSegmentsProcessed);
+
+  long getNumConsumingSegmentsMatched();
+
+  void setNumConsumingSegmentsMatched(long numConsumingSegmentsMatched);
+
+  long getMinConsumingFreshnessTimeMs();
+
+  void setMinConsumingFreshnessTimeMs(long minConsumingFreshnessTimeMs);
+
+  long getNumSegmentsPrunedByBroker();
+
+  void setNumSegmentsPrunedByBroker(long numSegmentsPrunedByBroker);
+
+  long getNumSegmentsPrunedByServer();
+
+  void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer);
+
+  long getNumSegmentsPrunedInvalid();
+
+  void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid);
+
+  long getNumSegmentsPrunedByLimit();
+
+  void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit);
+
+  long getNumSegmentsPrunedByValue();
+
+  void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue);
+
+  long getExplainPlanNumEmptyFilterSegments();
+
+  void setExplainPlanNumEmptyFilterSegments(long explainPlanNumEmptyFilterSegments);
+
+  long getExplainPlanNumMatchAllFilterSegments();
+
+  void setExplainPlanNumMatchAllFilterSegments(long explainPlanNumMatchAllFilterSegments);
+
+  Map<String, String> getTraceInfo();
+
+  void setTraceInfo(Map<String, String> traceInfo);
+
+  List<String> getProcessingExceptions();
+
+  void setProcessingExceptions(List<String> processingExceptions);
+
   enum FanoutType {
     OFFLINE, REALTIME, HYBRID
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to