This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new c418442c49 Fix flakiness of ControllerPeriodicTasksIntegrationTest (#13337) c418442c49 is described below commit c418442c498865c35b5290ecc41f38743b3f0bcd Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Jun 7 12:52:42 2024 -0700 Fix flakiness of ControllerPeriodicTasksIntegrationTest (#13337) --- .../ControllerPeriodicTasksIntegrationTest.java | 122 +++++++++++---------- 1 file changed, 63 insertions(+), 59 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java index 03a2b6a000..9e58028146 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; @@ -62,10 +63,10 @@ import static org.testng.Assert.assertTrue; * The intention of these tests is not to test functionality of daemons, but simply to check that they run as expected * and process the tables when the controller starts. */ +// TODO: Add tests for other ControllerPeriodicTasks (RetentionManager, RealtimeSegmentValidationManager). public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrationTestSet { private static final int PERIODIC_TASK_INITIAL_DELAY_SECONDS = 30; - private static final int PERIODIC_TASK_FREQUENCY_SECONDS = 5; - private static final String PERIODIC_TASK_FREQUENCY = "5s"; + private static final String PERIODIC_TASK_FREQUENCY_PERIOD = "5s"; private static final String PERIODIC_TASK_WAIT_FOR_PUSH_TIME_PERIOD = "5s"; private static final int NUM_REPLICAS = 2; @@ -115,22 +116,20 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati Map<String, Object> properties = getDefaultControllerConfiguration(); properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false); - properties - .put(ControllerPeriodicTasksConf.STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS, PERIODIC_TASK_INITIAL_DELAY_SECONDS); - properties.put(ControllerPeriodicTasksConf.DEPRECATED_STATUS_CHECKER_FREQUENCY_IN_SECONDS, - PERIODIC_TASK_FREQUENCY_SECONDS); - properties.put(ControllerPeriodicTasksConf.DEPRECATED_REALTIME_SEGMENT_RELOCATION_INITIAL_DELAY_IN_SECONDS, + properties.put(ControllerPeriodicTasksConf.STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS, PERIODIC_TASK_INITIAL_DELAY_SECONDS); - properties - .put(ControllerPeriodicTasksConf.DEPRECATED_REALTIME_SEGMENT_RELOCATOR_FREQUENCY, PERIODIC_TASK_FREQUENCY); + properties.put(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_PERIOD, PERIODIC_TASK_FREQUENCY_PERIOD); + properties.put(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS, + PERIODIC_TASK_INITIAL_DELAY_SECONDS); + properties.put(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_FREQUENCY_PERIOD, PERIODIC_TASK_FREQUENCY_PERIOD); properties.put(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS, PERIODIC_TASK_INITIAL_DELAY_SECONDS); - properties.put(ControllerPeriodicTasksConf.DEPRECATED_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS, - PERIODIC_TASK_FREQUENCY_SECONDS); + properties.put(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_PERIOD, + PERIODIC_TASK_FREQUENCY_PERIOD); properties.put(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_INITIAL_DELAY_IN_SECONDS, PERIODIC_TASK_INITIAL_DELAY_SECONDS); - properties.put(ControllerPeriodicTasksConf.DEPRECATED_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS, - PERIODIC_TASK_FREQUENCY_SECONDS); + properties.put(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_PERIOD, + PERIODIC_TASK_FREQUENCY_PERIOD); properties.put(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_PERIOD, PERIODIC_TASK_WAIT_FOR_PUSH_TIME_PERIOD); @@ -160,8 +159,8 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0))); // Create and upload segments - ClusterIntegrationTestUtils - .buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir, _tarDir); + ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir, + _tarDir); uploadSegments(getTableName(), _tarDir); // Push data into Kafka @@ -228,44 +227,40 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati _currentTable = DEFAULT_TABLE_NAME; int numTables = 6; - ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics(); TestUtils.waitForCondition(aVoid -> { - if (MetricValueUtils.getGlobalGaugeValue(controllerMetrics, "SegmentStatusChecker", - ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED) != numTables) { + if (!checkGlobalGaugeValue(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, "SegmentStatusChecker", + numTables)) { return false; } - if (!checkSegmentStatusCheckerMetrics(controllerMetrics, TableNameBuilder.OFFLINE.tableNameWithType(emptyTable), - null, NUM_REPLICAS, 100, 0, 100)) { + if (!checkSegmentStatusCheckerMetrics(TableNameBuilder.OFFLINE.tableNameWithType(emptyTable), null, NUM_REPLICAS, + 100, 0, 100)) { return false; } - if (!checkSegmentStatusCheckerMetrics(controllerMetrics, - TableNameBuilder.OFFLINE.tableNameWithType(disabledTable), null, 0, 0, 0, 0)) { + if (!checkSegmentStatusCheckerMetrics(TableNameBuilder.OFFLINE.tableNameWithType(disabledTable), null, 0, 0, 0, + 0)) { return false; } String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(getTableName()); IdealState idealState = _helixResourceManager.getTableIdealState(tableNameWithType); - if (!checkSegmentStatusCheckerMetrics(controllerMetrics, tableNameWithType, idealState, NUM_REPLICAS, 100, 0, - 100)) { + if (!checkSegmentStatusCheckerMetrics(tableNameWithType, idealState, NUM_REPLICAS, 100, 0, 100)) { return false; } tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableWithOfflineSegment); idealState = _helixResourceManager.getTableIdealState(tableNameWithType); //noinspection PointlessArithmeticExpression - if (!checkSegmentStatusCheckerMetrics(controllerMetrics, tableNameWithType, idealState, NUM_REPLICAS - 1, + if (!checkSegmentStatusCheckerMetrics(tableNameWithType, idealState, NUM_REPLICAS - 1, 100 * (NUM_REPLICAS - 1) / NUM_REPLICAS, 0, 100)) { return false; } tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(getTableName()); idealState = _helixResourceManager.getTableIdealState(tableNameWithType); - if (!checkSegmentStatusCheckerMetrics(controllerMetrics, tableNameWithType, idealState, NUM_REPLICAS, 100, 0, - 100)) { + if (!checkSegmentStatusCheckerMetrics(tableNameWithType, idealState, NUM_REPLICAS, 100, 0, 100)) { return false; } - return MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.OFFLINE_TABLE_COUNT) == 4 - && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.REALTIME_TABLE_COUNT) == 2 - && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT) == 1 - && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.UPSERT_TABLE_COUNT) == 1; - }, 60_000, "Timed out waiting for SegmentStatusChecker"); + return checkGlobalGaugeValue(ControllerGauge.OFFLINE_TABLE_COUNT, 4) && checkGlobalGaugeValue( + ControllerGauge.REALTIME_TABLE_COUNT, 2) && checkGlobalGaugeValue(ControllerGauge.DISABLED_TABLE_COUNT, 1) + && checkGlobalGaugeValue(ControllerGauge.UPSERT_TABLE_COUNT, 1); + }, 600_000, "Timed out waiting for SegmentStatusChecker"); dropOfflineTable(emptyTable); dropOfflineTable(disabledTable); @@ -288,31 +283,40 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati addTableConfig(tableConfig); } - private boolean checkSegmentStatusCheckerMetrics(ControllerMetrics controllerMetrics, String tableNameWithType, - IdealState idealState, long expectedNumReplicas, long expectedPercentReplicas, long expectedSegmentsInErrorState, + private boolean checkGlobalGaugeValue(ControllerGauge gauge, long expectedValue) { + return MetricValueUtils.getGlobalGaugeValue(ControllerMetrics.get(), gauge) == expectedValue; + } + + private boolean checkGlobalGaugeValue(ControllerGauge gauge, String key, long expectedValue) { + return MetricValueUtils.getGlobalGaugeValue(ControllerMetrics.get(), key, gauge) == expectedValue; + } + + private boolean checkTableGaugeValue(ControllerGauge gauge, String tableNameWithType, long expectedValue) { + return MetricValueUtils.getTableGaugeValue(ControllerMetrics.get(), tableNameWithType, gauge) == expectedValue; + } + + private boolean checkSegmentStatusCheckerMetrics(String tableNameWithType, @Nullable IdealState idealState, + long expectedNumReplicas, long expectedPercentReplicas, long expectedSegmentsInErrorState, long expectedPercentSegmentsAvailable) { if (idealState != null) { - if (MetricValueUtils.getTableGaugeValue(controllerMetrics, tableNameWithType, - ControllerGauge.IDEALSTATE_ZNODE_SIZE) != idealState.toString().length()) { + if (!checkTableGaugeValue(ControllerGauge.IDEALSTATE_ZNODE_SIZE, tableNameWithType, + idealState.toString().length())) { return false; } - if (MetricValueUtils.getTableGaugeValue(controllerMetrics, tableNameWithType, ControllerGauge.SEGMENT_COUNT) - != idealState.getPartitionSet().size()) { + if (!checkTableGaugeValue(ControllerGauge.SEGMENT_COUNT, tableNameWithType, + idealState.getPartitionSet().size())) { return false; } } - return MetricValueUtils.getTableGaugeValue(controllerMetrics, tableNameWithType, - ControllerGauge.NUMBER_OF_REPLICAS) == expectedNumReplicas - && MetricValueUtils.getTableGaugeValue(controllerMetrics, tableNameWithType, - ControllerGauge.PERCENT_OF_REPLICAS) == expectedPercentReplicas - && MetricValueUtils.getTableGaugeValue(controllerMetrics, tableNameWithType, - ControllerGauge.SEGMENTS_IN_ERROR_STATE) == expectedSegmentsInErrorState - && MetricValueUtils.getTableGaugeValue(controllerMetrics, tableNameWithType, - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE) == expectedPercentSegmentsAvailable; + return checkTableGaugeValue(ControllerGauge.NUMBER_OF_REPLICAS, tableNameWithType, expectedNumReplicas) + && checkTableGaugeValue(ControllerGauge.PERCENT_OF_REPLICAS, tableNameWithType, expectedPercentReplicas) + && checkTableGaugeValue(ControllerGauge.SEGMENTS_IN_ERROR_STATE, tableNameWithType, + expectedSegmentsInErrorState) && checkTableGaugeValue(ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, + tableNameWithType, expectedPercentSegmentsAvailable); } @Test - public void testRealtimeSegmentRelocator() + public void testSegmentRelocator() throws Exception { // Add relocation tenant config TableConfig realtimeTableConfig = getRealtimeTableConfig(); @@ -339,7 +343,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati } } return Collections.disjoint(consumingServers, completedServers); - }, 60_000, "Timed out waiting for RealtimeSegmentRelocation"); + }, 600_000, "Timed out waiting for SegmentRelocator"); } @Test @@ -358,7 +362,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, helixClusterName); assertNotNull(idealState); return idealState.getInstanceSet(tableNameWithType).equals(brokersAfterAdd); - }, 60_000L, "Timeout when waiting for BrokerResourceValidationManager"); + }, 600_000L, "Timeout when waiting for BrokerResourceValidationManager"); // Drop the new added broker _helixAdmin.dropInstance(helixClusterName, instanceConfig); @@ -369,7 +373,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin, helixClusterName); assertNotNull(idealState); return idealState.getInstanceSet(tableNameWithType).equals(brokersAfterDrop); - }, 60_000L, "Timeout when waiting for BrokerResourceValidationManager"); + }, 600_000L, "Timeout when waiting for BrokerResourceValidationManager"); } @Test @@ -380,15 +384,15 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati // Wait until OfflineSegmentIntervalChecker gets executed TestUtils.waitForCondition(aVoid -> { - long numSegments = - validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "SegmentCount")); - long numMissingSegments = - validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "missingSegmentCount")); - long numTotalDocs = - validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, "TotalDocumentCount")); - return numSegments == NUM_OFFLINE_AVRO_FILES && numMissingSegments == 0 && numTotalDocs == 79003; - }, 60_000, "Timed out waiting for OfflineSegmentIntervalChecker"); + return checkValidationGaugeValue(validationMetrics, tableNameWithType, "SegmentCount", NUM_OFFLINE_AVRO_FILES) + && checkValidationGaugeValue(validationMetrics, tableNameWithType, "missingSegmentCount", 0) + && checkValidationGaugeValue(validationMetrics, tableNameWithType, "TotalDocumentCount", 79003); + }, 600_000, "Timed out waiting for OfflineSegmentIntervalChecker"); } - // TODO: tests for other ControllerPeriodicTasks (RetentionManager, RealtimeSegmentValidationManager) + private boolean checkValidationGaugeValue(ValidationMetrics validationMetrics, String tableNameWithType, + String gaugeName, long expectedValue) { + return validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType, gaugeName)) + == expectedValue; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org