This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c418442c49 Fix flakiness of ControllerPeriodicTasksIntegrationTest (#13337)
c418442c49 is described below

commit c418442c498865c35b5290ecc41f38743b3f0bcd
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Jun 7 12:52:42 2024 -0700

    Fix flakiness of ControllerPeriodicTasksIntegrationTest (#13337)
---
 .../ControllerPeriodicTasksIntegrationTest.java    | 122 +++++++++++----------
 1 file changed, 63 insertions(+), 59 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
index 03a2b6a000..9e58028146 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -62,10 +63,10 @@ import static org.testng.Assert.assertTrue;
  * The intention of these tests is not to test functionality of daemons, but 
simply to check that they run as expected
  * and process the tables when the controller starts.
  */
+// TODO: Add tests for other ControllerPeriodicTasks (RetentionManager, 
RealtimeSegmentValidationManager).
 public class ControllerPeriodicTasksIntegrationTest extends 
BaseClusterIntegrationTestSet {
   private static final int PERIODIC_TASK_INITIAL_DELAY_SECONDS = 30;
-  private static final int PERIODIC_TASK_FREQUENCY_SECONDS = 5;
-  private static final String PERIODIC_TASK_FREQUENCY = "5s";
+  private static final String PERIODIC_TASK_FREQUENCY_PERIOD = "5s";
   private static final String PERIODIC_TASK_WAIT_FOR_PUSH_TIME_PERIOD = "5s";
 
   private static final int NUM_REPLICAS = 2;
@@ -115,22 +116,20 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
 
     Map<String, Object> properties = getDefaultControllerConfiguration();
     properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
-    properties
-        
.put(ControllerPeriodicTasksConf.STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS, 
PERIODIC_TASK_INITIAL_DELAY_SECONDS);
-    
properties.put(ControllerPeriodicTasksConf.DEPRECATED_STATUS_CHECKER_FREQUENCY_IN_SECONDS,
-        PERIODIC_TASK_FREQUENCY_SECONDS);
-    
properties.put(ControllerPeriodicTasksConf.DEPRECATED_REALTIME_SEGMENT_RELOCATION_INITIAL_DELAY_IN_SECONDS,
+    
properties.put(ControllerPeriodicTasksConf.STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS,
         PERIODIC_TASK_INITIAL_DELAY_SECONDS);
-    properties
-        
.put(ControllerPeriodicTasksConf.DEPRECATED_REALTIME_SEGMENT_RELOCATOR_FREQUENCY,
 PERIODIC_TASK_FREQUENCY);
+    
properties.put(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_PERIOD, 
PERIODIC_TASK_FREQUENCY_PERIOD);
+    
properties.put(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS,
+        PERIODIC_TASK_INITIAL_DELAY_SECONDS);
+    
properties.put(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_FREQUENCY_PERIOD, 
PERIODIC_TASK_FREQUENCY_PERIOD);
     
properties.put(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS,
         PERIODIC_TASK_INITIAL_DELAY_SECONDS);
-    
properties.put(ControllerPeriodicTasksConf.DEPRECATED_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS,
-        PERIODIC_TASK_FREQUENCY_SECONDS);
+    
properties.put(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_PERIOD,
+        PERIODIC_TASK_FREQUENCY_PERIOD);
     
properties.put(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_INITIAL_DELAY_IN_SECONDS,
         PERIODIC_TASK_INITIAL_DELAY_SECONDS);
-    
properties.put(ControllerPeriodicTasksConf.DEPRECATED_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS,
-        PERIODIC_TASK_FREQUENCY_SECONDS);
+    
properties.put(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_PERIOD,
+        PERIODIC_TASK_FREQUENCY_PERIOD);
     
properties.put(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_PERIOD,
         PERIODIC_TASK_WAIT_FOR_PUSH_TIME_PERIOD);
 
@@ -160,8 +159,8 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
     addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0)));
 
     // Create and upload segments
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 
0, _segmentDir, _tarDir);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, 
offlineTableConfig, schema, 0, _segmentDir,
+        _tarDir);
     uploadSegments(getTableName(), _tarDir);
 
     // Push data into Kafka
@@ -228,44 +227,40 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
     _currentTable = DEFAULT_TABLE_NAME;
 
     int numTables = 6;
