This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a3c1ca3081 Add termination checks to FunnelEventsFunctionEval and 
FunnelStepDurationStats (#17700)
8a3c1ca3081 is described below

commit 8a3c1ca3081ea0f118173a966709df462e5c8fbe
Author: Yash Mayya <[email protected]>
AuthorDate: Tue Feb 17 09:09:13 2026 -0800

    Add termination checks to FunnelEventsFunctionEval and 
FunnelStepDurationStats (#17700)
---
 .../window/FunnelEventsFunctionEvalAggregationFunction.java      | 9 +++++++++
 .../window/FunnelStepDurationStatsAggregationFunction.java       | 4 ++++
 2 files changed, 13 insertions(+)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java
index b783194d65e..4ab550d3e07 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java
@@ -39,6 +39,7 @@ import 
org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEventWi
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import 
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.query.QueryThreadContext;
 
 
 public class FunnelEventsFunctionEvalAggregationFunction
@@ -396,11 +397,14 @@ public class FunnelEventsFunctionEvalAggregationFunction
   protected void fillWindow(PriorityQueue<FunnelStepEventWithExtraFields> 
stepEvents,
       ArrayDeque<FunnelStepEventWithExtraFields> slidingWindow) {
     // Ensure for the sliding window, the first event is the first step
+    int numEventsProcessed = 0;
     while ((!slidingWindow.isEmpty()) && 
slidingWindow.peek().getFunnelStepEvent().getStep() != 0) {
       slidingWindow.pollFirst();
     }
     if (slidingWindow.isEmpty()) {
       while (!stepEvents.isEmpty() && 
stepEvents.peek().getFunnelStepEvent().getStep() != 0) {
+        
QueryThreadContext.checkTerminationAndSampleUsagePeriodically(numEventsProcessed++,
+            "FunnelEventsFunctionEvalAggregationFunction#fillWindow");
         stepEvents.poll();
       }
       if (stepEvents.isEmpty()) {
@@ -412,6 +416,8 @@ public class FunnelEventsFunctionEvalAggregationFunction
     long windowStart = 
slidingWindow.peek().getFunnelStepEvent().getTimestamp();
     long windowEnd = windowStart + _windowSize;
     while (!stepEvents.isEmpty() && 
(stepEvents.peek().getFunnelStepEvent().getTimestamp() < windowEnd)) {
+      
QueryThreadContext.checkTerminationAndSampleUsagePeriodically(numEventsProcessed++,
+          "FunnelEventsFunctionEvalAggregationFunction#fillWindow");
       if (_maxStepDuration > 0) {
         // When maxStepDuration > 0, we need to check if the event_to_add has 
a timestamp within the max duration
         // from the last event in the sliding window. If not, we break the 
loop.
@@ -464,7 +470,10 @@ public class FunnelEventsFunctionEvalAggregationFunction
 
       int maxStep = 0;
       long previousTimestamp = -1;
+      int numEventsProcessed = 0;
       for (FunnelStepEventWithExtraFields event : slidingWindow) {
+        
QueryThreadContext.checkTerminationAndSampleUsagePeriodically(numEventsProcessed++,
+            "FunnelEventsFunctionEvalAggregationFunction#extractFinalResult");
         int currentEventStep = event.getFunnelStepEvent().getStep();
         // If the same condition holds for the sequence of events, then such 
repeating event interrupts further
         // processing.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java
index ef585e0583e..1097fb26018 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java
@@ -35,6 +35,7 @@ import 
org.apache.pinot.segment.local.aggregator.PercentileEstValueAggregator;
 import org.apache.pinot.segment.local.customobject.AvgPair;
 import org.apache.pinot.segment.local.customobject.QuantileDigest;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.utils.CommonConstants;
 
 
@@ -218,7 +219,10 @@ public class FunnelStepDurationStatsAggregationFunction 
extends FunnelBaseAggreg
   protected Integer processWindow(ArrayDeque<FunnelStepEvent> slidingWindow) {
     int maxStep = 0;
     long previousTimestamp = -1;
+    int numEventsProcessed = 0;
     for (FunnelStepEvent event : slidingWindow) {
+      
QueryThreadContext.checkTerminationAndSampleUsagePeriodically(numEventsProcessed++,
+          "FunnelStepDurationStatsAggregationFunction#processWindow");
       int currentEventStep = event.getStep();
       // If the same condition holds for the sequence of events, then such 
repeating event interrupts further
       // processing.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to