This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8a3c1ca3081 Add termination checks to FunnelEventsFunctionEval and
FunnelStepDurationStats (#17700)
8a3c1ca3081 is described below
commit 8a3c1ca3081ea0f118173a966709df462e5c8fbe
Author: Yash Mayya <[email protected]>
AuthorDate: Tue Feb 17 09:09:13 2026 -0800
Add termination checks to FunnelEventsFunctionEval and
FunnelStepDurationStats (#17700)
---
.../window/FunnelEventsFunctionEvalAggregationFunction.java | 9 +++++++++
.../window/FunnelStepDurationStatsAggregationFunction.java | 4 ++++
2 files changed, 13 insertions(+)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java
index b783194d65e..4ab550d3e07 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java
@@ -39,6 +39,7 @@ import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEventWi
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.query.QueryThreadContext;
public class FunnelEventsFunctionEvalAggregationFunction
@@ -396,11 +397,14 @@ public class FunnelEventsFunctionEvalAggregationFunction
protected void fillWindow(PriorityQueue<FunnelStepEventWithExtraFields> stepEvents,
ArrayDeque<FunnelStepEventWithExtraFields> slidingWindow) {
// Ensure for the sliding window, the first event is the first step
+ int numEventsProcessed = 0;
while ((!slidingWindow.isEmpty()) &&
slidingWindow.peek().getFunnelStepEvent().getStep() != 0) {
slidingWindow.pollFirst();
}
if (slidingWindow.isEmpty()) {
while (!stepEvents.isEmpty() &&
stepEvents.peek().getFunnelStepEvent().getStep() != 0) {
+ QueryThreadContext.checkTerminationAndSampleUsagePeriodically(numEventsProcessed++,
+ "FunnelEventsFunctionEvalAggregationFunction#fillWindow");
stepEvents.poll();
}
if (stepEvents.isEmpty()) {
@@ -412,6 +416,8 @@ public class FunnelEventsFunctionEvalAggregationFunction
long windowStart =
slidingWindow.peek().getFunnelStepEvent().getTimestamp();
long windowEnd = windowStart + _windowSize;
while (!stepEvents.isEmpty() &&
(stepEvents.peek().getFunnelStepEvent().getTimestamp() < windowEnd)) {
+ QueryThreadContext.checkTerminationAndSampleUsagePeriodically(numEventsProcessed++,
+ "FunnelEventsFunctionEvalAggregationFunction#fillWindow");
if (_maxStepDuration > 0) {
// When maxStepDuration > 0, we need to check if the event_to_add has a timestamp within the max duration
// from the last event in the sliding window. If not, we break the loop.
@@ -464,7 +470,10 @@ public class FunnelEventsFunctionEvalAggregationFunction
int maxStep = 0;
long previousTimestamp = -1;
+ int numEventsProcessed = 0;
for (FunnelStepEventWithExtraFields event : slidingWindow) {
+ QueryThreadContext.checkTerminationAndSampleUsagePeriodically(numEventsProcessed++,
+ "FunnelEventsFunctionEvalAggregationFunction#extractFinalResult");
int currentEventStep = event.getFunnelStepEvent().getStep();
// If the same condition holds for the sequence of events, then such repeating event interrupts further
// processing.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java
index ef585e0583e..1097fb26018 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java
@@ -35,6 +35,7 @@ import
org.apache.pinot.segment.local.aggregator.PercentileEstValueAggregator;
import org.apache.pinot.segment.local.customobject.AvgPair;
import org.apache.pinot.segment.local.customobject.QuantileDigest;
import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -218,7 +219,10 @@ public class FunnelStepDurationStatsAggregationFunction extends FunnelBaseAggreg
protected Integer processWindow(ArrayDeque<FunnelStepEvent> slidingWindow) {
int maxStep = 0;
long previousTimestamp = -1;
+ int numEventsProcessed = 0;
for (FunnelStepEvent event : slidingWindow) {
+ QueryThreadContext.checkTerminationAndSampleUsagePeriodically(numEventsProcessed++,
+ "FunnelStepDurationStatsAggregationFunction#processWindow");
int currentEventStep = event.getStep();
// If the same condition holds for the sequence of events, then such repeating event interrupts further
// processing.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]