-    ControllerMetrics controllerMetrics = 
_controllerStarter.getControllerMetrics();
     TestUtils.waitForCondition(aVoid -> {
-      if (MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
"SegmentStatusChecker",
-          ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED) != numTables) {
+      if 
(!checkGlobalGaugeValue(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, 
"SegmentStatusChecker",
+          numTables)) {
         return false;
       }
-      if (!checkSegmentStatusCheckerMetrics(controllerMetrics, 
TableNameBuilder.OFFLINE.tableNameWithType(emptyTable),
-          null, NUM_REPLICAS, 100, 0, 100)) {
+      if 
(!checkSegmentStatusCheckerMetrics(TableNameBuilder.OFFLINE.tableNameWithType(emptyTable),
 null, NUM_REPLICAS,
+          100, 0, 100)) {
         return false;
       }
-      if (!checkSegmentStatusCheckerMetrics(controllerMetrics,
-          TableNameBuilder.OFFLINE.tableNameWithType(disabledTable), null, 0, 
0, 0, 0)) {
+      if 
(!checkSegmentStatusCheckerMetrics(TableNameBuilder.OFFLINE.tableNameWithType(disabledTable),
 null, 0, 0, 0,
+          0)) {
         return false;
       }
       String tableNameWithType = 
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
       IdealState idealState = 
_helixResourceManager.getTableIdealState(tableNameWithType);
-      if (!checkSegmentStatusCheckerMetrics(controllerMetrics, 
tableNameWithType, idealState, NUM_REPLICAS, 100, 0,
-          100)) {
+      if (!checkSegmentStatusCheckerMetrics(tableNameWithType, idealState, 
NUM_REPLICAS, 100, 0, 100)) {
         return false;
       }
       tableNameWithType = 
TableNameBuilder.OFFLINE.tableNameWithType(tableWithOfflineSegment);
       idealState = _helixResourceManager.getTableIdealState(tableNameWithType);
       //noinspection PointlessArithmeticExpression
-      if (!checkSegmentStatusCheckerMetrics(controllerMetrics, 
tableNameWithType, idealState, NUM_REPLICAS - 1,
+      if (!checkSegmentStatusCheckerMetrics(tableNameWithType, idealState, 
NUM_REPLICAS - 1,
           100 * (NUM_REPLICAS - 1) / NUM_REPLICAS, 0, 100)) {
         return false;
       }
       tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
       idealState = _helixResourceManager.getTableIdealState(tableNameWithType);
-      if (!checkSegmentStatusCheckerMetrics(controllerMetrics, 
tableNameWithType, idealState, NUM_REPLICAS, 100, 0,
-          100)) {
+      if (!checkSegmentStatusCheckerMetrics(tableNameWithType, idealState, 
NUM_REPLICAS, 100, 0, 100)) {
         return false;
       }
-      return MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
ControllerGauge.OFFLINE_TABLE_COUNT) == 4
-          && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
ControllerGauge.REALTIME_TABLE_COUNT) == 2
-          && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
ControllerGauge.DISABLED_TABLE_COUNT) == 1
-          && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
ControllerGauge.UPSERT_TABLE_COUNT) == 1;
-    }, 60_000, "Timed out waiting for SegmentStatusChecker");
+      return checkGlobalGaugeValue(ControllerGauge.OFFLINE_TABLE_COUNT, 4) && 
checkGlobalGaugeValue(
+          ControllerGauge.REALTIME_TABLE_COUNT, 2) && 
checkGlobalGaugeValue(ControllerGauge.DISABLED_TABLE_COUNT, 1)
+          && checkGlobalGaugeValue(ControllerGauge.UPSERT_TABLE_COUNT, 1);
+    }, 600_000, "Timed out waiting for SegmentStatusChecker");
 
     dropOfflineTable(emptyTable);
     dropOfflineTable(disabledTable);
@@ -288,31 +283,40 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
     addTableConfig(tableConfig);
   }
 
