This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2efd5cac01 return the current rebalance result if already done (#13488)
2efd5cac01 is described below

commit 2efd5cac01d5606f77c16163c0d6962fb32f63e3
Author: Johan Adami <4760722+jadam...@users.noreply.github.com>
AuthorDate: Mon Jul 8 14:49:56 2024 -0400

    return the current rebalance result if already done (#13488)
---
 .../api/resources/PinotTableRestletResource.java   | 36 +++++++++++++++++-----
 1 file changed, 29 insertions(+), 7 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index e6d2f5b49a..dbe67fe673 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
@@ -677,14 +678,31 @@ public class PinotTableRestletResource {
         if (dryRunResult.getStatus() == RebalanceResult.Status.DONE) {
           // If dry-run succeeded, run rebalance asynchronously
           rebalanceConfig.setDryRun(false);
-          _executorService.submit(() -> {
+          Future<RebalanceResult> rebalanceResultFuture = 
_executorService.submit(() -> {
             try {
-              _pinotHelixResourceManager.rebalanceTable(tableNameWithType, 
rebalanceConfig, rebalanceJobId, true);
+              return _pinotHelixResourceManager.rebalanceTable(
+                  tableNameWithType, rebalanceConfig, rebalanceJobId, true);
             } catch (Throwable t) {
-              LOGGER.error("Caught exception/error while rebalancing table: 
{}", tableNameWithType, t);
+              String errorMsg = String.format("Caught exception/error while 
rebalancing table: %s", tableNameWithType);
+              LOGGER.error(errorMsg, t);
+              return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED, errorMsg, null, null, null);
             }
           });
-          waitForJobIdToPersist(dryRunResult.getJobId(), tableNameWithType);
+          boolean isJobIdPersisted = waitForRebalanceToPersist(
+              dryRunResult.getJobId(), tableNameWithType, 
rebalanceResultFuture);
+
+          if (rebalanceResultFuture.isDone()) {
+            try {
+              return rebalanceResultFuture.get();
+            } catch (Throwable t) {
+              if (!isJobIdPersisted) {
+                // If the jobId is not persisted, we return the exception to 
indicate the rebalance failed.
+                // Otherwise, polling the job id return NOT_FOUND indefinitely.
+                throw new ControllerApplicationException(LOGGER, 
t.getMessage(), Response.Status.INTERNAL_SERVER_ERROR);
+              }
+            }
+          }
+
           return new RebalanceResult(dryRunResult.getJobId(), 
RebalanceResult.Status.IN_PROGRESS,
               "In progress, check controller logs for updates", 
dryRunResult.getInstanceAssignment(),
               dryRunResult.getTierInstanceAssignment(), 
dryRunResult.getSegmentAssignment());
@@ -699,17 +717,21 @@ public class PinotTableRestletResource {
   }
 
   /**
-   * Waits for jobId to be persisted using a retry policy.
+   * Waits for jobId to be persisted or the rebalance to complete using a 
retry policy.
    * Tables with 100k+ segments take up to a few seconds for the jobId to 
persist. This ensures the jobId is present
    * before returning the jobId to the caller, so they can correctly poll the 
jobId.
    */
-  public void waitForJobIdToPersist(String jobId, String tableNameWithType) {
+  public boolean waitForRebalanceToPersist(
+      String jobId, String tableNameWithType, Future<RebalanceResult> 
rebalanceResultFuture) {
     try {
       // This retry policy waits at most for 7.5s to 15s in total. This is 
chosen to cover typical delays for tables
       // with many segments and avoid excessive HTTP request timeouts.
-      RetryPolicies.exponentialBackoffRetryPolicy(5, 500L, 2.0).attempt(() -> 
getControllerJobMetadata(jobId) != null);
+      RetryPolicies.exponentialBackoffRetryPolicy(5, 500L, 2.0).attempt(() ->
+          getControllerJobMetadata(jobId) != null || 
rebalanceResultFuture.isDone());
+      return true;
     } catch (Exception e) {
       LOGGER.warn("waiting for jobId not successful while rebalancing table: 
{}", tableNameWithType);
+      return false;
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to