This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4ee441e18d Metrics that Tracks the Progress of each Table Rebalance
Job (#15518)
4ee441e18d is described below
commit 4ee441e18db02d94fcc5d4a2affddaf20d825290
Author: Jhow <[email protected]>
AuthorDate: Fri Apr 25 14:49:20 2025 -0700
Metrics that Tracks the Progress of each Table Rebalance Job (#15518)
---
.../configs/controller.yml | 9 +++++
.../pinot/common/metrics/ControllerGauge.java | 6 +++-
.../ControllerPrometheusMetricsTest.java | 5 +++
.../prometheus/PinotPrometheusMetricsTest.java | 6 ++++
.../rebalance/ZkBasedTableRebalanceObserver.java | 40 ++++++++++++++++++++++
.../TestZkBasedTableRebalanceObserver.java | 27 +++++++++++++--
6 files changed, 89 insertions(+), 4 deletions(-)
diff --git
a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
index 4b06ad572e..5023cc639d 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
@@ -48,6 +48,15 @@ rules:
table: "$2$4"
tableType: "$5"
taskType: "$6"
+# Gauge for table rebalance job progress, labeled by tableNameWithType and rebalance jobId
+- pattern:
"\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ControllerMetrics\",
name=\"pinot\\.controller\\.tableRebalanceJobProgressPercent\\.(([^.]+)\\.)?([^.]*)_(OFFLINE|REALTIME)\\.([0-9a-fA-F-]+)\"><>(\\w+)"
+ name: "pinot_controller_tableRebalanceJobProgressPercent_$6"
+ cache: true
+ labels:
+ database: "$2"
+ table: "$1$3"
+ tableType: "$4"
+ jobId: "$5"
# Special handling for timers like cronSchedulerJobExecutionTimeMs and
tableRebalanceExecutionTimeMs which use table name, table type and another
string for status / taskType
- pattern:
"\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ControllerMetrics\",
name=\"pinot\\.controller\\.(([^.]+)\\.)?([^.]*)_(OFFLINE|REALTIME)\\.(\\w+)\\.cronSchedulerJobExecutionTimeMs\"><>(\\w+)"
name: "pinot_controller_cronSchedulerJobExecutionTimeMs_$6"
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 51ff09387c..ddbb67a049 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -216,7 +216,11 @@ public enum ControllerGauge implements
AbstractMetrics.Gauge {
// Bytes to be written to deep store
DEEP_STORE_WRITE_BYTES_IN_PROGRESS("deepStoreWriteBytesInProgress", true),
// Count of deep store segment writes that are currently in progress
- DEEP_STORE_WRITE_OPS_IN_PROGRESS("deepStoreWriteOpsInProgress", true);
+ DEEP_STORE_WRITE_OPS_IN_PROGRESS("deepStoreWriteOpsInProgress", true),
+
+ // The progress of a certain table rebalance job of a table
+ TABLE_REBALANCE_JOB_PROGRESS_PERCENT("percent", false);
+
private final String _gaugeName;
private final String _unit;
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java
index 45583184d4..0cf11bc020 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java
@@ -178,6 +178,11 @@ public abstract class ControllerPrometheusMetricsTest
extends PinotPrometheusMet
String.format("%s.%s",
ExportedLabelValues.CONTROLLER_PERIODIC_TASK_CHC, TaskState.IN_PROGRESS));
assertGaugeExportedCorrectly(ControllerGauge.TASK_STATUS.getGaugeName(),
ExportedLabels.JOBSTATUS_CONTROLLER_TASKTYPE,
EXPORTED_METRIC_PREFIX);
+ } else if (controllerGauge ==
ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT) {
+ addGaugeWithLabels(controllerGauge,
+ String.format("%s.%s", TABLE_NAME_WITH_TYPE, REBALANCE_JOB_ID));
+ assertGaugeExportedCorrectly(controllerGauge.getGaugeName(),
+ ExportedLabels.JOBID_TABLENAME_TABLETYPE, EXPORTED_METRIC_PREFIX);
} else {
addGaugeWithLabels(controllerGauge, TABLE_NAME_WITH_TYPE);
assertGaugeExportedCorrectly(controllerGauge.getGaugeName(),
ExportedLabels.TABLENAME_TABLETYPE,
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java
index 5173bc00df..f7d0d7462e 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -68,6 +69,7 @@ public abstract class PinotPrometheusMetricsTest {
protected static final String PARTITION_GROUP_ID = "partitionGroupId";
protected static final String CLIENT_ID =
String.format("%s-%s-%s", TABLE_NAME_WITH_TYPE, KAFKA_TOPIC,
PARTITION_GROUP_ID);
+ protected static final String REBALANCE_JOB_ID =
UUID.randomUUID().toString();
protected HttpClient _httpClient;
@@ -336,6 +338,9 @@ public abstract class PinotPrometheusMetricsTest {
public static final List<String> TASKTYPE_TABLENAME_TABLETYPE =
List.of(TASKTYPE, ExportedLabelValues.MINION_TASK_SEGMENT_IMPORT,
TABLE, ExportedLabelValues.TABLENAME,
TABLETYPE, TABLETYPE_REALTIME);
+
+ public static final List<String> JOBID_TABLENAME_TABLETYPE =
+ List.of(JOBID, REBALANCE_JOB_ID, TABLE, ExportedLabelValues.TABLENAME,
TABLETYPE, TABLETYPE_REALTIME);
}
public static class ExportedLabelKeys {
@@ -348,6 +353,7 @@ public abstract class PinotPrometheusMetricsTest {
public static final String PERIODIC_TASK = "periodicTask";
public static final String STATUS = "status";
public static final String DATABASE = "database";
+ public static final String JOBID = "jobId";
}
public static class ExportedLabelValues {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index 125b0568bc..6b12a9a225 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -84,6 +84,7 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
switch (trigger) {
case START_TRIGGER:
updateOnStart(currentState, targetState, rebalanceContext);
+
emitProgressMetric(_tableRebalanceProgressStats.getRebalanceProgressStatsOverall());
trackStatsInZk();
updatedStatsInZk = true;
break;
@@ -104,6 +105,7 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
}
if
(!_tableRebalanceProgressStats.getRebalanceProgressStatsOverall().equals(latestProgress))
{
_tableRebalanceProgressStats.setRebalanceProgressStatsOverall(latestProgress);
+ emitProgressMetric(latestProgress);
}
trackStatsInZk();
updatedStatsInZk = true;
@@ -129,6 +131,7 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
if
(!_tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep().equals(latestProgress))
{
_tableRebalanceProgressStats.updateOverallAndStepStatsFromLatestStepStats(latestProgress);
}
+
emitProgressMetric(_tableRebalanceProgressStats.getRebalanceProgressStatsOverall());
trackStatsInZk();
updatedStatsInZk = true;
}
@@ -199,6 +202,7 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
new TableRebalanceProgressStats.RebalanceProgressStats();
_tableRebalanceProgressStats.setRebalanceProgressStatsCurrentStep(progressStats);
trackStatsInZk();
+ emitProgressMetricDone();
}
@Override
@@ -234,6 +238,37 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
return _numUpdatesToZk;
}
+ /**
+ * Emits the rebalance progress in percent to the metrics. Progress is computed
from the segments still remaining to be added,
+ * deleted, or converged, compared against the total segments to be added and deleted.
+ * Notice that for some jobs, the metrics may not be exactly accurate and
may not reach 100% when the job is done.
+ * (e.g. when `lowDiskMode=false`, the job finishes without waiting for
`totalRemainingSegmentsToBeDeleted` to become
+ * 0, or when `bestEffort=true` the job finishes without waiting for all of
`totalRemainingSegmentsToBeAdded`,
+ * `totalRemainingSegmentsToBeDeleted`, and
`totalRemainingSegmentsToConverge` to become 0).
+ * Therefore `emitProgressMetricDone()` should be called to emit the final
progress when the job exits.
+ * @param overallProgress the latest overall progress
+ */
+ private void
emitProgressMetric(TableRebalanceProgressStats.RebalanceProgressStats
overallProgress) {
+ // Round this up so the metric is 100 only when no segment remains
+ long progressPercent = 100 - (long)
Math.ceil(TableRebalanceProgressStats.calculatePercentageChange(
+ overallProgress._totalSegmentsToBeAdded +
overallProgress._totalSegmentsToBeDeleted,
+ overallProgress._totalRemainingSegmentsToBeAdded +
overallProgress._totalRemainingSegmentsToBeDeleted
+ + overallProgress._totalRemainingSegmentsToConverge));
+ // Using the original job ID to group rebalance retries together with the
same label
+ _controllerMetrics.setValueOfTableGauge(_tableNameWithType + "." +
_tableRebalanceContext.getOriginalJobId(),
+ ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT,
+ progressPercent < 0 ? 0 : progressPercent);
+ }
+
+ /**
+ * Emits the rebalance progress as 100 (%) to the metrics. This ensures that
the reported progress is aligned with the job state
+ * when the job is done, to avoid confusion.
+ */
+ private void emitProgressMetricDone() {
+ _controllerMetrics.setValueOfTableGauge(_tableNameWithType + "." +
_tableRebalanceContext.getOriginalJobId(),
+ ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT, 100);
+ }
+
@VisibleForTesting
TableRebalanceContext getTableRebalanceContext() {
return _tableRebalanceContext;
@@ -297,6 +332,11 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
return jobMetadata;
}
+ @VisibleForTesting
+ TableRebalanceProgressStats getTableRebalanceProgressStats() {
+ return _tableRebalanceProgressStats;
+ }
+
/**
* Takes in targetState and sourceState and computes stats based on the
comparison between sourceState and
* targetState.This captures how far the source state is from the target
state. Example - if there are 4 segments and
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
index 0d2f395d79..e80530c082 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
@@ -24,10 +24,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
-import org.mockito.Mockito;
import org.testng.annotations.Test;
import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
@@ -38,6 +38,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -49,11 +50,12 @@ public class TestZkBasedTableRebalanceObserver {
PinotHelixResourceManager pinotHelixResourceManager =
mock(PinotHelixResourceManager.class);
// Mocking this. We will verify using numZkUpdate stat
when(pinotHelixResourceManager.addControllerJobToZK(any(), any(),
any())).thenReturn(true);
- ControllerMetrics controllerMetrics =
Mockito.mock(ControllerMetrics.class);
+ ControllerMetrics controllerMetrics = ControllerMetrics.get();
TableRebalanceContext retryCtx = new TableRebalanceContext();
retryCtx.setConfig(new RebalanceConfig());
+ retryCtx.setOriginalJobId("testZkObserverTracking");
ZkBasedTableRebalanceObserver observer =
- new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx,
pinotHelixResourceManager);
+ new ZkBasedTableRebalanceObserver("dummy", "testZkObserverTracking",
retryCtx, pinotHelixResourceManager);
Map<String, Map<String, String>> source = new TreeMap<>();
Map<String, Map<String, String>> target = new TreeMap<>();
target.put("segment1",
@@ -67,16 +69,21 @@ public class TestZkBasedTableRebalanceObserver {
segmentSet, segmentSet);
observer.onTrigger(TableRebalanceObserver.Trigger.START_TRIGGER, source,
target, rebalanceContext);
assertEquals(observer.getNumUpdatesToZk(), 1);
+ checkProgressPercentMetrics(controllerMetrics, observer);
observer.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER,
source, source, rebalanceContext);
+ checkProgressPercentMetrics(controllerMetrics, observer);
observer.onTrigger(TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
source, source,
rebalanceContext);
+ checkProgressPercentMetrics(controllerMetrics, observer);
// START_TRIGGER will set up the ZK progress stats to have the diff
between source and target. When calling the
// triggers for IS and EV-IS, since source and source are compared, the
diff will change for the IS trigger
// but not for the EV-IS trigger, so ZK must be updated 1 extra time
assertEquals(observer.getNumUpdatesToZk(), 2);
observer.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER,
source, target, rebalanceContext);
+ checkProgressPercentMetrics(controllerMetrics, observer);
observer.onTrigger(TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
source, target,
rebalanceContext);
+ checkProgressPercentMetrics(controllerMetrics, observer);
// Both of the changes above will update ZK for progress stats
assertEquals(observer.getNumUpdatesToZk(), 4);
// Try a rollback and this should trigger a ZK update as well
@@ -84,6 +91,20 @@ public class TestZkBasedTableRebalanceObserver {
assertEquals(observer.getNumUpdatesToZk(), 5);
}
+ private void checkProgressPercentMetrics(ControllerMetrics controllerMetrics,
+ ZkBasedTableRebalanceObserver observer) {
+ Long progressGaugeValue = controllerMetrics.getGaugeValue(
+ ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT.getGaugeName() +
".dummy.testZkObserverTracking");
+ assertNotNull(progressGaugeValue);
+ TableRebalanceProgressStats.RebalanceProgressStats overallProgress =
+
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+ long progressRemained = (long)
Math.ceil(TableRebalanceProgressStats.calculatePercentageChange(
+ overallProgress._totalSegmentsToBeAdded +
overallProgress._totalSegmentsToBeDeleted,
+ overallProgress._totalRemainingSegmentsToBeAdded +
overallProgress._totalRemainingSegmentsToBeDeleted
+ + overallProgress._totalRemainingSegmentsToConverge));
+ assertEquals(progressGaugeValue, progressRemained > 100 ? 0 : 100 -
progressRemained);
+ }
+
@Test
void testDifferenceBetweenTableRebalanceStates() {
Map<String, Map<String, String>> target = new TreeMap<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]