This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 1d1d25dc0f Update getValidDocIdsMetadataFromServer to make call in batches to servers and other bug fixes (#13314) 1d1d25dc0f is described below commit 1d1d25dc0f1fc1abb73d9516414168c82b116b58 Author: Pratik Tibrewal <tibrewalpra...@uber.com> AuthorDate: Thu Jun 6 23:25:59 2024 +0530 Update getValidDocIdsMetadataFromServer to make call in batches to servers and other bug fixes (#13314) * Update getValidDocIdsMetadataFromServer to make call in batches to server --- .../api/resources/PinotTableRestletResource.java | 8 ++++-- .../controller/util/CompletionServiceHelper.java | 24 +++++++++++----- .../util/ServerSegmentMetadataReader.java | 32 +++++++++++++--------- .../pinot/controller/util/TableMetadataReader.java | 4 +-- .../apache/pinot/core/common/MinionConstants.java | 5 ++++ .../UpsertCompactionTaskGenerator.java | 12 +++++--- .../pinot/server/api/resources/TablesResource.java | 8 ++++-- 7 files changed, 62 insertions(+), 31 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index fafd42de93..59e748a08f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -972,7 +972,10 @@ public class PinotTableRestletResource { @ApiParam(value = "A list of segments", allowMultiple = true) @QueryParam("segmentNames") List<String> segmentNames, @ApiParam(value = "Valid doc ids type") @QueryParam("validDocIdsType") - @DefaultValue("SNAPSHOT") ValidDocIdsType validDocIdsType, @Context HttpHeaders headers) { + @DefaultValue("SNAPSHOT") ValidDocIdsType validDocIdsType, + @ApiParam(value = "Number of segments in a batch per server request") + @QueryParam("serverRequestBatchSize") @DefaultValue("500") int serverRequestBatchSize, + @Context HttpHeaders headers) { tableName = DatabaseUtils.translateTableName(tableName, headers); LOGGER.info("Received a request to fetch aggregate validDocIds metadata for a table {}", tableName); TableType tableType = Constants.validateTableType(tableTypeStr); @@ -990,7 +993,8 @@ public class PinotTableRestletResource { validDocIdsType = (validDocIdsType == null) ? ValidDocIdsType.SNAPSHOT : validDocIdsType; JsonNode segmentsMetadataJson = tableMetadataReader.getAggregateValidDocIdsMetadata(tableNameWithType, segmentNames, - validDocIdsType.toString(), _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); + validDocIdsType.toString(), _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000, + serverRequestBatchSize); validDocIdsMetadata = JsonUtils.objectToPrettyString(segmentsMetadataJson); } catch (InvalidConfigException e) { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java index 236242660a..6e9d891e5c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java @@ -129,13 +129,20 @@ public class CompletionServiceHelper { int statusCode = multiHttpRequestResponse.getResponse().getStatusLine().getStatusCode(); if (statusCode >= 300) { String reason = multiHttpRequestResponse.getResponse().getStatusLine().getReasonPhrase(); - LOGGER.error("Server: {} returned error: {}, reason: {}", instance, statusCode, reason); + LOGGER.error("Server: {} returned error: {}, reason: {} for uri: {}", instance, statusCode, reason, uri); completionServiceResponse._failedResponseCount++; continue; } String responseString = EntityUtils.toString(multiHttpRequestResponse.getResponse().getEntity()); - completionServiceResponse._httpResponses - .put(multiRequestPerServer ? uri.toString() : instance, responseString); + String key = multiRequestPerServer ? uri.toString() : instance; + // If there are multiple requests to the same server with the same URI but different payloads, + // we append a count value to the key to ensure each response is uniquely identified. + // Otherwise, the map will store only the last response, overwriting previous ones. + if (multiRequestPerServer) { + int count = completionServiceResponse._instanceToRequestCount.compute(key, (k, v) -> v == null ? 1 : v + 1); + key = key + "__" + count; + } + completionServiceResponse._httpResponses.put(key, responseString); } catch (Exception e) { String reason = useCase == null ? "" : String.format(" in '%s'", useCase); LOGGER.error("Connection error {}. Details: {}", reason, e.getMessage()); @@ -151,10 +158,10 @@ public class CompletionServiceHelper { } } - int numServersResponded = completionServiceResponse._httpResponses.size(); - if (numServersResponded != size) { - LOGGER.warn("Finished reading information for table: {} with {}/{} server responses", tableNameWithType, - numServersResponded, size); + int numServerRequestsResponded = completionServiceResponse._httpResponses.size(); + if (numServerRequestsResponded != size) { + LOGGER.warn("Finished reading information for table: {} with {}/{} server-request responses", tableNameWithType, + numServerRequestsResponded, size); } else { LOGGER.info("Finished reading information for table: {}", tableNameWithType); } @@ -180,10 +187,13 @@ public class CompletionServiceHelper { public Map<String, String> _httpResponses; // Number of failures encountered when requesting public int _failedResponseCount; + // Map of instance to count of requests + public Map<String, Integer> _instanceToRequestCount; public CompletionServiceResponse() { _httpResponses = new HashMap<>(); _failedResponseCount = 0; + _instanceToRequestCount = new HashMap<>(); } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java index 320649c0d1..dcca712af4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; import com.google.common.collect.BiMap; +import com.google.common.collect.Lists; import java.io.IOException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; @@ -220,7 +221,8 @@ public class ServerSegmentMetadataReader { */ public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tableNameWithType, Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> serverToEndpoints, - @Nullable List<String> segmentNames, int timeoutMs, String validDocIdsType) { + @Nullable List<String> segmentNames, int timeoutMs, String validDocIdsType, + int numSegmentsBatchPerServerRequest) { List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>(); for (Map.Entry<String, List<String>> serverToSegments : serverToSegmentsMap.entrySet()) { List<String> segmentsForServer = serverToSegments.getValue(); @@ -235,8 +237,12 @@ public class ServerSegmentMetadataReader { } } } - serverURLsAndBodies.add(generateValidDocIdsMetadataURL(tableNameWithType, segmentsToQuery, validDocIdsType, - serverToEndpoints.get(serverToSegments.getKey()))); + + // Number of segments to query per server request. If a table has a lot of segments, then we might send a + // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. + Lists.partition(segmentsToQuery, numSegmentsBatchPerServerRequest).forEach(segmentsToQueryBatch -> + serverURLsAndBodies.add(generateValidDocIdsMetadataURL(tableNameWithType, segmentsToQueryBatch, + validDocIdsType, serverToEndpoints.get(serverToSegments.getKey())))); } BiMap<String, String> endpointsToServers = serverToEndpoints.inverse(); @@ -247,12 +253,12 @@ public class ServerSegmentMetadataReader { Map<String, String> requestHeaders = Map.of("Content-Type", "application/json"); CompletionServiceHelper.CompletionServiceResponse serviceResponse = - completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, tableNameWithType, false, requestHeaders, + completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, tableNameWithType, true, requestHeaders, timeoutMs, null); Map<String, ValidDocIdsMetadataInfo> validDocIdsMetadataInfos = new HashMap<>(); int failedParses = 0; - int returnedServersCount = 0; + int returnedServerRequestsCount = 0; for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) { try { String validDocIdsMetadataList = streamResponse.getValue(); @@ -262,21 +268,21 @@ public class ServerSegmentMetadataReader { for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo: validDocIdsMetadataInfoList) { validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(), validDocIdsMetadataInfo); } - returnedServersCount++; + returnedServerRequestsCount++; } catch (Exception e) { failedParses++; - LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e); + LOGGER.error("Unable to parse {} server-request response due to an error: ", streamResponse.getKey(), e); } } if (failedParses != 0) { - LOGGER.error("Unable to parse server {} / {} response due to an error: ", failedParses, + LOGGER.error("Unable to parse {} / {} server-request responses due to an error: ", failedParses, serverURLsAndBodies.size()); } - if (returnedServersCount != serverURLsAndBodies.size()) { - LOGGER.error("Unable to get validDocIdsMetadata from all servers. Expected: {}, Actual: {}", - serverURLsAndBodies.size(), returnedServersCount); + if (returnedServerRequestsCount != serverURLsAndBodies.size()) { + LOGGER.error("Unable to get validDocIdsMetadata from all server requests. Expected: {}, Actual: {}", + serverURLsAndBodies.size(), returnedServerRequestsCount); } if (segmentNames != null && !segmentNames.isEmpty() && segmentNames.size() != validDocIdsMetadataInfos.size()) { @@ -284,8 +290,8 @@ public class ServerSegmentMetadataReader { segmentNames.size(), validDocIdsMetadataInfos.size()); } - LOGGER.info("Retrieved validDocIds metadata for {} segments from {} servers.", validDocIdsMetadataInfos.size(), - returnedServersCount); + LOGGER.info("Retrieved validDocIds metadata for {} segments from {} server requests.", + validDocIdsMetadataInfos.size(), returnedServerRequestsCount); return new ArrayList<>(validDocIdsMetadataInfos.values()); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java index 389c8d2e94..cc80291278 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java @@ -159,7 +159,7 @@ public class TableMetadataReader { * @return a list of ValidDocIdsMetadataInfo */ public JsonNode getAggregateValidDocIdsMetadata(String tableNameWithType, List<String> segmentNames, - String validDocIdsType, int timeoutMs) + String validDocIdsType, int timeoutMs, int numSegmentsBatchPerServerRequest) throws InvalidConfigException { final Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType); @@ -170,7 +170,7 @@ public class TableMetadataReader { List<ValidDocIdsMetadataInfo> aggregateTableMetadataInfo = serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments, endpoints, - segmentNames, timeoutMs, validDocIdsType); + segmentNames, timeoutMs, validDocIdsType, numSegmentsBatchPerServerRequest); return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 02f00046b9..9b1b89b4b7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -169,5 +169,10 @@ public class MinionConstants { * Valid doc ids type */ public static final String VALID_DOC_IDS_TYPE = "validDocIdsType"; + + /** + * number of segments to query in one batch to fetch valid doc id metadata, by default 500 + */ + public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest"; } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java index 0d22486e79..f63836f19b 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java @@ -58,6 +58,7 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { private static final String DEFAULT_BUFFER_PERIOD = "7d"; private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 0.0; private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 0; + private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500; public static class SegmentSelectionResult { @@ -137,15 +138,18 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { new ServerSegmentMetadataReader(_clusterInfoAccessor.getExecutor(), _clusterInfoAccessor.getConnectionManager()); - // TODO: currently, we put segmentNames=null to get metadata for all segments. We can change this to get - // valid doc id metadata in batches with the loop. - // By default, we use 'snapshot' for validDocIdsType. This means that we will use the validDocIds bitmap from // the snapshot from Pinot segment. This will require 'enableSnapshot' from UpsertConfig to be set to true. String validDocIdsTypeStr = taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_TYPE, ValidDocIdsType.SNAPSHOT.toString()); ValidDocIdsType validDocIdsType = ValidDocIdsType.valueOf(validDocIdsTypeStr.toUpperCase()); + // Number of segments to query per server request. If a table has a lot of segments, then we might send a + // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size. + int numSegmentsBatchPerServerRequest = + Integer.parseInt(taskConfigs.getOrDefault(UpsertCompactionTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST, + String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST))); + // Validate that the snapshot is enabled if validDocIdsType is validDocIdsSnapshot if (validDocIdsType == ValidDocIdsType.SNAPSHOT) { UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); @@ -163,7 +167,7 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { List<ValidDocIdsMetadataInfo> validDocIdsMetadataList = serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments, - serverToEndpoints, null, 60_000, validDocIdsType.toString()); + serverToEndpoints, null, 60_000, validDocIdsType.toString(), numSegmentsBatchPerServerRequest); Map<String, SegmentZKMetadata> completedSegmentsMap = completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity())); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index 063608235a..991f607199 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -657,9 +657,11 @@ public class TablesResource { } try { if (!missingSegments.isEmpty()) { - throw new WebApplicationException( - String.format("Table %s has missing segments: %s)", tableNameWithType, segments), - Response.Status.NOT_FOUND); + // we need not abort here or throw exception as we can still process the segments that are available + // During UpsertCompactionTaskGenerator, controller sends a lot of segments to server to fetch validDocIds + // and it may happen that a segment is deleted concurrently. In such cases, we should log a warning and + // process the remaining available segments. + LOGGER.warn("Table {} has missing segments {}", tableNameWithType, missingSegments); } List<Map<String, Object>> allValidDocIdsMetadata = new ArrayList<>(segmentDataManagers.size()); for (SegmentDataManager segmentDataManager : segmentDataManagers) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org