This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d1d25dc0f Update getValidDocIdsMetadataFromServer to make call in batches to servers and other bug fixes (#13314)
1d1d25dc0f is described below

commit 1d1d25dc0f1fc1abb73d9516414168c82b116b58
Author: Pratik Tibrewal <tibrewalpra...@uber.com>
AuthorDate: Thu Jun 6 23:25:59 2024 +0530

    Update getValidDocIdsMetadataFromServer to make call in batches to servers and other bug fixes (#13314)
    
    * Update getValidDocIdsMetadataFromServer to make call in batches to server
---
 .../api/resources/PinotTableRestletResource.java   |  8 ++++--
 .../controller/util/CompletionServiceHelper.java   | 24 +++++++++++-----
 .../util/ServerSegmentMetadataReader.java          | 32 +++++++++++++---------
 .../pinot/controller/util/TableMetadataReader.java |  4 +--
 .../apache/pinot/core/common/MinionConstants.java  |  5 ++++
 .../UpsertCompactionTaskGenerator.java             | 12 +++++---
 .../pinot/server/api/resources/TablesResource.java |  8 ++++--
 7 files changed, 62 insertions(+), 31 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index fafd42de93..59e748a08f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -972,7 +972,10 @@ public class PinotTableRestletResource {
       @ApiParam(value = "A list of segments", allowMultiple = true) 
@QueryParam("segmentNames")
       List<String> segmentNames,
       @ApiParam(value = "Valid doc ids type") @QueryParam("validDocIdsType")
-      @DefaultValue("SNAPSHOT") ValidDocIdsType validDocIdsType, @Context 
HttpHeaders headers) {
+      @DefaultValue("SNAPSHOT") ValidDocIdsType validDocIdsType,
+      @ApiParam(value = "Number of segments in a batch per server request")
+      @QueryParam("serverRequestBatchSize") @DefaultValue("500") int 
serverRequestBatchSize,
+      @Context HttpHeaders headers) {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     LOGGER.info("Received a request to fetch aggregate validDocIds metadata 
for a table {}", tableName);
     TableType tableType = Constants.validateTableType(tableTypeStr);
@@ -990,7 +993,8 @@ public class PinotTableRestletResource {
       validDocIdsType = (validDocIdsType == null) ? ValidDocIdsType.SNAPSHOT : 
validDocIdsType;
       JsonNode segmentsMetadataJson =
           
tableMetadataReader.getAggregateValidDocIdsMetadata(tableNameWithType, 
segmentNames,
-              validDocIdsType.toString(), 
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+              validDocIdsType.toString(), 
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000,
+              serverRequestBatchSize);
       validDocIdsMetadata = 
JsonUtils.objectToPrettyString(segmentsMetadataJson);
     } catch (InvalidConfigException e) {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.BAD_REQUEST);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
index 236242660a..6e9d891e5c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
@@ -129,13 +129,20 @@ public class CompletionServiceHelper {
         int statusCode = 
multiHttpRequestResponse.getResponse().getStatusLine().getStatusCode();
         if (statusCode >= 300) {
           String reason = 
multiHttpRequestResponse.getResponse().getStatusLine().getReasonPhrase();
-          LOGGER.error("Server: {} returned error: {}, reason: {}", instance, 
statusCode, reason);
+          LOGGER.error("Server: {} returned error: {}, reason: {} for uri: 
{}", instance, statusCode, reason, uri);
           completionServiceResponse._failedResponseCount++;
           continue;
         }
         String responseString = 
EntityUtils.toString(multiHttpRequestResponse.getResponse().getEntity());
-        completionServiceResponse._httpResponses
-            .put(multiRequestPerServer ? uri.toString() : instance, 
responseString);
+        String key = multiRequestPerServer ? uri.toString() : instance;
+        // If there are multiple requests to the same server with the same URI but different payloads,
+        // we append a count value to the key to ensure each response is uniquely identified.
+        // Otherwise, the map will store only the last response, overwriting previous ones.
+        if (multiRequestPerServer) {
+          int count = 
completionServiceResponse._instanceToRequestCount.compute(key, (k, v) -> v == 
null ? 1 : v + 1);
+          key = key + "__" + count;
+        }
+        completionServiceResponse._httpResponses.put(key, responseString);
       } catch (Exception e) {
         String reason = useCase == null ? "" : String.format(" in '%s'", 
useCase);
         LOGGER.error("Connection error {}. Details: {}", reason, 
e.getMessage());
@@ -151,10 +158,10 @@ public class CompletionServiceHelper {
       }
     }
 
-    int numServersResponded = completionServiceResponse._httpResponses.size();
-    if (numServersResponded != size) {
-      LOGGER.warn("Finished reading information for table: {} with {}/{} 
server responses", tableNameWithType,
-          numServersResponded, size);
+    int numServerRequestsResponded = 
completionServiceResponse._httpResponses.size();
+    if (numServerRequestsResponded != size) {
+      LOGGER.warn("Finished reading information for table: {} with {}/{} 
server-request responses", tableNameWithType,
+          numServerRequestsResponded, size);
     } else {
       LOGGER.info("Finished reading information for table: {}", 
tableNameWithType);
     }
@@ -180,10 +187,13 @@ public class CompletionServiceHelper {
     public Map<String, String> _httpResponses;
     // Number of failures encountered when requesting
     public int _failedResponseCount;
+    // Map of instance to count of requests
+    public Map<String, Integer> _instanceToRequestCount;
 
     public CompletionServiceResponse() {
       _httpResponses = new HashMap<>();
       _failedResponseCount = 0;
+      _instanceToRequestCount = new HashMap<>();
     }
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index 320649c0d1..dcca712af4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
@@ -220,7 +221,8 @@ public class ServerSegmentMetadataReader {
    */
   public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String 
tableNameWithType,
       Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> 
serverToEndpoints,
-      @Nullable List<String> segmentNames, int timeoutMs, String 
validDocIdsType) {
+      @Nullable List<String> segmentNames, int timeoutMs, String 
validDocIdsType,
+      int numSegmentsBatchPerServerRequest) {
     List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
     for (Map.Entry<String, List<String>> serverToSegments : 
serverToSegmentsMap.entrySet()) {
       List<String> segmentsForServer = serverToSegments.getValue();
@@ -235,8 +237,12 @@ public class ServerSegmentMetadataReader {
           }
         }
       }
-      
serverURLsAndBodies.add(generateValidDocIdsMetadataURL(tableNameWithType, 
segmentsToQuery, validDocIdsType,
-          serverToEndpoints.get(serverToSegments.getKey())));
+
+      // Number of segments to query per server request. If a table has a lot of segments, then we might send a
+      // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size.
+      Lists.partition(segmentsToQuery, 
numSegmentsBatchPerServerRequest).forEach(segmentsToQueryBatch ->
+          
serverURLsAndBodies.add(generateValidDocIdsMetadataURL(tableNameWithType, 
segmentsToQueryBatch,
+              validDocIdsType, 
serverToEndpoints.get(serverToSegments.getKey()))));
     }
 
     BiMap<String, String> endpointsToServers = serverToEndpoints.inverse();
@@ -247,12 +253,12 @@ public class ServerSegmentMetadataReader {
 
     Map<String, String> requestHeaders = Map.of("Content-Type", 
"application/json");
     CompletionServiceHelper.CompletionServiceResponse serviceResponse =
-        completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, 
tableNameWithType, false, requestHeaders,
+        completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, 
tableNameWithType, true, requestHeaders,
             timeoutMs, null);
 
     Map<String, ValidDocIdsMetadataInfo> validDocIdsMetadataInfos = new 
HashMap<>();
     int failedParses = 0;
-    int returnedServersCount = 0;
+    int returnedServerRequestsCount = 0;
     for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
       try {
         String validDocIdsMetadataList = streamResponse.getValue();
@@ -262,21 +268,21 @@ public class ServerSegmentMetadataReader {
         for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo: 
validDocIdsMetadataInfoList) {
           
validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(), 
validDocIdsMetadataInfo);
         }
-        returnedServersCount++;
+        returnedServerRequestsCount++;
       } catch (Exception e) {
         failedParses++;
-        LOGGER.error("Unable to parse server {} response due to an error: ", 
streamResponse.getKey(), e);
+        LOGGER.error("Unable to parse {} server-request response due to an 
error: ", streamResponse.getKey(), e);
       }
     }
 
     if (failedParses != 0) {
-      LOGGER.error("Unable to parse server {} / {} response due to an error: 
", failedParses,
+      LOGGER.error("Unable to parse {} / {} server-request responses due to an 
error: ", failedParses,
           serverURLsAndBodies.size());
     }
 
-    if (returnedServersCount != serverURLsAndBodies.size()) {
-      LOGGER.error("Unable to get validDocIdsMetadata from all servers. 
Expected: {}, Actual: {}",
-          serverURLsAndBodies.size(), returnedServersCount);
+    if (returnedServerRequestsCount != serverURLsAndBodies.size()) {
+      LOGGER.error("Unable to get validDocIdsMetadata from all server 
requests. Expected: {}, Actual: {}",
+          serverURLsAndBodies.size(), returnedServerRequestsCount);
     }
 
     if (segmentNames != null && !segmentNames.isEmpty() && segmentNames.size() 
!= validDocIdsMetadataInfos.size()) {
@@ -284,8 +290,8 @@ public class ServerSegmentMetadataReader {
           segmentNames.size(), validDocIdsMetadataInfos.size());
     }
 
-    LOGGER.info("Retrieved validDocIds metadata for {} segments from {} 
servers.", validDocIdsMetadataInfos.size(),
-        returnedServersCount);
+    LOGGER.info("Retrieved validDocIds metadata for {} segments from {} server 
requests.",
+        validDocIdsMetadataInfos.size(), returnedServerRequestsCount);
     return new ArrayList<>(validDocIdsMetadataInfos.values());
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index 389c8d2e94..cc80291278 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -159,7 +159,7 @@ public class TableMetadataReader {
    * @return a list of ValidDocIdsMetadataInfo
    */
   public JsonNode getAggregateValidDocIdsMetadata(String tableNameWithType, 
List<String> segmentNames,
-      String validDocIdsType, int timeoutMs)
+      String validDocIdsType, int timeoutMs, int 
numSegmentsBatchPerServerRequest)
       throws InvalidConfigException {
     final Map<String, List<String>> serverToSegments =
         _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
@@ -170,7 +170,7 @@ public class TableMetadataReader {
 
     List<ValidDocIdsMetadataInfo> aggregateTableMetadataInfo =
         
serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType, 
serverToSegments, endpoints,
-            segmentNames, timeoutMs, validDocIdsType);
+            segmentNames, timeoutMs, validDocIdsType, 
numSegmentsBatchPerServerRequest);
     return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 02f00046b9..9b1b89b4b7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -169,5 +169,10 @@ public class MinionConstants {
      * Valid doc ids type
      */
     public static final String VALID_DOC_IDS_TYPE = "validDocIdsType";
+
+    /**
+     * number of segments to query in one batch to fetch valid doc id metadata, by default 500
+     */
+    public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 
"numSegmentsBatchPerServerRequest";
   }
 }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index 0d22486e79..f63836f19b 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -58,6 +58,7 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
   private static final String DEFAULT_BUFFER_PERIOD = "7d";
   private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 0.0;
   private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 0;
+  private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500;
 
   public static class SegmentSelectionResult {
 
@@ -137,15 +138,18 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
           new ServerSegmentMetadataReader(_clusterInfoAccessor.getExecutor(),
               _clusterInfoAccessor.getConnectionManager());
 
-      // TODO: currently, we put segmentNames=null to get metadata for all 
segments. We can change this to get
-      // valid doc id metadata in batches with the loop.
-
       // By default, we use 'snapshot' for validDocIdsType. This means that we 
will use the validDocIds bitmap from
       // the snapshot from Pinot segment. This will require 'enableSnapshot' 
from UpsertConfig to be set to true.
       String validDocIdsTypeStr =
           taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_TYPE, 
ValidDocIdsType.SNAPSHOT.toString());
       ValidDocIdsType validDocIdsType = 
ValidDocIdsType.valueOf(validDocIdsTypeStr.toUpperCase());
 
+      // Number of segments to query per server request. If a table has a lot of segments, then we might send a
+      // huge payload to pinot-server in request. Batching the requests will help in reducing the payload size.
+      int numSegmentsBatchPerServerRequest =
+          
Integer.parseInt(taskConfigs.getOrDefault(UpsertCompactionTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST,
+              String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST)));
+
       // Validate that the snapshot is enabled if validDocIdsType is 
validDocIdsSnapshot
       if (validDocIdsType == ValidDocIdsType.SNAPSHOT) {
         UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
@@ -163,7 +167,7 @@ public class UpsertCompactionTaskGenerator extends 
BaseTaskGenerator {
 
       List<ValidDocIdsMetadataInfo> validDocIdsMetadataList =
           
serverSegmentMetadataReader.getValidDocIdsMetadataFromServer(tableNameWithType, 
serverToSegments,
-              serverToEndpoints, null, 60_000, validDocIdsType.toString());
+              serverToEndpoints, null, 60_000, validDocIdsType.toString(), 
numSegmentsBatchPerServerRequest);
 
       Map<String, SegmentZKMetadata> completedSegmentsMap =
           
completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
 Function.identity()));
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 063608235a..991f607199 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -657,9 +657,11 @@ public class TablesResource {
     }
     try {
       if (!missingSegments.isEmpty()) {
-        throw new WebApplicationException(
-            String.format("Table %s has missing segments: %s)", 
tableNameWithType, segments),
-            Response.Status.NOT_FOUND);
+        // we need not abort here or throw exception as we can still process the segments that are available
+        // During UpsertCompactionTaskGenerator, controller sends a lot of segments to server to fetch validDocIds
+        // and it may happen that a segment is deleted concurrently. In such cases, we should log a warning and
+        // process the remaining available segments.
+        LOGGER.warn("Table {} has missing segments {}", tableNameWithType, 
missingSegments);
       }
       List<Map<String, Object>> allValidDocIdsMetadata = new 
ArrayList<>(segmentDataManagers.size());
       for (SegmentDataManager segmentDataManager : segmentDataManagers) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to