This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 577830cbe0 allow to set targetInstance for reloadSegment (#14393)
577830cbe0 is described below

commit 577830cbe0a185219e3a2c3c5d22ac4d038877b1
Author: Xiaobing <[email protected]>
AuthorDate: Tue Nov 5 18:03:25 2024 -0800

    allow to set targetInstance for reloadSegment (#14393)
---
 .../api/resources/PinotSegmentRestletResource.java | 62 +++++++++++++---------
 .../helix/core/PinotHelixResourceManager.java      | 19 ++++---
 2 files changed, 47 insertions(+), 34 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 2f0d565590..9422fc64a6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -296,8 +296,7 @@ public class PinotSegmentRestletResource {
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER,
           String.format("Exception while listing segment lineage: %s for 
table: %s.", e.getMessage(),
-              tableNameWithType),
-          Status.INTERNAL_SERVER_ERROR, e);
+              tableNameWithType), Status.INTERNAL_SERVER_ERROR, e);
     }
   }
 
@@ -360,8 +359,8 @@ public class PinotSegmentRestletResource {
 
   private JsonNode getExtraMetaData(String tableName, String segmentName, 
List<String> columns) {
     try {
-      TableMetadataReader tableMetadataReader = new 
TableMetadataReader(_executor,
-          _connectionManager, _pinotHelixResourceManager);
+      TableMetadataReader tableMetadataReader =
+          new TableMetadataReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
       return tableMetadataReader.getSegmentMetadata(tableName, segmentName, 
columns,
           _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
     } catch (InvalidConfigException e) {
@@ -390,19 +389,21 @@ public class PinotSegmentRestletResource {
       @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
       @ApiParam(value = "Name of the segment", required = true) 
@PathParam("segmentName") @Encoded String segmentName,
       @ApiParam(value = "Whether to force server to download segment") 
@QueryParam("forceDownload")
-      @DefaultValue("false") boolean forceDownload, @Context HttpHeaders 
headers) {
+      @DefaultValue("false") boolean forceDownload,
+      @ApiParam(value = "Name of the target instance to reload") 
@QueryParam("targetInstance") @Nullable
+      String targetInstance, @Context HttpHeaders headers) {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     long startTimeMs = System.currentTimeMillis();
     segmentName = URIUtils.decode(segmentName);
     String tableNameWithType = getExistingTable(tableName, segmentName);
     Pair<Integer, String> msgInfo =
-        _pinotHelixResourceManager.reloadSegment(tableNameWithType, 
segmentName, forceDownload);
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, 
segmentName, forceDownload, targetInstance);
     boolean zkJobMetaWriteSuccess = false;
-    if (msgInfo.getLeft() > 0) {
+    int numReloadMsgSent = msgInfo.getLeft();
+    if (numReloadMsgSent > 0) {
       try {
         if 
(_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, 
segmentName, msgInfo.getRight(),
-            startTimeMs,
-            msgInfo.getLeft())) {
+            startTimeMs, numReloadMsgSent)) {
           zkJobMetaWriteSuccess = true;
         } else {
           LOGGER.error("Failed to add reload segment job meta into zookeeper 
for table: {}, segment: {}",
@@ -414,11 +415,11 @@ public class PinotSegmentRestletResource {
       }
       return new SuccessResponse(
           String.format("Submitted reload job id: %s, sent %d reload messages. 
Job meta ZK storage status: %s",
-              msgInfo.getRight(), msgInfo.getLeft(), zkJobMetaWriteSuccess ? 
"SUCCESS" : "FAILED"));
-    } else {
-      throw new ControllerApplicationException(LOGGER,
-          "Failed to find segment: " + segmentName + " in table: " + 
tableName, Status.NOT_FOUND);
+              msgInfo.getRight(), numReloadMsgSent, zkJobMetaWriteSuccess ? 
"SUCCESS" : "FAILED"));
     }
+    throw new ControllerApplicationException(LOGGER,
+        String.format("Failed to find segment: %s in table: %s on %s", 
segmentName, tableName,
+            targetInstance == null ? "every instance" : targetInstance), 
Status.NOT_FOUND);
   }
 
   /**
@@ -522,15 +523,14 @@ public class PinotSegmentRestletResource {
   public ServerReloadControllerJobStatusResponse getReloadJobStatus(
       @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") 
String reloadJobId)
       throws Exception {
-    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.
-            getControllerJobZKMetadata(reloadJobId, 
ControllerJobType.RELOAD_SEGMENT);
+    Map<String, String> controllerJobZKMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId, 
ControllerJobType.RELOAD_SEGMENT);
     if (controllerJobZKMetadata == null) {
       throw new ControllerApplicationException(LOGGER, "Failed to find 
controller job id: " + reloadJobId,
           Status.NOT_FOUND);
     }
 
-    String tableNameWithType =
-        
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
+    String tableNameWithType = 
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
     Map<String, List<String>> serverToSegments;
 
     String singleSegmentName =
@@ -571,7 +571,7 @@ public class PinotSegmentRestletResource {
     serverReloadControllerJobStatusResponse.setSuccessCount(0);
 
     int totalSegments = 0;
-    for (Map.Entry<String, List<String>> entry: serverToSegments.entrySet()) {
+    for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
       totalSegments += entry.getValue().size();
     }
     
serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments);
@@ -587,8 +587,7 @@ public class PinotSegmentRestletResource {
             serverReloadControllerJobStatusResponse.getSuccessCount() + 
response.getSuccessCount());
       } catch (Exception e) {
         serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(
-            
serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1
-        );
+            
serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1);
       }
     }
 
@@ -596,8 +595,7 @@ public class PinotSegmentRestletResource {
     
serverReloadControllerJobStatusResponse.setMetadata(controllerJobZKMetadata);
 
     // Add derived fields
-    long submissionTime =
-        
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+    long submissionTime = 
Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
     double timeElapsedInMinutes = ((double) System.currentTimeMillis() - 
(double) submissionTime) / (1000.0 * 60.0);
     int remainingSegments = 
serverReloadControllerJobStatusResponse.getTotalSegmentCount()
         - serverReloadControllerJobStatusResponse.getSuccessCount();
@@ -625,7 +623,9 @@ public class PinotSegmentRestletResource {
       @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr,
       @ApiParam(value = "Whether to force server to download segment") 
@QueryParam("forceDownload")
-      @DefaultValue("false") boolean forceDownload, @Context HttpHeaders 
headers)
+      @DefaultValue("false") boolean forceDownload,
+      @ApiParam(value = "Name of the target instance to reload") 
@QueryParam("targetInstance") @Nullable
+      String targetInstance, @Context HttpHeaders headers)
       throws JsonProcessingException {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
     long startTimeMs = System.currentTimeMillis();
@@ -644,15 +644,20 @@ public class PinotSegmentRestletResource {
             LOGGER);
     Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>();
     for (String tableNameWithType : tableNamesWithType) {
-      Pair<Integer, String> msgInfo = 
_pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
+      Pair<Integer, String> msgInfo =
+          _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, 
forceDownload, targetInstance);
+      int numReloadMsgSent = msgInfo.getLeft();
+      if (numReloadMsgSent <= 0) {
+        continue;
+      }
       Map<String, String> tableReloadMeta = new HashMap<>();
-      tableReloadMeta.put("numMessagesSent", 
String.valueOf(msgInfo.getLeft()));
+      tableReloadMeta.put("numMessagesSent", String.valueOf(numReloadMsgSent));
       tableReloadMeta.put("reloadJobId", msgInfo.getRight());
       perTableMsgData.put(tableNameWithType, tableReloadMeta);
       // Store in ZK
       try {
         if 
(_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, 
msgInfo.getRight(), startTimeMs,
-            msgInfo.getLeft())) {
+            numReloadMsgSent)) {
           tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
         } else {
           tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
@@ -663,6 +668,11 @@ public class PinotSegmentRestletResource {
         LOGGER.error("Failed to add reload all segments job meta into 
zookeeper for table: {}", tableNameWithType, e);
       }
     }
+    if (perTableMsgData.isEmpty()) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to find any segments in table: %s on %s", 
tableName,
+              targetInstance == null ? "every instance" : targetInstance), 
Status.NOT_FOUND);
+    }
     return new SuccessResponse(JsonUtils.objectToString(perTableMsgData));
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 048152fefe..099cf4b5e8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1520,7 +1520,7 @@ public class PinotHelixResourceManager {
       LOGGER.info("Reloading tables with name: {}", schemaName);
       List<String> tableNamesWithType = 
getExistingTableNamesWithType(schemaName, null);
       for (String tableNameWithType : tableNamesWithType) {
-        reloadAllSegments(tableNameWithType, false);
+        reloadAllSegments(tableNameWithType, false, null);
       }
     }
   }
@@ -2605,8 +2605,10 @@ public class PinotHelixResourceManager {
     sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true);
   }
 
-  public Pair<Integer, String> reloadAllSegments(String tableNameWithType, 
boolean forceDownload) {
-    LOGGER.info("Sending reload message for table: {} with forceDownload: {}", 
tableNameWithType, forceDownload);
+  public Pair<Integer, String> reloadAllSegments(String tableNameWithType, 
boolean forceDownload,
+      @Nullable String targetInstance) {
+    LOGGER.info("Sending reload message for table: {} with forceDownload: {}, 
and target: {}", tableNameWithType,
+        forceDownload, targetInstance == null ? "every instance" : 
targetInstance);
 
     if (forceDownload) {
       TableType tt = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
@@ -2617,7 +2619,7 @@ public class PinotHelixResourceManager {
 
     Criteria recipientCriteria = new Criteria();
     recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
+    recipientCriteria.setInstanceName(targetInstance == null ? "%" : 
targetInstance);
     recipientCriteria.setResource(tableNameWithType);
     recipientCriteria.setSessionSpecific(true);
     SegmentReloadMessage segmentReloadMessage = new 
SegmentReloadMessage(tableNameWithType, forceDownload);
@@ -2635,9 +2637,10 @@ public class PinotHelixResourceManager {
     return Pair.of(numMessagesSent, segmentReloadMessage.getMsgId());
   }
 
-  public Pair<Integer, String> reloadSegment(String tableNameWithType, String 
segmentName, boolean forceDownload) {
-    LOGGER.info("Sending reload message for segment: {} in table: {} with 
forceDownload: {}", segmentName,
-        tableNameWithType, forceDownload);
+  public Pair<Integer, String> reloadSegment(String tableNameWithType, String 
segmentName, boolean forceDownload,
+      @Nullable String targetInstance) {
+    LOGGER.info("Sending reload message for segment: {} in table: {} with 
forceDownload: {}, and target: {}",
+        segmentName, tableNameWithType, forceDownload, targetInstance == null 
? "every instance" : targetInstance);
 
     if (forceDownload) {
       TableType tt = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
@@ -2649,7 +2652,7 @@ public class PinotHelixResourceManager {
 
     Criteria recipientCriteria = new Criteria();
     recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-    recipientCriteria.setInstanceName("%");
+    recipientCriteria.setInstanceName(targetInstance == null ? "%" : 
targetInstance);
     recipientCriteria.setResource(tableNameWithType);
     recipientCriteria.setPartition(segmentName);
     recipientCriteria.setSessionSpecific(true);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to