This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 164cd819cf report rebalance job status for the early returns like noops (#13281)
164cd819cf is described below

commit 164cd819cfaa350167ed0b285875acb0e714b79d
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Tue Jun 4 11:35:35 2024 -0700

    report rebalance job status for the early returns like noops (#13281)
---
 .../core/rebalance/NoOpTableRebalanceObserver.java |  4 ++
 .../core/rebalance/TableRebalanceObserver.java     |  2 +
 .../helix/core/rebalance/TableRebalancer.java      | 79 +++++++++++++---------
 .../rebalance/ZkBasedTableRebalanceObserver.java   | 12 +++-
 4 files changed, 64 insertions(+), 33 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
index a435e35ead..2e5e7e2c96 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java
@@ -29,6 +29,10 @@ public class NoOpTableRebalanceObserver implements TableRebalanceObserver {
       Map<String, Map<String, String>> targetState) {
   }
 
+  @Override
+  public void onNoop(String msg) {
+  }
+
   @Override
   public void onSuccess(String msg) {
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
index e9c5c299cf..6139a26d65 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java
@@ -40,6 +40,8 @@ public interface TableRebalanceObserver {
   void onTrigger(Trigger trigger, Map<String, Map<String, String>> 
currentState,
       Map<String, Map<String, String>> targetState);
 
+  void onNoop(String msg);
+
   void onSuccess(String msg);
 
   void onError(String errorMsg);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 5c3514fa7e..765d576549 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -194,21 +194,23 @@ public class TableRebalancer {
     try {
       currentIdealState = 
_helixDataAccessor.getProperty(idealStatePropertyKey);
     } catch (Exception e) {
-      LOGGER.warn(
-          "For rebalanceId: {}, caught exception while fetching IdealState for 
table: {}, aborting the rebalance",
-          rebalanceJobId, tableNameWithType, e);
+      onReturnFailure(String.format(
+          "For rebalanceId: %s, caught exception while fetching IdealState for 
table: %s, aborting the rebalance",
+          rebalanceJobId, tableNameWithType), e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
           "Caught exception while fetching IdealState: " + e, null, null, 
null);
     }
     if (currentIdealState == null) {
-      LOGGER.warn("For rebalanceId: {}, cannot find the IdealState for table: 
{}, aborting the rebalance",
-          rebalanceJobId, tableNameWithType);
+      onReturnFailure(
+          String.format("For rebalanceId: %s, cannot find the IdealState for 
table: %s, aborting the rebalance",
+              rebalanceJobId, tableNameWithType), null);
       return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED, "Cannot find the IdealState for table",
           null, null, null);
     }
     if (!currentIdealState.isEnabled() && !downtime) {
-      LOGGER.warn("For rebalanceId: {}, cannot rebalance disabled table: {} 
without downtime, aborting the rebalance",
-          rebalanceJobId, tableNameWithType);
+      onReturnFailure(String.format(
+          "For rebalanceId: %s, cannot rebalance disabled table: %s without 
downtime, aborting the rebalance",
+          rebalanceJobId, tableNameWithType), null);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
           "Cannot rebalance disabled table without downtime", null, null, 
null);
     }
@@ -224,8 +226,9 @@ public class TableRebalancer {
       instancePartitionsMap = instancePartitionsMapAndUnchanged.getLeft();
       instancePartitionsUnchanged = 
instancePartitionsMapAndUnchanged.getRight();
     } catch (Exception e) {
-      LOGGER.warn("For rebalanceId: {}, caught exception while 
fetching/calculating instance partitions for table: {}, "
-          + "aborting the rebalance", rebalanceJobId, tableNameWithType, e);
+      onReturnFailure(String.format(
+          "For rebalanceId: %s, caught exception while fetching/calculating 
instance partitions for table: %s, "
+              + "aborting the rebalance", rebalanceJobId, tableNameWithType), 
e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
           "Caught exception while fetching/calculating instance partitions: " 
+ e, null, null, null);
     }
@@ -241,9 +244,9 @@ public class TableRebalancer {
       tierToInstancePartitionsMap = 
tierToInstancePartitionsMapAndUnchanged.getLeft();
       tierInstancePartitionsUnchanged = 
tierToInstancePartitionsMapAndUnchanged.getRight();
     } catch (Exception e) {
-      LOGGER.warn(
-          "For rebalanceId: {}, caught exception while fetching/calculating 
tier instance partitions for table: {}, "
-              + "aborting the rebalance", rebalanceJobId, tableNameWithType, 
e);
+      onReturnFailure(String.format(
+          "For rebalanceId: %s, caught exception while fetching/calculating 
tier instance partitions for table: %s, "
+              + "aborting the rebalance", rebalanceJobId, tableNameWithType), 
e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
           "Caught exception while fetching/calculating tier instance 
partitions: " + e, null, null, null);
     }
@@ -258,8 +261,9 @@ public class TableRebalancer {
       targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, 
instancePartitionsMap, sortedTiers,
           tierToInstancePartitionsMap, rebalanceConfig);
     } catch (Exception e) {
-      LOGGER.warn("For rebalanceId: {}, caught exception while calculating 
target assignment for table: {}, "
-          + "aborting the rebalance", rebalanceJobId, tableNameWithType, e);
+      onReturnFailure(String.format(
+          "For rebalanceId: %s, caught exception while calculating target 
assignment for table: %s, aborting the "
+              + "rebalance", rebalanceJobId, tableNameWithType), e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
           "Caught exception while calculating target assignment: " + e, 
instancePartitionsMap,
           tierToInstancePartitionsMap, null);
@@ -273,6 +277,9 @@ public class TableRebalancer {
     if (segmentAssignmentUnchanged) {
       LOGGER.info("Table: {} is already balanced", tableNameWithType);
       if (instancePartitionsUnchanged && tierInstancePartitionsUnchanged) {
+        _tableRebalanceObserver.onNoop(
+            String.format("For rebalanceId: %s, instance unchanged and table: 
%s is already balanced", rebalanceJobId,
+                tableNameWithType));
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.NO_OP, "Table is already balanced",
             instancePartitionsMap, tierToInstancePartitionsMap, 
targetAssignment);
       } else {
@@ -281,6 +288,9 @@ public class TableRebalancer {
               "Instance reassigned in dry-run mode, table is already 
balanced", instancePartitionsMap,
               tierToInstancePartitionsMap, targetAssignment);
         } else {
+          _tableRebalanceObserver.onSuccess(
+              String.format("For rebalanceId: %s, instance reassigned but 
table: %s is already balanced",
+                  rebalanceJobId, tableNameWithType));
           return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.DONE,
               "Instance reassigned, table is already balanced", 
instancePartitionsMap, tierToInstancePartitionsMap,
               targetAssignment);
@@ -309,16 +319,19 @@ public class TableRebalancer {
         Preconditions.checkState(_helixDataAccessor.getBaseDataAccessor()
             .set(idealStatePropertyKey.getPath(), idealStateRecord, 
idealStateRecord.getVersion(),
                 AccessOption.PERSISTENT), "Failed to update IdealState");
-        LOGGER.info("For rebalanceId: {}, finished rebalancing table: {} with 
downtime in {}ms.", rebalanceJobId,
-            tableNameWithType, System.currentTimeMillis() - startTimeMs);
+        String msg =
+            String.format("For rebalanceId: %s, finished rebalancing table: %s 
with downtime in %d ms.", rebalanceJobId,
+                tableNameWithType, System.currentTimeMillis() - startTimeMs);
+        LOGGER.info(msg);
+        _tableRebalanceObserver.onSuccess(msg);
         return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
             "Success with downtime (replaced IdealState with the target 
segment assignment, ExternalView might not "
                 + "reach the target segment assignment yet)", 
instancePartitionsMap, tierToInstancePartitionsMap,
             targetAssignment);
       } catch (Exception e) {
-        LOGGER.warn(
-            "For rebalanceId: {}, caught exception while updating IdealState 
for table: {}, aborting the rebalance",
-            rebalanceJobId, tableNameWithType, e);
+        onReturnFailure(String.format(
+            "For rebalanceId: %s, caught exception while updating IdealState 
for table: %s, aborting the rebalance",
+            rebalanceJobId, tableNameWithType), e);
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
             "Caught exception while updating IdealState: " + e, 
instancePartitionsMap, tierToInstancePartitionsMap,
             targetAssignment);
@@ -347,12 +360,10 @@ public class TableRebalancer {
     if (minReplicasToKeepUpForNoDowntime >= 0) {
       // For non-negative value, use it as min available replicas
       if (minReplicasToKeepUpForNoDowntime >= numReplicas) {
-        String errorMsg = String.format(
+        onReturnFailure(String.format(
             "For rebalanceId: %s, Illegal config for 
minReplicasToKeepUpForNoDowntime: %d for table: %s, "
                 + "must be less than number of replicas: %d, aborting the 
rebalance", rebalanceJobId,
-            minReplicasToKeepUpForNoDowntime, tableNameWithType, numReplicas);
-        LOGGER.warn(errorMsg);
-        _tableRebalanceObserver.onError(errorMsg);
+            minReplicasToKeepUpForNoDowntime, tableNameWithType, numReplicas), 
null);
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
             "Illegal min available replicas config", instancePartitionsMap, 
tierToInstancePartitionsMap,
             targetAssignment);
@@ -446,11 +457,9 @@ public class TableRebalancer {
             targetAssignment = 
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, 
sortedTiers,
                 tierToInstancePartitionsMap, rebalanceConfig);
           } catch (Exception e) {
-            String errorMsg = String.format(
+            onReturnFailure(String.format(
                 "For rebalanceId: %s, caught exception while re-calculating 
the target assignment for table: %s, "
-                    + "aborting the rebalance", rebalanceJobId, 
tableNameWithType);
-            LOGGER.warn(errorMsg, e);
-            _tableRebalanceObserver.onError(errorMsg);
+                    + "aborting the rebalance", rebalanceJobId, 
tableNameWithType), e);
             return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
                 "Caught exception while re-calculating the target assignment: 
" + e, instancePartitionsMap,
                 tierToInstancePartitionsMap, targetAssignment);
@@ -513,11 +522,8 @@ public class TableRebalancer {
         LOGGER.info("For rebalanceId: {}, version changed while updating 
IdealState for table: {}", rebalanceJobId,
             tableNameWithType);
       } catch (Exception e) {
-        String errorMsg = String.format(
-            "For rebalanceId: %s, caught exception while updating IdealState 
for table: %s, "
-                + "aborting the rebalance", rebalanceJobId, tableNameWithType);
-        LOGGER.warn(errorMsg, e);
-        _tableRebalanceObserver.onError(errorMsg);
+        onReturnFailure(String.format("For rebalanceId: %s, caught exception 
while updating IdealState for table: %s, "
+            + "aborting the rebalance", rebalanceJobId, tableNameWithType), e);
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
             "Caught exception while updating IdealState: " + e, 
instancePartitionsMap, tierToInstancePartitionsMap,
             targetAssignment);
@@ -525,6 +531,15 @@ public class TableRebalancer {
     }
   }
 
+  private void onReturnFailure(String errorMsg, Exception e) {
+    if (e != null) {
+      LOGGER.warn(errorMsg, e);
+    } else {
+      LOGGER.warn(errorMsg);
+    }
+    _tableRebalanceObserver.onError(errorMsg);
+  }
+
   /**
    * Gets the instance partitions for instance partition types and also 
returns a boolean for whether they are unchanged
    */
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index 8386544f3c..b2ea9f8e69 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -121,6 +121,16 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver {
     _tableRebalanceProgressStats.setStartTimeMs(System.currentTimeMillis());
   }
 
+  @Override
+  public void onNoop(String msg) {
+    _controllerMetrics.setValueOfTableGauge(_tableNameWithType, 
ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 0);
+    long timeToFinishInSeconds = (System.currentTimeMillis() - 
_tableRebalanceProgressStats.getStartTimeMs()) / 1000L;
+    _tableRebalanceProgressStats.setCompletionStatusMsg(msg);
+    
_tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds);
+    _tableRebalanceProgressStats.setStatus(RebalanceResult.Status.NO_OP);
+    trackStatsInZk();
+  }
+
   @Override
   public void onSuccess(String msg) {
     Preconditions.checkState(RebalanceResult.Status.DONE != 
_tableRebalanceProgressStats.getStatus(),
@@ -140,7 +150,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver {
   @Override
   public void onError(String errorMsg) {
     _controllerMetrics.setValueOfTableGauge(_tableNameWithType, 
ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 0);
-    long timeToFinishInSeconds = (System.currentTimeMillis() - 
_tableRebalanceProgressStats.getStartTimeMs()) / 1000;
+    long timeToFinishInSeconds = (System.currentTimeMillis() - 
_tableRebalanceProgressStats.getStartTimeMs()) / 1000L;
     
_tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds);
     _tableRebalanceProgressStats.setStatus(RebalanceResult.Status.FAILED);
     _tableRebalanceProgressStats.setCompletionStatusMsg(errorMsg);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to