-  private boolean checkSegmentStatusCheckerMetrics(ControllerMetrics 
controllerMetrics, String tableNameWithType,
-      IdealState idealState, long expectedNumReplicas, long 
expectedPercentReplicas, long expectedSegmentsInErrorState,
+  private boolean checkGlobalGaugeValue(ControllerGauge gauge, long 
expectedValue) {
+    return MetricValueUtils.getGlobalGaugeValue(ControllerMetrics.get(), 
gauge) == expectedValue;
+  }
+
+  private boolean checkGlobalGaugeValue(ControllerGauge gauge, String key, 
long expectedValue) {
+    return MetricValueUtils.getGlobalGaugeValue(ControllerMetrics.get(), key, 
gauge) == expectedValue;
+  }
+
+  private boolean checkTableGaugeValue(ControllerGauge gauge, String 
tableNameWithType, long expectedValue) {
+    return MetricValueUtils.getTableGaugeValue(ControllerMetrics.get(), 
tableNameWithType, gauge) == expectedValue;
+  }
+
+  private boolean checkSegmentStatusCheckerMetrics(String tableNameWithType, 
@Nullable IdealState idealState,
+      long expectedNumReplicas, long expectedPercentReplicas, long 
expectedSegmentsInErrorState,
       long expectedPercentSegmentsAvailable) {
     if (idealState != null) {
-      if (MetricValueUtils.getTableGaugeValue(controllerMetrics, 
tableNameWithType,
-          ControllerGauge.IDEALSTATE_ZNODE_SIZE) != 
idealState.toString().length()) {
+      if (!checkTableGaugeValue(ControllerGauge.IDEALSTATE_ZNODE_SIZE, 
tableNameWithType,
+          idealState.toString().length())) {
         return false;
       }
-      if (MetricValueUtils.getTableGaugeValue(controllerMetrics, 
tableNameWithType, ControllerGauge.SEGMENT_COUNT)
-          != idealState.getPartitionSet().size()) {
+      if (!checkTableGaugeValue(ControllerGauge.SEGMENT_COUNT, 
tableNameWithType,
+          idealState.getPartitionSet().size())) {
         return false;
       }
     }
-    return MetricValueUtils.getTableGaugeValue(controllerMetrics, 
tableNameWithType,
-        ControllerGauge.NUMBER_OF_REPLICAS) == expectedNumReplicas
-        && MetricValueUtils.getTableGaugeValue(controllerMetrics, 
tableNameWithType,
-        ControllerGauge.PERCENT_OF_REPLICAS) == expectedPercentReplicas
-        && MetricValueUtils.getTableGaugeValue(controllerMetrics, 
tableNameWithType,
-        ControllerGauge.SEGMENTS_IN_ERROR_STATE) == 
expectedSegmentsInErrorState
-        && MetricValueUtils.getTableGaugeValue(controllerMetrics, 
tableNameWithType,
-        ControllerGauge.PERCENT_SEGMENTS_AVAILABLE) == 
expectedPercentSegmentsAvailable;
+    return checkTableGaugeValue(ControllerGauge.NUMBER_OF_REPLICAS, 
tableNameWithType, expectedNumReplicas)
+        && checkTableGaugeValue(ControllerGauge.PERCENT_OF_REPLICAS, 
tableNameWithType, expectedPercentReplicas)
+        && checkTableGaugeValue(ControllerGauge.SEGMENTS_IN_ERROR_STATE, 
tableNameWithType,
+        expectedSegmentsInErrorState) && 
checkTableGaugeValue(ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
+        tableNameWithType, expectedPercentSegmentsAvailable);
   }
 
   @Test
-  public void testRealtimeSegmentRelocator()
+  public void testSegmentRelocator()
       throws Exception {
     // Add relocation tenant config
     TableConfig realtimeTableConfig = getRealtimeTableConfig();
@@ -339,7 +343,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
         }
       }
       return Collections.disjoint(consumingServers, completedServers);
-    }, 60_000, "Timed out waiting for RealtimeSegmentRelocation");
+    }, 600_000, "Timed out waiting for SegmentRelocator");
   }
 
   @Test
@@ -358,7 +362,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
       IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, 
helixClusterName);
       assertNotNull(idealState);
       return 
idealState.getInstanceSet(tableNameWithType).equals(brokersAfterAdd);
-    }, 60_000L, "Timeout when waiting for BrokerResourceValidationManager");
+    }, 600_000L, "Timeout when waiting for BrokerResourceValidationManager");
 
     // Drop the new added broker
     _helixAdmin.dropInstance(helixClusterName, instanceConfig);
@@ -369,7 +373,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
       IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, 
helixClusterName);
       assertNotNull(idealState);
       return 
idealState.getInstanceSet(tableNameWithType).equals(brokersAfterDrop);
-    }, 60_000L, "Timeout when waiting for BrokerResourceValidationManager");
+    }, 600_000L, "Timeout when waiting for BrokerResourceValidationManager");
   }
 
   @Test
@@ -380,15 +384,15 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati
 
     // Wait until OfflineSegmentIntervalChecker gets executed
     TestUtils.waitForCondition(aVoid -> {
-      long numSegments =
-          
validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType,
 "SegmentCount"));
-      long numMissingSegments =
-          
validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType,
 "missingSegmentCount"));
-      long numTotalDocs =
-          
validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType,
 "TotalDocumentCount"));
-      return numSegments == NUM_OFFLINE_AVRO_FILES && numMissingSegments == 0 
&& numTotalDocs == 79003;
-    }, 60_000, "Timed out waiting for OfflineSegmentIntervalChecker");
+      return checkValidationGaugeValue(validationMetrics, tableNameWithType, 
"SegmentCount", NUM_OFFLINE_AVRO_FILES)
+          && checkValidationGaugeValue(validationMetrics, tableNameWithType, 
"missingSegmentCount", 0)
+          && checkValidationGaugeValue(validationMetrics, tableNameWithType, 
"TotalDocumentCount", 79003);
+    }, 600_000, "Timed out waiting for OfflineSegmentIntervalChecker");
   }
 
-  // TODO: tests for other ControllerPeriodicTasks (RetentionManager, 
RealtimeSegmentValidationManager)
+  private boolean checkValidationGaugeValue(ValidationMetrics 
validationMetrics, String tableNameWithType,
+      String gaugeName, long expectedValue) {
+    return 
validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType,
 gaugeName))
+        == expectedValue;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to