This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b69f438a79 Make Preload Integration test more extensible (#11195)
b69f438a79 is described below
commit b69f438a79716179c3a7c8161a67d675bdd9909e
Author: Kartik Khare <[email protected]>
AuthorDate: Fri Jul 28 22:59:01 2023 +0530
Make Preload Integration test more extensible (#11195)
* Make Preload Integration test more extensible
* Make snapshot method protected as well
---------
Co-authored-by: Kartik Khare <[email protected]>
---
.../UpsertTableSegmentPreloadIntegrationTest.java | 38 ++++++++++++++--------
1 file changed, 24 insertions(+), 14 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
index e662403347..0bd5a84af6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
@@ -73,12 +73,17 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
startBroker();
startServers(NUM_SERVERS);
- // Unpack the Avro files
- List<File> avroFiles = unpackAvroData(_tempDir);
-
// Start Kafka and push data into Kafka
startKafka();
+ populateTables();
+ }
+
+ protected void populateTables()
+ throws Exception {
+ // Unpack the Avro files
+ List<File> avroFiles = unpackAvroData(_tempDir);
+
// Create and upload schema and table config
Schema schema = createSchema();
addSchema(schema);
@@ -192,6 +197,16 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
assertEquals(getCurrentCountStarResult(), getCountStarResult());
assertEquals(getCurrentCountStarResultWithoutUpsert(),
getCountStarResultWithoutUpsert());
+ waitForSnapshotCreation();
+
+ // Restart the servers and check again
+ restartServers();
+ verifyIdealState(7);
+ waitForAllDocsLoaded(600_000L);
+ }
+
+ protected void waitForSnapshotCreation()
+ throws Exception {
Set<String> consumingSegments =
getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
// trigger force commit for snapshots
String jobId = forceCommit(getTableName());
@@ -211,7 +226,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
serverStarter.getConfig().getProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR);
File[] files = new File(segmentDir, getTableName() +
"_REALTIME").listFiles();
for (File file : files) {
- if (file.getName().contains("tmp") ||
file.getName().contains("consumer")) {
+ if (!file.getName().startsWith(getTableName())) {
continue;
}
if (file.isDirectory()) {
@@ -231,15 +246,10 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
} catch (Exception e) {
return false;
}
- }, 60000L, "Error verifying force commit operation on table!");
-
- // Restart the servers and check again
- restartServers();
- verifyIdealState(7);
- waitForAllDocsLoaded(600_000L);
+ }, 120000L, "Error verifying force commit operation on table!");
}
- private void verifyIdealState(int numSegmentsExpected) {
+ protected void verifyIdealState(int numSegmentsExpected) {
IdealState idealState = HelixHelper.getTableIdealState(_helixManager,
REALTIME_TABLE_NAME);
Map<String, Map<String, String>> segmentAssignment =
idealState.getRecord().getMapFields();
assertEquals(segmentAssignment.size(), numSegmentsExpected);
@@ -295,7 +305,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
}
}
- public Set<String> getConsumingSegmentsFromIdealState(String
tableNameWithType) {
+ protected Set<String> getConsumingSegmentsFromIdealState(String
tableNameWithType) {
IdealState tableIdealState =
_controllerStarter.getHelixResourceManager().getTableIdealState(tableNameWithType);
Map<String, Map<String, String>> segmentAssignment =
tableIdealState.getRecord().getMapFields();
Set<String> matchingSegments = new
HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
@@ -308,7 +318,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
return matchingSegments;
}
- public boolean isForceCommitJobCompleted(String forceCommitJobId)
+ protected boolean isForceCommitJobCompleted(String forceCommitJobId)
throws Exception {
String jobStatusResponse =
sendGetRequest(_controllerRequestURLBuilder.forForceCommitJobStatus(forceCommitJobId));
JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
@@ -318,7 +328,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
}
- private String forceCommit(String tableName)
+ protected String forceCommit(String tableName)
throws Exception {
String response =
sendPostRequest(_controllerRequestURLBuilder.forTableForceCommit(tableName),
null);
return
JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]