This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 78cb37c189 Metrics that Tracks the Progress of Table Rebalance Jobs
(#15650)
78cb37c189 is described below
commit 78cb37c189dee21786309700efbdc068635b27af
Author: Jhow <[email protected]>
AuthorDate: Fri May 2 17:59:32 2025 -0700
Metrics that Tracks the Progress of Table Rebalance Jobs (#15650)
---
.../pinot/common/metrics/ControllerGauge.java | 6 +-
.../rebalance/ZkBasedTableRebalanceObserver.java | 42 +++++
.../TestZkBasedTableRebalanceObserver.java | 184 ++++++++++++++++++++-
3 files changed, 229 insertions(+), 3 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 51ff09387c..ddbb67a049 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -216,7 +216,11 @@ public enum ControllerGauge implements
AbstractMetrics.Gauge {
// Bytes to be written to deep store
DEEP_STORE_WRITE_BYTES_IN_PROGRESS("deepStoreWriteBytesInProgress", true),
// Count of deep store segment writes that are currently in progress
- DEEP_STORE_WRITE_OPS_IN_PROGRESS("deepStoreWriteOpsInProgress", true);
+ DEEP_STORE_WRITE_OPS_IN_PROGRESS("deepStoreWriteOpsInProgress", true),
+
+ // Progress, in percent, of the table rebalance job of a table
+ TABLE_REBALANCE_JOB_PROGRESS_PERCENT("percent", false);
+
private final String _gaugeName;
private final String _unit;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index 5eaa80e91d..37da8bd9ec 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -84,6 +84,7 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
switch (trigger) {
case START_TRIGGER:
updateOnStart(currentState, targetState, rebalanceContext);
+
emitProgressMetric(_tableRebalanceProgressStats.getRebalanceProgressStatsOverall());
trackStatsInZk();
updatedStatsInZk = true;
break;
@@ -104,6 +105,7 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
}
if
(!_tableRebalanceProgressStats.getRebalanceProgressStatsOverall().equals(latestProgress))
{
_tableRebalanceProgressStats.setRebalanceProgressStatsOverall(latestProgress);
+ emitProgressMetric(latestProgress);
}
trackStatsInZk();
updatedStatsInZk = true;
@@ -129,6 +131,7 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
if
(!_tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep().equals(latestProgress))
{
_tableRebalanceProgressStats.updateOverallAndStepStatsFromLatestStepStats(latestProgress);
}
+
emitProgressMetric(_tableRebalanceProgressStats.getRebalanceProgressStatsOverall());
trackStatsInZk();
updatedStatsInZk = true;
}
@@ -199,6 +202,7 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
new TableRebalanceProgressStats.RebalanceProgressStats();
_tableRebalanceProgressStats.setRebalanceProgressStatsCurrentStep(progressStats);
trackStatsInZk();
+ emitProgressMetricDone();
}
@Override
@@ -234,6 +238,39 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
return _numUpdatesToZk;
}
+ /**
+ * Emits the rebalance progress, in percent, to the metrics. The progress is calculated as:
+ *
+ *   100% - (remaining / total) * 100%
+ *
+ * where:
+ *   remaining = totalRemainingSegmentsToBeAdded + totalRemainingSegmentsToBeDeleted
+ *               + totalRemainingSegmentsToConverge + totalCarryOverSegmentsToBeAdded
+ *               + totalCarryOverSegmentsToBeDeleted
+ *   total     = totalSegmentsToBeAdded + totalSegmentsToBeDeleted
+ *
+ * Note that for some jobs the metric may not be exactly accurate and would not reach 100% when the job is done
+ * (e.g. when `lowDiskMode=false`, the job finishes without waiting for `totalRemainingSegmentsToBeDeleted` to
+ * become 0; or when `bestEffort=true`, the job finishes without waiting for `totalRemainingSegmentsToBeAdded`,
+ * `totalRemainingSegmentsToBeDeleted`, and `totalRemainingSegmentsToConverge` to become 0).
+ * Therefore `emitProgressMetricDone()` should be called to emit the final progress at the time the job exits.
+ * @param overallProgress the latest overall progress
+ */
+ private void
emitProgressMetric(TableRebalanceProgressStats.RebalanceProgressStats
overallProgress) {
+ // Round this up so the metric is 100 only when no segment remains
+ long progressPercent = 100 - (long)
Math.ceil(TableRebalanceProgressStats.calculatePercentageChange(
+ overallProgress._totalSegmentsToBeAdded +
overallProgress._totalSegmentsToBeDeleted,
+ overallProgress._totalRemainingSegmentsToBeAdded +
overallProgress._totalRemainingSegmentsToBeDeleted
+ + overallProgress._totalRemainingSegmentsToConverge +
overallProgress._totalCarryOverSegmentsToBeAdded
+ + overallProgress._totalCarryOverSegmentsToBeDeleted));
+ _controllerMetrics.setValueOfTableGauge(_tableNameWithType,
ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT,
+ progressPercent < 0 ? 0 : progressPercent);
+ }
+
+ /**
+ * Emits the rebalance progress as 100 (%) to the metrics. This ensures that the reported progress is aligned
+ * with the job state once the job is done, to avoid confusion.
+ */
+ private void emitProgressMetricDone() {
+ _controllerMetrics.setValueOfTableGauge(_tableNameWithType,
ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT,
+ 100);
+ }
+
@VisibleForTesting
TableRebalanceContext getTableRebalanceContext() {
return _tableRebalanceContext;
@@ -297,6 +334,11 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
return jobMetadata;
}
+ @VisibleForTesting
+ TableRebalanceProgressStats getTableRebalanceProgressStats() {
+ return _tableRebalanceProgressStats;
+ }
+
/**
* Takes in targetState and sourceState and computes stats based on the
comparison between sourceState and
* targetState.This captures how far the source state is from the target
state. Example - if there are 4 segments and
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
index 7ae66a7b30..043269f037 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
@@ -24,10 +24,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
-import org.mockito.Mockito;
import org.testng.annotations.Test;
import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
@@ -38,10 +38,170 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
public class TestZkBasedTableRebalanceObserver {
+ @Test
+ void testZkObserverProgressStats() {
+ PinotHelixResourceManager pinotHelixResourceManager =
mock(PinotHelixResourceManager.class);
+ // Mocking this. We will verify using numZkUpdate stat
+ when(pinotHelixResourceManager.addControllerJobToZK(any(), any(),
any())).thenReturn(true);
+ ControllerMetrics controllerMetrics = ControllerMetrics.get();
+ TableRebalanceContext retryCtx = new TableRebalanceContext();
+ retryCtx.setConfig(new RebalanceConfig());
+ ZkBasedTableRebalanceObserver observer =
+ new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx,
pinotHelixResourceManager);
+ Map<String, Map<String, String>> source = new TreeMap<>();
+ Map<String, Map<String, String>> target = new TreeMap<>();
+ Map<String, Map<String, String>> targetIntermediate = new TreeMap<>();
+ Map<String, Map<String, String>> sourceIntermediate = new TreeMap<>();
+ source.put("segment1",
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1"), ONLINE));
+ source.put("segment2",
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4"), ONLINE));
+ target.put("segment1",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1",
"host2", "host3"), ONLINE));
+ target.put("segment2",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4",
"host5", "host6"), ONLINE));
+ targetIntermediate.put("segment1",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1",
"host2"), ONLINE));
+ targetIntermediate.put("segment2",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4",
"host5"), ONLINE));
+
+ sourceIntermediate.put("segment1",
+ SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1",
"host2"), ONLINE));
+ sourceIntermediate.put("segment2",
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4"), ONLINE));
+
+ Set<String> segmentSet = new HashSet<>(source.keySet());
+ segmentSet.addAll(target.keySet());
+ TableRebalanceObserver.RebalanceContext rebalanceContext =
+ new TableRebalanceObserver.RebalanceContext(-1, segmentSet,
segmentSet);
+ // START_TRIGGER sets up the overall progress stats from the diff between source and target. The current step
+ // stats stay at zero until the next assignment is calculated (NEXT_ASSINGMENT_CALCULATION_TRIGGER below).
+ observer.onTrigger(TableRebalanceObserver.Trigger.START_TRIGGER, source,
target, rebalanceContext);
+ TableRebalanceProgressStats.RebalanceProgressStats overallStats =
+
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+ TableRebalanceProgressStats.RebalanceProgressStats currentStepStats =
+
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep();
+ assertEquals(overallStats._totalSegmentsToBeAdded, 4);
+ assertEquals(overallStats._totalSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 4);
+ assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalSegmentsToBeAdded, 0);
+ assertEquals(currentStepStats._totalSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalRemainingSegmentsToBeAdded, 0);
+ assertEquals(currentStepStats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0);
+
observer.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER,
source, target, rebalanceContext);
+ overallStats =
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+ currentStepStats =
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep();
+ assertEquals(overallStats._totalSegmentsToBeAdded, 4);
+ assertEquals(overallStats._totalSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 4);
+ assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalSegmentsToBeAdded, 0);
+ assertEquals(currentStepStats._totalSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalRemainingSegmentsToBeAdded, 0);
+ assertEquals(currentStepStats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0);
+ checkProgressPercentMetrics(controllerMetrics, observer);
+ // This simulates the first step of rebalance, where the IS is set to the
intermediate assignment
+
observer.onTrigger(TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER,
source, targetIntermediate,
+ rebalanceContext);
+ overallStats =
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+ currentStepStats =
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep();
+ assertEquals(overallStats._totalSegmentsToBeAdded, 4);
+ assertEquals(overallStats._totalSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 4);
+ assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalSegmentsToBeAdded, 2);
+ assertEquals(currentStepStats._totalSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalRemainingSegmentsToBeAdded, 2);
+ assertEquals(currentStepStats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0);
+
observer.onTrigger(TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+ sourceIntermediate, targetIntermediate, rebalanceContext);
+ overallStats =
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+ currentStepStats =
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep();
+ assertEquals(overallStats._totalSegmentsToBeAdded, 4);
+ assertEquals(overallStats._totalSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 3);
+ assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalSegmentsToBeAdded, 2);
+ assertEquals(currentStepStats._totalSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalRemainingSegmentsToBeAdded, 1);
+ assertEquals(currentStepStats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0);
+ checkProgressPercentMetrics(controllerMetrics, observer);
+
+ // Assume bestEfforts=true and we didn't wait for the second segment to
converge before moving to next step
+ // Here the currentAssignment is based on the IS and not the EV. IS is
fully updated to the targetIntermediate
+
observer.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER,
targetIntermediate, target,
+ rebalanceContext);
+ overallStats =
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+ currentStepStats =
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep();
+ assertEquals(overallStats._totalSegmentsToBeAdded, 4);
+ assertEquals(overallStats._totalSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 2);
+ assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalSegmentsToBeAdded, 2);
+ assertEquals(currentStepStats._totalSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalRemainingSegmentsToBeAdded, 1);
+ assertEquals(currentStepStats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0);
+ checkProgressPercentMetrics(controllerMetrics, observer);
+ // Next assignment calculated based on the IS, IS should be same as the
previous targetAssignment
+
observer.onTrigger(TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER,
targetIntermediate, target,
+ rebalanceContext);
+ overallStats =
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+ currentStepStats =
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep();
+ assertEquals(overallStats._totalSegmentsToBeAdded, 4);
+ assertEquals(overallStats._totalSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 2);
+ assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalSegmentsToBeAdded, 2);
+ assertEquals(currentStepStats._totalSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalRemainingSegmentsToBeAdded, 2);
+ assertEquals(currentStepStats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalCarryOverSegmentsToBeAdded, 0);
+ assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0);
+ checkProgressPercentMetrics(controllerMetrics, observer);
+
observer.onTrigger(TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+ sourceIntermediate, target, rebalanceContext);
+ overallStats =
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+ currentStepStats =
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep();
+ assertEquals(overallStats._totalSegmentsToBeAdded, 4);
+ assertEquals(overallStats._totalSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 2);
+ assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 1);
+ assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalSegmentsToBeAdded, 2);
+ assertEquals(currentStepStats._totalSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalRemainingSegmentsToBeAdded, 2);
+ assertEquals(currentStepStats._totalRemainingSegmentsToBeDeleted, 0);
+ assertEquals(currentStepStats._totalCarryOverSegmentsToBeAdded, 1);
+ assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0);
+ checkProgressPercentMetrics(controllerMetrics, observer);
+ }
// This is a test to verify if Zk stats are pushed out correctly
@Test
@@ -49,7 +209,7 @@ public class TestZkBasedTableRebalanceObserver {
PinotHelixResourceManager pinotHelixResourceManager =
mock(PinotHelixResourceManager.class);
// Mocking this. We will verify using numZkUpdate stat
when(pinotHelixResourceManager.addControllerJobToZK(any(), any(),
any())).thenReturn(true);
- ControllerMetrics controllerMetrics =
Mockito.mock(ControllerMetrics.class);
+ ControllerMetrics controllerMetrics = ControllerMetrics.get();
TableRebalanceContext retryCtx = new TableRebalanceContext();
retryCtx.setConfig(new RebalanceConfig());
ZkBasedTableRebalanceObserver observer =
@@ -67,16 +227,21 @@ public class TestZkBasedTableRebalanceObserver {
segmentSet, segmentSet);
observer.onTrigger(TableRebalanceObserver.Trigger.START_TRIGGER, source,
target, rebalanceContext);
assertEquals(observer.getNumUpdatesToZk(), 1);
+ checkProgressPercentMetrics(controllerMetrics, observer);
observer.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER,
source, source, rebalanceContext);
+ checkProgressPercentMetrics(controllerMetrics, observer);
observer.onTrigger(TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
source, source,
rebalanceContext);
+ checkProgressPercentMetrics(controllerMetrics, observer);
// START_TRIGGER will set up the ZK progress stats to have the diff
between source and target. When calling the
// triggers for IS and EV-IS, since source and source are compared, the
diff will change for the IS trigger
// but not for the EV-IS trigger, so ZK must be updated 1 extra time
assertEquals(observer.getNumUpdatesToZk(), 2);
observer.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER,
source, target, rebalanceContext);
+ checkProgressPercentMetrics(controllerMetrics, observer);
observer.onTrigger(TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
source, target,
rebalanceContext);
+ checkProgressPercentMetrics(controllerMetrics, observer);
// Both of the changes above will update ZK for progress stats
assertEquals(observer.getNumUpdatesToZk(), 4);
// Try a rollback and this should trigger a ZK update as well
@@ -84,6 +249,21 @@ public class TestZkBasedTableRebalanceObserver {
assertEquals(observer.getNumUpdatesToZk(), 5);
}
+ private void checkProgressPercentMetrics(ControllerMetrics controllerMetrics,
+ ZkBasedTableRebalanceObserver observer) {
+ Long progressGaugeValue =
+
controllerMetrics.getGaugeValue(ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT.getGaugeName()
+ ".dummy");
+ assertNotNull(progressGaugeValue);
+ TableRebalanceProgressStats.RebalanceProgressStats overallProgress =
+
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+ long progressRemained = (long)
Math.ceil(TableRebalanceProgressStats.calculatePercentageChange(
+ overallProgress._totalSegmentsToBeAdded +
overallProgress._totalSegmentsToBeDeleted,
+ overallProgress._totalRemainingSegmentsToBeAdded +
overallProgress._totalRemainingSegmentsToBeDeleted
+ + overallProgress._totalRemainingSegmentsToConverge +
overallProgress._totalCarryOverSegmentsToBeAdded
+ + overallProgress._totalCarryOverSegmentsToBeDeleted));
+ assertEquals(progressGaugeValue, progressRemained > 100 ? 0 : 100 -
progressRemained);
+ }
+
@Test
void testDifferenceBetweenTableRebalanceStates() {
Map<String, Map<String, String>> target = new TreeMap<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]