This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 159f046dd1a Fix flaky MergeRollupMinionClusterIntegrationTest gauge
assertions (#18260)
159f046dd1a is described below
commit 159f046dd1a3de7b2a78bc544d4152771d663971
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Apr 20 14:18:50 2026 -0700
Fix flaky MergeRollupMinionClusterIntegrationTest gauge assertions (#18260)
Extends the polling pattern introduced in #18253 (for
mergeRollupTaskNumBucketsToProcess) to the remaining five
mergeRollupTaskDelayInNumBuckets.* gaugeExists checks in the same test
class.
The gauge is registered by MergeRollupTaskGenerator.createOrUpdateDelayMetrics
and removed by resetDelayMetrics when a scheduleTasks call observes no
eligible segments. Each loop iteration in the test also runs an
assertNull(scheduleTasks(context).get(RealtimeToOfflineSegmentsTask))
probe, which triggers an extra synchronized scheduleTasks call. That call can
race with the previous merge task's segment-lineage commit and transiently
reset the gauge, so the post-loop assertTrue(gaugeExists(...)) flakes in the
same race window that #18253 addressed.
A new waitForGaugesToExist(String...) helper polls via
TestUtils.waitForCondition with the existing TIMEOUT_IN_MS, and is used
in testOfflineTableSingleLevelConcat,
testOfflineTableSingleLevelConcatWithMetadataPush,
testOfflineTableSingleLevelRollup, testOfflineTableMultiLevelConcat (the
45days and 90days gauges are polled together in a single call), and
testRealtimeTableSingleLevelConcat.
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../MergeRollupMinionClusterIntegrationTest.java | 43 ++++++++++++++++------
1 file changed, 31 insertions(+), 12 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index c22929be876..5709a72b3c5 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -464,8 +464,7 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
// Check total tasks
assertEquals(numTasks, 5);
-
assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
- "mergeRollupTaskDelayInNumBuckets.myTable1_OFFLINE.100days"));
+
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable1_OFFLINE.100days");
// Drop the table
dropOfflineTable(SINGLE_LEVEL_CONCAT_TEST_TABLE);
@@ -582,8 +581,7 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
// Check total tasks
assertEquals(numTasks, 5);
-
assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
- "mergeRollupTaskDelayInNumBuckets.myTable4_OFFLINE.100days"));
+
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable4_OFFLINE.100days");
// Drop the table
dropOfflineTable(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE);
@@ -706,8 +704,7 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
// Check total tasks
assertEquals(numTasks, 3);
-
assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
- "mergeRollupTaskDelayInNumBuckets.myTable2_OFFLINE.150days"));
+
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable2_OFFLINE.150days");
}
/**
@@ -853,10 +850,8 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
// Check total tasks
assertEquals(numTasks, 8);
-
assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
- "mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.45days"));
-
assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
- "mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.90days"));
+
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.45days",
+ "mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.90days");
}
protected void verifyTableDelete(String tableNameWithType) {
@@ -911,6 +906,31 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
}, TIMEOUT_IN_MS, "Timeout while waiting for expected num buckets to
process metrics on " + tableNameWithType);
}
+ /**
+ * Poll until all of the named gauges exist on the controller. Used here for
+ * {@code mergeRollupTaskDelayInNumBuckets.*} after each test's scheduling
loop completes.
+ *
+ * <p>Those gauges are (re)registered by {@link
PinotTaskManager#scheduleTasks} via
+ * {@code MergeRollupTaskGenerator.createOrUpdateDelayMetrics}. They are
removed by
+ * {@code resetDelayMetrics} when a {@code scheduleTasks} call observes no
eligible segments for the
+ * table — which can happen transiently if a {@code scheduleTasks} call
(e.g. the per-iteration
+ * {@code RealtimeToOfflineSegmentsTask} probe inside the for-loop body)
lands while the previous
+ * merge task's segment-lineage commit is still in flight. Polling here
mirrors
+ * {@link #waitForExpectedNumBucketsToProcess} so the post-loop assertion
does not flake on the same
+ * race window.
+ */
+ private void waitForGaugesToExist(String... metricNames) {
+ TestUtils.waitForCondition(aVoid -> {
+ ControllerMetrics controllerMetrics =
_controllerStarter.getControllerMetrics();
+ for (String metricName : metricNames) {
+ if (!MetricValueUtils.gaugeExists(controllerMetrics, metricName)) {
+ return false;
+ }
+ }
+ return true;
+ }, TIMEOUT_IN_MS, "Timeout while waiting for gauges to exist: " +
String.join(", ", metricNames));
+ }
+
// The use case is similar as the one defined in offline table
@Test
public void testRealtimeTableSingleLevelConcat()
@@ -1010,8 +1030,7 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
// Check total tasks
assertEquals(numTasks, 5);
-
assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
- "mergeRollupTaskDelayInNumBuckets.myTable5_REALTIME.100days"));
+
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable5_REALTIME.100days");
// Drop the table
dropRealtimeTable(tableName);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]