This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 159f046dd1a Fix flaky MergeRollupMinionClusterIntegrationTest gauge assertions (#18260)
159f046dd1a is described below

commit 159f046dd1a3de7b2a78bc544d4152771d663971
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Apr 20 14:18:50 2026 -0700

    Fix flaky MergeRollupMinionClusterIntegrationTest gauge assertions (#18260)
    
    Extends the polling pattern introduced in #18253 (for
    mergeRollupTaskNumBucketsToProcess) to the remaining six
    mergeRollupTaskDelayInNumBuckets.* gaugeExists checks, spread across five
    tests in the same test class.
    
    Each of these gauges is registered by
    MergeRollupTaskGenerator.createOrUpdateDelayMetrics and removed by
    resetDelayMetrics when a scheduleTasks call observes no eligible segments.
    In each loop iteration, the
      assertNull(scheduleTasks(context).get(RealtimeToOfflineSegmentsTask))
    probe triggers an extra synchronized scheduleTasks call, which can race
    with the previous merge task's segment-lineage commit. When it does, it
    can transiently reset the gauge and cause the post-loop
    assertTrue(gaugeExists(...)) to flake in the same race window that
    #18253 addressed.
    
    A new waitForGaugesToExist(String...) helper polls via
    TestUtils.waitForCondition with the existing TIMEOUT_IN_MS. It is used
    in testOfflineTableSingleLevelConcat,
    testOfflineTableSingleLevelConcatWithMetadataPush,
    testOfflineTableSingleLevelRollup, testOfflineTableMultiLevelConcat (which
    waits for both the 45days and 90days gauges within a single poll), and
    testRealtimeTableSingleLevelConcat.
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../MergeRollupMinionClusterIntegrationTest.java   | 43 ++++++++++++++++------
 1 file changed, 31 insertions(+), 12 deletions(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index c22929be876..5709a72b3c5 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -464,8 +464,7 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     // Check total tasks
     assertEquals(numTasks, 5);
 
-    
assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
-        "mergeRollupTaskDelayInNumBuckets.myTable1_OFFLINE.100days"));
+    
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable1_OFFLINE.100days");
     // Drop the table
     dropOfflineTable(SINGLE_LEVEL_CONCAT_TEST_TABLE);
 
@@ -582,8 +581,7 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     // Check total tasks
     assertEquals(numTasks, 5);
 
-    
assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
-        "mergeRollupTaskDelayInNumBuckets.myTable4_OFFLINE.100days"));
+    
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable4_OFFLINE.100days");
 
     // Drop the table
     dropOfflineTable(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE);
@@ -706,8 +704,7 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     // Check total tasks
     assertEquals(numTasks, 3);
 
-    
assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
-        "mergeRollupTaskDelayInNumBuckets.myTable2_OFFLINE.150days"));
+    
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable2_OFFLINE.150days");
   }
 
   /**
@@ -853,10 +850,8 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     // Check total tasks
     assertEquals(numTasks, 8);
 
-    
assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
-        "mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.45days"));
-    
assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
-        "mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.90days"));
+    
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.45days",
+        "mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.90days");
   }
 
   protected void verifyTableDelete(String tableNameWithType) {
@@ -911,6 +906,31 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     }, TIMEOUT_IN_MS, "Timeout while waiting for expected num buckets to 
process metrics on " + tableNameWithType);
   }
 
+  /**
+   * Poll until all of the named gauges exist on the controller. Used here for
+   * {@code mergeRollupTaskDelayInNumBuckets.*} after each test's scheduling 
loop completes.
+   *
+   * <p>Those gauges are (re)registered by {@link 
PinotTaskManager#scheduleTasks} via
+   * {@code MergeRollupTaskGenerator.createOrUpdateDelayMetrics}. They are 
removed by
+   * {@code resetDelayMetrics} when a {@code scheduleTasks} call observes no 
eligible segments for the
+   * table — which can happen transiently if a {@code scheduleTasks} call 
(e.g. the per-iteration
+   * {@code RealtimeToOfflineSegmentsTask} probe inside the for-loop body) 
lands while the previous
+   * merge task's segment-lineage commit is still in flight. Polling here 
mirrors
+   * {@link #waitForExpectedNumBucketsToProcess} so the post-loop assertion 
does not flake on the same
+   * race window.
+   */
+  private void waitForGaugesToExist(String... metricNames) {
+    TestUtils.waitForCondition(aVoid -> {
+      ControllerMetrics controllerMetrics = 
_controllerStarter.getControllerMetrics();
+      for (String metricName : metricNames) {
+        if (!MetricValueUtils.gaugeExists(controllerMetrics, metricName)) {
+          return false;
+        }
+      }
+      return true;
+    }, TIMEOUT_IN_MS, "Timeout while waiting for gauges to exist: " + 
String.join(", ", metricNames));
+  }
+
   // The use case is similar as the one defined in offline table
   @Test
   public void testRealtimeTableSingleLevelConcat()
@@ -1010,8 +1030,7 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     // Check total tasks
     assertEquals(numTasks, 5);
 
-    
assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
-        "mergeRollupTaskDelayInNumBuckets.myTable5_REALTIME.100days"));
+    
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable5_REALTIME.100days");
 
     // Drop the table
     dropRealtimeTable(tableName);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to