This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 164cd819cf report rebalance job status for the early returns like noops (#13281) 164cd819cf is described below commit 164cd819cfaa350167ed0b285875acb0e714b79d Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Tue Jun 4 11:35:35 2024 -0700 report rebalance job status for the early returns like noops (#13281) --- .../core/rebalance/NoOpTableRebalanceObserver.java | 4 ++ .../core/rebalance/TableRebalanceObserver.java | 2 + .../helix/core/rebalance/TableRebalancer.java | 79 +++++++++++++--------- .../rebalance/ZkBasedTableRebalanceObserver.java | 12 +++- 4 files changed, 64 insertions(+), 33 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java index a435e35ead..2e5e7e2c96 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/NoOpTableRebalanceObserver.java @@ -29,6 +29,10 @@ public class NoOpTableRebalanceObserver implements TableRebalanceObserver { Map<String, Map<String, String>> targetState) { } + @Override + public void onNoop(String msg) { + } + @Override public void onSuccess(String msg) { } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java index e9c5c299cf..6139a26d65 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceObserver.java @@ -40,6 +40,8 @@ public interface 
TableRebalanceObserver { void onTrigger(Trigger trigger, Map<String, Map<String, String>> currentState, Map<String, Map<String, String>> targetState); + void onNoop(String msg); + void onSuccess(String msg); void onError(String errorMsg); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index 5c3514fa7e..765d576549 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -194,21 +194,23 @@ public class TableRebalancer { try { currentIdealState = _helixDataAccessor.getProperty(idealStatePropertyKey); } catch (Exception e) { - LOGGER.warn( - "For rebalanceId: {}, caught exception while fetching IdealState for table: {}, aborting the rebalance", - rebalanceJobId, tableNameWithType, e); + onReturnFailure(String.format( + "For rebalanceId: %s, caught exception while fetching IdealState for table: %s, aborting the rebalance", + rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while fetching IdealState: " + e, null, null, null); } if (currentIdealState == null) { - LOGGER.warn("For rebalanceId: {}, cannot find the IdealState for table: {}, aborting the rebalance", - rebalanceJobId, tableNameWithType); + onReturnFailure( + String.format("For rebalanceId: %s, cannot find the IdealState for table: %s, aborting the rebalance", + rebalanceJobId, tableNameWithType), null); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Cannot find the IdealState for table", null, null, null); } if (!currentIdealState.isEnabled() && !downtime) { - LOGGER.warn("For rebalanceId: {}, cannot rebalance disabled table: {} without downtime, aborting the rebalance", 
- rebalanceJobId, tableNameWithType); + onReturnFailure(String.format( + "For rebalanceId: %s, cannot rebalance disabled table: %s without downtime, aborting the rebalance", + rebalanceJobId, tableNameWithType), null); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Cannot rebalance disabled table without downtime", null, null, null); } @@ -224,8 +226,9 @@ public class TableRebalancer { instancePartitionsMap = instancePartitionsMapAndUnchanged.getLeft(); instancePartitionsUnchanged = instancePartitionsMapAndUnchanged.getRight(); } catch (Exception e) { - LOGGER.warn("For rebalanceId: {}, caught exception while fetching/calculating instance partitions for table: {}, " - + "aborting the rebalance", rebalanceJobId, tableNameWithType, e); + onReturnFailure(String.format( + "For rebalanceId: %s, caught exception while fetching/calculating instance partitions for table: %s, " + + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while fetching/calculating instance partitions: " + e, null, null, null); } @@ -241,9 +244,9 @@ public class TableRebalancer { tierToInstancePartitionsMap = tierToInstancePartitionsMapAndUnchanged.getLeft(); tierInstancePartitionsUnchanged = tierToInstancePartitionsMapAndUnchanged.getRight(); } catch (Exception e) { - LOGGER.warn( - "For rebalanceId: {}, caught exception while fetching/calculating tier instance partitions for table: {}, " - + "aborting the rebalance", rebalanceJobId, tableNameWithType, e); + onReturnFailure(String.format( + "For rebalanceId: %s, caught exception while fetching/calculating tier instance partitions for table: %s, " + + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while fetching/calculating tier instance partitions: " + e, null, null, null); } @@ -258,8 +261,9 @@ public 
class TableRebalancer { targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers, tierToInstancePartitionsMap, rebalanceConfig); } catch (Exception e) { - LOGGER.warn("For rebalanceId: {}, caught exception while calculating target assignment for table: {}, " - + "aborting the rebalance", rebalanceJobId, tableNameWithType, e); + onReturnFailure(String.format( + "For rebalanceId: %s, caught exception while calculating target assignment for table: %s, aborting the " + + "rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while calculating target assignment: " + e, instancePartitionsMap, tierToInstancePartitionsMap, null); @@ -273,6 +277,9 @@ public class TableRebalancer { if (segmentAssignmentUnchanged) { LOGGER.info("Table: {} is already balanced", tableNameWithType); if (instancePartitionsUnchanged && tierInstancePartitionsUnchanged) { + _tableRebalanceObserver.onNoop( + String.format("For rebalanceId: %s, instance unchanged and table: %s is already balanced", rebalanceJobId, + tableNameWithType)); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.NO_OP, "Table is already balanced", instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); } else { @@ -281,6 +288,9 @@ public class TableRebalancer { "Instance reassigned in dry-run mode, table is already balanced", instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); } else { + _tableRebalanceObserver.onSuccess( + String.format("For rebalanceId: %s, instance reassigned but table: %s is already balanced", + rebalanceJobId, tableNameWithType)); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Instance reassigned, table is already balanced", instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); @@ -309,16 +319,19 @@ public class TableRebalancer { 
Preconditions.checkState(_helixDataAccessor.getBaseDataAccessor() .set(idealStatePropertyKey.getPath(), idealStateRecord, idealStateRecord.getVersion(), AccessOption.PERSISTENT), "Failed to update IdealState"); - LOGGER.info("For rebalanceId: {}, finished rebalancing table: {} with downtime in {}ms.", rebalanceJobId, - tableNameWithType, System.currentTimeMillis() - startTimeMs); + String msg = + String.format("For rebalanceId: %s, finished rebalancing table: %s with downtime in %d ms.", rebalanceJobId, + tableNameWithType, System.currentTimeMillis() - startTimeMs); + LOGGER.info(msg); + _tableRebalanceObserver.onSuccess(msg); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Success with downtime (replaced IdealState with the target segment assignment, ExternalView might not " + "reach the target segment assignment yet)", instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); } catch (Exception e) { - LOGGER.warn( - "For rebalanceId: {}, caught exception while updating IdealState for table: {}, aborting the rebalance", - rebalanceJobId, tableNameWithType, e); + onReturnFailure(String.format( + "For rebalanceId: %s, caught exception while updating IdealState for table: %s, aborting the rebalance", + rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while updating IdealState: " + e, instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); @@ -347,12 +360,10 @@ public class TableRebalancer { if (minReplicasToKeepUpForNoDowntime >= 0) { // For non-negative value, use it as min available replicas if (minReplicasToKeepUpForNoDowntime >= numReplicas) { - String errorMsg = String.format( + onReturnFailure(String.format( "For rebalanceId: %s, Illegal config for minReplicasToKeepUpForNoDowntime: %d for table: %s, " + "must be less than number of replicas: %d, aborting the rebalance", rebalanceJobId, - minReplicasToKeepUpForNoDowntime, 
tableNameWithType, numReplicas); - LOGGER.warn(errorMsg); - _tableRebalanceObserver.onError(errorMsg); + minReplicasToKeepUpForNoDowntime, tableNameWithType, numReplicas), null); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Illegal min available replicas config", instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); @@ -446,11 +457,9 @@ public class TableRebalancer { targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers, tierToInstancePartitionsMap, rebalanceConfig); } catch (Exception e) { - String errorMsg = String.format( + onReturnFailure(String.format( "For rebalanceId: %s, caught exception while re-calculating the target assignment for table: %s, " - + "aborting the rebalance", rebalanceJobId, tableNameWithType); - LOGGER.warn(errorMsg, e); - _tableRebalanceObserver.onError(errorMsg); + + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment); @@ -513,11 +522,8 @@ public class TableRebalancer { LOGGER.info("For rebalanceId: {}, version changed while updating IdealState for table: {}", rebalanceJobId, tableNameWithType); } catch (Exception e) { - String errorMsg = String.format( - "For rebalanceId: %s, caught exception while updating IdealState for table: %s, " - + "aborting the rebalance", rebalanceJobId, tableNameWithType); - LOGGER.warn(errorMsg, e); - _tableRebalanceObserver.onError(errorMsg); + onReturnFailure(String.format("For rebalanceId: %s, caught exception while updating IdealState for table: %s, " + + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while updating IdealState: " + e, instancePartitionsMap, 
tierToInstancePartitionsMap, targetAssignment); @@ -525,6 +531,15 @@ public class TableRebalancer { } } + private void onReturnFailure(String errorMsg, Exception e) { + if (e != null) { + LOGGER.warn(errorMsg, e); + } else { + LOGGER.warn(errorMsg); + } + _tableRebalanceObserver.onError(errorMsg); + } + /** * Gets the instance partitions for instance partition types and also returns a boolean for whether they are unchanged */ diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java index 8386544f3c..b2ea9f8e69 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java @@ -121,6 +121,16 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { _tableRebalanceProgressStats.setStartTimeMs(System.currentTimeMillis()); } + @Override + public void onNoop(String msg) { + _controllerMetrics.setValueOfTableGauge(_tableNameWithType, ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 0); + long timeToFinishInSeconds = (System.currentTimeMillis() - _tableRebalanceProgressStats.getStartTimeMs()) / 1000L; + _tableRebalanceProgressStats.setCompletionStatusMsg(msg); + _tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds); + _tableRebalanceProgressStats.setStatus(RebalanceResult.Status.NO_OP); + trackStatsInZk(); + } + @Override public void onSuccess(String msg) { Preconditions.checkState(RebalanceResult.Status.DONE != _tableRebalanceProgressStats.getStatus(), @@ -140,7 +150,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { @Override public void onError(String errorMsg) { _controllerMetrics.setValueOfTableGauge(_tableNameWithType, 
ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 0); - long timeToFinishInSeconds = (System.currentTimeMillis() - _tableRebalanceProgressStats.getStartTimeMs()) / 1000; + long timeToFinishInSeconds = (System.currentTimeMillis() - _tableRebalanceProgressStats.getStartTimeMs()) / 1000L; _tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds); _tableRebalanceProgressStats.setStatus(RebalanceResult.Status.FAILED); _tableRebalanceProgressStats.setCompletionStatusMsg(errorMsg); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org