This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 52d16f7668 Add integration test for rebalance in upsert tables (#11568)
52d16f7668 is described below
commit 52d16f76685929eb46d001f0a435beca204da6cb
Author: Kartik Khare <[email protected]>
AuthorDate: Mon Oct 9 11:54:13 2023 +0530
Add integration test for rebalance in upsert tables (#11568)
* Add integration test for rebalance in upsert tables
* Add reload test as well
* Reload tests
* Remove upsert integration test since partial upsert tests cover both
* Add explicit status checks for rebalance/reload jobs rather than sleep
* Fix flakiness due to segment commit during reload
* Use new RebalanceConfig class
* Add missing partial upsert cases
* Refactor jobstatus to segment reload status
---------
Co-authored-by: Kartik Khare <[email protected]>
Co-authored-by: Kartik Khare
<[email protected]>
---
.../controller/helix/ControllerRequestClient.java | 6 +-
.../pinot/controller/helix/ControllerTest.java | 12 +-
.../tests/BaseClusterIntegrationTestSet.java | 4 +-
...PartialUpsertTableRebalanceIntegrationTest.java | 451 +++++++++++++++++++++
.../utils/builder/ControllerRequestURLBuilder.java | 6 +-
5 files changed, 468 insertions(+), 11 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 3ec7fc3642..ff27954f70 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -175,11 +175,13 @@ public class ControllerRequestClient {
}
}
- public void reloadTable(String tableName, TableType tableType, boolean
forceDownload)
+ public String reloadTable(String tableName, TableType tableType, boolean
forceDownload)
throws IOException {
try {
- HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new
URL(
+ SimpleHttpResponse simpleHttpResponse =
+
HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new URL(
_controllerRequestURLBuilder.forTableReload(tableName, tableType,
forceDownload)).toURI(), null));
+ return simpleHttpResponse.getResponse();
} catch (HttpErrorStatusException | URISyntaxException e) {
throw new IOException(e);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 33932ee38b..e4f62da327 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -716,14 +716,14 @@ public class ControllerTest {
return getControllerRequestClient().getTableSize(tableName);
}
- public void reloadOfflineTable(String tableName)
+ public String reloadOfflineTable(String tableName)
throws IOException {
- reloadOfflineTable(tableName, false);
+ return reloadOfflineTable(tableName, false);
}
- public void reloadOfflineTable(String tableName, boolean forceDownload)
+ public String reloadOfflineTable(String tableName, boolean forceDownload)
throws IOException {
- getControllerRequestClient().reloadTable(tableName, TableType.OFFLINE,
forceDownload);
+ return getControllerRequestClient().reloadTable(tableName,
TableType.OFFLINE, forceDownload);
}
public void reloadOfflineSegment(String tableName, String segmentName,
boolean forceDownload)
@@ -731,9 +731,9 @@ public class ControllerTest {
getControllerRequestClient().reloadSegment(tableName, segmentName,
forceDownload);
}
- public void reloadRealtimeTable(String tableName)
+ public String reloadRealtimeTable(String tableName)
throws IOException {
- getControllerRequestClient().reloadTable(tableName, TableType.REALTIME,
false);
+ return getControllerRequestClient().reloadTable(tableName,
TableType.REALTIME, false);
}
public void createBrokerTenant(String tenantName, int numBrokers)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index bcda7532ed..1d2a75215f 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -651,7 +651,7 @@ public abstract class BaseClusterIntegrationTestSet extends
BaseClusterIntegrati
String isZKWriteSuccess =
tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText();
assertEquals(isZKWriteSuccess, "SUCCESS");
String jobId = tableLevelDetails.get("reloadJobId").asText();
- String jobStatusResponse =
sendGetRequest(_controllerRequestURLBuilder.forControllerJobStatus(jobId));
+ String jobStatusResponse =
sendGetRequest(_controllerRequestURLBuilder.forSegmentReloadStatus(jobId));
JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
// Validate all fields are present
@@ -663,7 +663,7 @@ public abstract class BaseClusterIntegrationTestSet extends
BaseClusterIntegrati
public boolean isReloadJobCompleted(String reloadJobId)
throws Exception {
- String jobStatusResponse =
sendGetRequest(_controllerRequestURLBuilder.forControllerJobStatus(reloadJobId));
+ String jobStatusResponse =
sendGetRequest(_controllerRequestURLBuilder.forSegmentReloadStatus(reloadJobId));
JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
assertEquals(jobStatus.get("metadata").get("jobId").asText(), reloadJobId);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
new file mode 100644
index 0000000000..9041c7aed9
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.client.ResultSetGroup;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.api.resources.PauseStatus;
+import
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
+import
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class PartialUpsertTableRebalanceIntegrationTest extends
BaseClusterIntegrationTest {
+ private static final int NUM_SERVERS = 1;
+ private static final String PRIMARY_KEY_COL = "clientId";
+ private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME);
+
+ // Segment 1 contains records of pk value 100000 (partition 0)
+ private static final String UPLOADED_SEGMENT_1 = "mytable_10027_19736_0 %";
+ // Segment 2 contains records of pk value 100001 (partition 1)
+ private static final String UPLOADED_SEGMENT_2 = "mytable_10072_19919_1 %";
+ // Segment 3 contains records of pk value 100002 (partition 1)
+ private static final String UPLOADED_SEGMENT_3 = "mytable_10158_19938_2 %";
+
+ private PinotHelixResourceManager _resourceManager;
+ private TableRebalancer _tableRebalancer;
+ private static List<File> _avroFiles;
+ private TableConfig _tableConfig;
+ private Schema _schema;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ startZk();
+ startController();
+ startBroker();
+ startServers(NUM_SERVERS);
+
+ // Start Kafka and push data into Kafka
+ startKafka();
+
+ _resourceManager = getControllerStarter().getHelixResourceManager();
+ _tableRebalancer = new
TableRebalancer(_resourceManager.getHelixZkManager());
+
+ createSchemaAndTable();
+ }
+
+ @Test
+ public void testRebalance()
+ throws Exception {
+ populateTables();
+
+ verifyIdealState(5, NUM_SERVERS);
+
+ // setup the rebalance config
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(false);
+ rebalanceConfig.setMinAvailableReplicas(0);
+ rebalanceConfig.setIncludeConsuming(true);
+
+ // Add a new server
+ BaseServerStarter serverStarter1 = startOneServer(1234);
+
+ // Now we trigger a rebalance operation
+ TableConfig tableConfig =
_resourceManager.getTableConfig(REALTIME_TABLE_NAME);
+ RebalanceResult rebalanceResult = _tableRebalancer.rebalance(tableConfig,
rebalanceConfig);
+
+ // Check the number of replicas after rebalancing
+ int finalReplicas =
_resourceManager.getServerInstancesForTable(getTableName(),
TableType.REALTIME).size();
+
+ // Check that a replica has been added
+ assertEquals(finalReplicas, NUM_SERVERS + 1, "Rebalancing didn't correctly
add the new server");
+
+ waitForRebalanceToComplete(rebalanceResult, 600_000L);
+ waitForAllDocsLoaded(600_000L);
+
+ verifySegmentAssignment(rebalanceResult.getSegmentAssignment(), 5,
finalReplicas);
+
+ // Add a new server
+ BaseServerStarter serverStarter2 = startOneServer(4567);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig);
+
+ // Check the number of replicas after rebalancing
+ finalReplicas =
_resourceManager.getServerInstancesForTable(getTableName(),
TableType.REALTIME).size();
+
+ // Check that a replica has been added
+ assertEquals(finalReplicas, NUM_SERVERS + 2, "Rebalancing didn't correctly
add the new server");
+
+ waitForRebalanceToComplete(rebalanceResult, 600_000L);
+ waitForAllDocsLoaded(600_000L);
+
+ // number of instances assigned can't be more than number of partitions
for rf = 1
+ verifySegmentAssignment(rebalanceResult.getSegmentAssignment(), 5,
getNumKafkaPartitions());
+
+ _resourceManager.updateInstanceTags(serverStarter1.getInstanceId(), "",
false);
+ _resourceManager.updateInstanceTags(serverStarter2.getInstanceId(), "",
false);
+
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceConfig.setDowntime(true);
+
+
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig);
+
+ verifySegmentAssignment(rebalanceResult.getSegmentAssignment(), 5,
NUM_SERVERS);
+
+ waitForRebalanceToComplete(rebalanceResult, 600_000L);
+ waitForAllDocsLoaded(600_000L);
+
+ _resourceManager.disableInstance(serverStarter1.getInstanceId());
+ _resourceManager.disableInstance(serverStarter2.getInstanceId());
+
+ _resourceManager.dropInstance(serverStarter1.getInstanceId());
+ _resourceManager.dropInstance(serverStarter2.getInstanceId());
+
+ serverStarter1.stop();
+ serverStarter2.stop();
+ }
+
+ @Test
+ public void testReload()
+ throws Exception {
+ pushAvroIntoKafka(_avroFiles);
+ waitForAllDocsLoaded(600_000L, 300);
+
+ String statusResponse = reloadRealtimeTable(getTableName());
+ Map<String, String> statusResponseJson =
+ JsonUtils.stringToObject(statusResponse, new TypeReference<Map<String,
String>>() {
+ });
+ String reloadResponse = statusResponseJson.get("status");
+ int jsonStartIndex = reloadResponse.indexOf("{");
+ String trimmedResponse = reloadResponse.substring(jsonStartIndex);
+ Map<String, Map<String, String>> reloadStatus =
+ JsonUtils.stringToObject(trimmedResponse, new
TypeReference<Map<String, Map<String, String>>>() {
+ });
+ String reloadJobId =
reloadStatus.get(REALTIME_TABLE_NAME).get("reloadJobId");
+ waitForReloadToComplete(reloadJobId, 600_000L);
+ waitForAllDocsLoaded(600_000L, 300);
+ verifyIdealState(4, NUM_SERVERS); // 4 because reload triggers commit of
consuming segments
+ }
+
+ @AfterMethod
+ public void afterMethod()
+ throws Exception {
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+ getControllerRequestClient().pauseConsumption(realtimeTableName);
+ TestUtils.waitForCondition((aVoid) -> {
+ try {
+ PauseStatus pauseStatus =
getControllerRequestClient().getPauseStatus(realtimeTableName);
+ return pauseStatus.getConsumingSegments().isEmpty();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 60_000L, "Failed to drop the segments");
+
+ // Test dropping all segments one by one
+ List<String> segments = listSegments(realtimeTableName);
+ for (String segment : segments) {
+ dropSegment(realtimeTableName, segment);
+ }
+
+ // NOTE: There is a delay to remove the segment from property store
+ TestUtils.waitForCondition((aVoid) -> {
+ try {
+ return listSegments(realtimeTableName).isEmpty();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }, 60_000L, "Failed to drop the segments");
+
+ stopServer();
+ stopKafka(); // to clean up the topic
+ startServers(NUM_SERVERS);
+ startKafka();
+ getControllerRequestClient().resumeConsumption(realtimeTableName);
+ }
+
+ protected void verifySegmentAssignment(Map<String, Map<String, String>>
segmentAssignment, int numSegmentsExpected,
+ int numInstancesExpected) {
+ assertEquals(segmentAssignment.size(), numSegmentsExpected);
+
+ int maxSequenceNumber = 0;
+ for (Map.Entry<String, Map<String, String>> entry :
segmentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ maxSequenceNumber = Math.max(maxSequenceNumber,
llcSegmentName.getSequenceNumber());
+ }
+ }
+
+ Map<Integer, String> serverForPartition = new HashMap<>();
+ Set<String> uniqueServers = new HashSet<>();
+ for (Map.Entry<String, Map<String, String>> entry :
segmentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> instanceStateMap = entry.getValue();
+
+ // Verify that all segments have the correct state
+ assertEquals(instanceStateMap.size(), 1);
+ Map.Entry<String, String> instanceIdAndState =
instanceStateMap.entrySet().iterator().next();
+ String state = instanceIdAndState.getValue();
+ if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ if (llcSegmentName.getSequenceNumber() < maxSequenceNumber) {
+ assertEquals(state,
CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
+ } else {
+ assertEquals(state,
CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
+ }
+ } else {
+ assertEquals(state,
CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
+ }
+
+ // Verify that all segments of the same partition are mapped to the same
server
+ String instanceId = instanceIdAndState.getKey();
+ int partitionId = getSegmentPartitionId(segmentName);
+ uniqueServers.add(instanceId);
+ String prevInstance = serverForPartition.computeIfAbsent(partitionId, k
-> instanceId);
+ assertEquals(instanceId, prevInstance);
+ }
+
+ assertEquals(uniqueServers.size(), numInstancesExpected);
+ }
+
+ protected void verifyIdealState(int numSegmentsExpected, int
numInstancesExpected) {
+ IdealState idealState = HelixHelper.getTableIdealState(_helixManager,
REALTIME_TABLE_NAME);
+ verifySegmentAssignment(idealState.getRecord().getMapFields(),
numSegmentsExpected, numInstancesExpected);
+ }
+
+ protected void populateTables()
+ throws Exception {
+ // Create and upload segments
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(_avroFiles,
_tableConfig, _schema, 0, _segmentDir, _tarDir);
+ uploadSegments(getTableName(), TableType.REALTIME, _tarDir);
+
+ pushAvroIntoKafka(_avroFiles);
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(600_000L);
+ }
+
+ protected void createSchemaAndTable()
+ throws Exception {
+ // Unpack the Avro files
+ _avroFiles = unpackAvroData(_tempDir);
+
+ // Create and upload schema and table config
+ _schema = createSchema();
+ addSchema(_schema);
+ _tableConfig = createUpsertTableConfig(_avroFiles.get(0), PRIMARY_KEY_COL,
null, getNumKafkaPartitions());
+ _tableConfig.getValidationConfig().setDeletedSegmentsRetentionPeriod(null);
+ _tableConfig.getUpsertConfig().setMode(UpsertConfig.Mode.PARTIAL);
+ _tableConfig.getUpsertConfig().setPartialUpsertStrategies(new HashMap<>());
+
_tableConfig.getUpsertConfig().setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE);
+ _tableConfig.getIndexingConfig().setNullHandlingEnabled(true);
+
+ addTableConfig(_tableConfig);
+ }
+
+ @AfterClass
+ public void tearDown() {
+ stopServer();
+ stopBroker();
+ stopController();
+ stopZk();
+ }
+
+ @Override
+ protected String getSchemaFileName() {
+ return "upsert_upload_segment_test.schema";
+ }
+
+ @Override
+ protected String getAvroTarFileName() {
+ return "upsert_upload_segment_test.tar.gz";
+ }
+
+ @Override
+ protected String getPartitionColumn() {
+ return PRIMARY_KEY_COL;
+ }
+
+ @Override
+ protected long getCountStarResult() {
+ // Three distinct records are expected with pk values of 100000, 100001,
100002
+ return 3;
+ }
+
+ protected void waitForAllDocsLoaded(long timeoutMs, long expectedCount)
+ throws Exception {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return getCurrentCountStarResultWithoutUpsert() == expectedCount;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 1000L, timeoutMs, "Failed to load all documents");
+ }
+
+ private static int getSegmentPartitionId(String segmentName) {
+ switch (segmentName) {
+ case UPLOADED_SEGMENT_1:
+ return 0;
+ case UPLOADED_SEGMENT_2:
+ case UPLOADED_SEGMENT_3:
+ return 1;
+ default:
+ return new LLCSegmentName(segmentName).getPartitionGroupId();
+ }
+ }
+
+ protected void waitForRebalanceToComplete(RebalanceResult rebalanceResult,
long timeoutMs)
+ throws Exception {
+ String jobId = rebalanceResult.getJobId();
+ if (rebalanceResult.getStatus() != RebalanceResult.Status.IN_PROGRESS) {
+ return;
+ }
+
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ String requestUrl =
getControllerRequestURLBuilder().forTableRebalanceStatus(jobId);
+ try {
+ SimpleHttpResponse httpResponse =
+
HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new
URL(requestUrl).toURI(), null));
+
+ ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse =
+ JsonUtils.stringToObject(httpResponse.getResponse(),
ServerRebalanceJobStatusResponse.class);
+ String status =
serverRebalanceJobStatusResponse.getTableRebalanceProgressStats().getStatus();
+ return status.equals(RebalanceResult.Status.DONE.toString()) ||
status.equals(
+ RebalanceResult.Status.FAILED.toString());
+ } catch (HttpErrorStatusException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ } catch (Exception e) {
+ return null;
+ }
+ }, 1000L, timeoutMs, "Failed to load all segments after rebalance");
+ }
+
+ protected void waitForReloadToComplete(String reloadJobId, long timeoutMs)
+ throws Exception {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ String requestUrl =
getControllerRequestURLBuilder().forSegmentReloadStatus(reloadJobId);
+ try {
+ SimpleHttpResponse httpResponse =
+
HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(new
URL(requestUrl).toURI(), null));
+ ServerReloadControllerJobStatusResponse segmentReloadStatusValue =
+ JsonUtils.stringToObject(httpResponse.getResponse(),
ServerReloadControllerJobStatusResponse.class);
+ return segmentReloadStatusValue.getSuccessCount() ==
segmentReloadStatusValue.getTotalSegmentCount();
+ } catch (HttpErrorStatusException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ } catch (Exception e) {
+ return null;
+ }
+ }, 1000L, timeoutMs, "Failed to load all segments after reload");
+ }
+
+ @Override
+ protected void waitForAllDocsLoaded(long timeoutMs)
+ throws Exception {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ boolean c1 = getCurrentCountStarResultWithoutUpsert() ==
getCountStarResultWithoutUpsert();
+ boolean c2 = getCurrentCountStarResult() == getCountStarResult();
+ // verify there are no null rows
+ boolean c3 =
+ getCurrentCountStarResultWithoutNulls(getTableName(), _schema) ==
getCountStarResultWithoutUpsert();
+ return c1 && c2 && c3;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 100L, timeoutMs, "Failed to load all documents");
+ }
+
+ private long getCurrentCountStarResultWithoutUpsert() {
+ return getPinotConnection().execute("SELECT COUNT(*) FROM " +
getTableName() + " OPTION(skipUpsert=true)")
+ .getResultSet(0).getLong(0);
+ }
+
+ private long getCountStarResultWithoutUpsert() {
+ // 3 Avro files, each with 100 documents, one copy from streaming source,
one copy from batch source
+ return 600;
+ }
+
+ protected long getCurrentCountStarResultWithoutNulls(String tableName,
Schema schema) {
+ StringBuilder queryFilter = new StringBuilder(" WHERE ");
+ for (String column : schema.getColumnNames()) {
+ if (schema.getFieldSpecFor(column).isSingleValueField()) {
+ queryFilter.append(column).append(" IS NOT NULL AND ");
+ }
+ }
+
+ // remove last AND
+ queryFilter = new StringBuilder(queryFilter.substring(0,
queryFilter.length() - 5));
+
+ ResultSetGroup resultSetGroup =
+ getPinotConnection().execute("SELECT COUNT(*) FROM " + tableName +
queryFilter + " OPTION(skipUpsert=true)");
+ if (resultSetGroup.getResultSetCount() > 0) {
+ return resultSetGroup.getResultSet(0).getLong(0);
+ }
+ return 0;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index a4c85505dc..192ba71ec0 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -236,12 +236,16 @@ public class ControllerRequestURLBuilder {
return StringUtil.join("/", _baseUrl, "segments", tableName, query);
}
+ public String forTableRebalanceStatus(String jobId) {
+ return StringUtil.join("/", _baseUrl, "rebalanceStatus", jobId);
+ }
+
public String forTableReset(String tableNameWithType, @Nullable String
targetInstance) {
String query = targetInstance == null ? "reset" :
String.format("reset?targetInstance=%s", targetInstance);
return StringUtil.join("/", _baseUrl, "segments", tableNameWithType,
query);
}
- public String forControllerJobStatus(String jobId) {
+ public String forSegmentReloadStatus(String jobId) {
return StringUtil.join("/", _baseUrl, "segments", "segmentReloadStatus",
jobId);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]