This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 91d2a95 Clean up controller-table related metrics in
ControllerPeriodicTask (#7557)
91d2a95 is described below
commit 91d2a958945f0b5d862821a216b0cba065be8461
Author: Jialiang Li <[email protected]>
AuthorDate: Fri Nov 5 17:56:33 2021 -0700
Clean up controller-table related metrics in ControllerPeriodicTask (#7557)
Co-authored-by: Jack Li(Analytics Engineering) <[email protected]>
---
.../pinot/common/metrics/ValidationMetrics.java | 107 +++++++++++++++++----
.../controller/helix/SegmentStatusChecker.java | 16 +++
.../core/periodictask/ControllerPeriodicTask.java | 14 +++
.../validation/OfflineSegmentIntervalChecker.java | 16 +++
.../RealtimeSegmentValidationManager.java | 10 ++
5 files changed, 145 insertions(+), 18 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
index efe3c82..b47d068 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
@@ -136,8 +136,16 @@ public class ValidationMetrics {
* @param missingSegmentCount The number of missing segments
*/
public void updateMissingSegmentCountGauge(final String resource, final int
missingSegmentCount) {
- final String fullGaugeName = makeGaugeName(resource,
"missingSegmentCount");
- makeGauge(fullGaugeName, makeMetricName(fullGaugeName),
_storedValueGaugeFactory, missingSegmentCount);
+ makeGauge(resource, ValidationMetricName.MISSING_SEGMENT_COUNT,
_storedValueGaugeFactory, missingSegmentCount);
+ }
+
+ /**
+ * Removes the missing segment count gauge for the resource (unregisters it and drops its stored value).
+ *
+ * @param resource The resource for which the gauge is removed
+ */
+ public void cleanupMissingSegmentCountGauge(final String resource) {
+ removeGauge(resource, ValidationMetricName.MISSING_SEGMENT_COUNT);
}
/**
@@ -148,12 +156,20 @@ public class ValidationMetrics {
* if there is no such time.
*/
public void updateOfflineSegmentDelayGauge(final String resource, final long
lastOfflineSegmentTime) {
- final String fullGaugeNameHours = makeGaugeName(resource,
"offlineSegmentDelayHours");
- makeGauge(fullGaugeNameHours, makeMetricName(fullGaugeNameHours),
_currentTimeMillisDeltaGaugeHoursFactory,
+ makeGauge(resource, ValidationMetricName.OFFLINE_SEGMENT_DELAY_HOURS,
_currentTimeMillisDeltaGaugeHoursFactory,
lastOfflineSegmentTime);
}
/**
+ * Removes the offline segment delay gauge for the resource (unregisters it and drops its stored value).
+ *
+ * @param resource The resource for which the gauge is removed
+ */
+ public void cleanupOfflineSegmentDelayGauge(final String resource) {
+ removeGauge(resource, ValidationMetricName.OFFLINE_SEGMENT_DELAY_HOURS);
+ }
+
+ /**
* Updates the last push time gauge.
*
* @param resource The resource for which the gauge is updated
@@ -161,20 +177,36 @@ public class ValidationMetrics {
* such time.
*/
public void updateLastPushTimeGauge(final String resource, final long
lastPushTimeMillis) {
- final String fullGaugeNameHours = makeGaugeName(resource,
"lastPushTimeDelayHours");
- makeGauge(fullGaugeNameHours, makeMetricName(fullGaugeNameHours),
_currentTimeMillisDeltaGaugeHoursFactory,
+ makeGauge(resource, ValidationMetricName.LAST_PUSH_TIME_DELAY_HOURS,
_currentTimeMillisDeltaGaugeHoursFactory,
lastPushTimeMillis);
}
/**
+ * Removes the last push time gauge for the resource (unregisters it and drops its stored value).
+ *
+ * @param resource The resource for which the gauge is removed
+ */
+ public void cleanupLastPushTimeGauge(final String resource) {
+ removeGauge(resource, ValidationMetricName.LAST_PUSH_TIME_DELAY_HOURS);
+ }
+
+ /**
* Updates the total document count gauge.
*
* @param resource The resource for which the gauge is updated
* @param documentCount Total document count for the given resource name or
table name
*/
public void updateTotalDocumentCountGauge(final String resource, final long
documentCount) {
- final String fullGaugeName = makeGaugeName(resource, "TotalDocumentCount");
- makeGauge(fullGaugeName, makeMetricName(fullGaugeName),
_storedValueGaugeFactory, documentCount);
+ makeGauge(resource, ValidationMetricName.TOTAL_DOCUMENT_COUNT,
_storedValueGaugeFactory, documentCount);
+ }
+
+ /**
+ * Removes the total document count gauge for the resource (unregisters it and drops its stored value).
+ *
+ * @param resource The resource for which the gauge is removed
+ */
+ public void cleanupTotalDocumentCountGauge(final String resource) {
+ removeGauge(resource, ValidationMetricName.TOTAL_DOCUMENT_COUNT);
}
/**
@@ -184,8 +216,7 @@ public class ValidationMetrics {
* @param partitionCount Number of partitions that do not have any segment
in CONSUMING state.
*/
public void updateNonConsumingPartitionCountMetric(final String resource,
final int partitionCount) {
- final String fullGaugeName = makeGaugeName(resource,
"NonConsumingPartitionCount");
- makeGauge(fullGaugeName, makeMetricName(fullGaugeName),
_storedValueGaugeFactory, partitionCount);
+ makeGauge(resource, ValidationMetricName.NON_CONSUMING_PARTITION_COUNT,
_storedValueGaugeFactory, partitionCount); // NOTE(review): this change adds no cleanup method for this gauge; confirm it is removed elsewhere
}
/**
@@ -195,8 +226,16 @@ public class ValidationMetrics {
* @param segmentCount Total segment count for the given resource name or
table name
*/
public void updateSegmentCountGauge(final String resource, final long
segmentCount) {
- final String fullGaugeName = makeGaugeName(resource, "SegmentCount");
- makeGauge(fullGaugeName, makeMetricName(fullGaugeName),
_storedValueGaugeFactory, segmentCount);
+ makeGauge(resource, ValidationMetricName.SEGMENT_COUNT,
_storedValueGaugeFactory, segmentCount);
+ }
+
+ /**
+ * Removes the segment count gauge for the resource (unregisters it and drops its stored value).
+ *
+ * @param resource The resource for which the gauge is removed
+ */
+ public void cleanupSegmentCountGauge(final String resource) {
+ removeGauge(resource, ValidationMetricName.SEGMENT_COUNT);
}
@VisibleForTesting
@@ -208,17 +247,27 @@ public class ValidationMetrics {
return PinotMetricUtils.makePinotMetricName(ValidationMetrics.class,
gaugeName);
}
- private void makeGauge(final String gaugeName, final PinotMetricName
metricName, final GaugeFactory<?> gaugeFactory,
- final long value) {
- if (!_gaugeValues.containsKey(gaugeName)) {
- _gaugeValues.put(gaugeName, value);
- PinotMetricUtils.makeGauge(_metricsRegistry, metricName,
gaugeFactory.buildGauge(gaugeName));
+ private void makeGauge(final String resource, final ValidationMetricName
validationMetricName,
+ final GaugeFactory<?> gaugeFactory, final long value) {
+ final String fullGaugeName = makeGaugeName(resource,
validationMetricName.getMetricName());
+ PinotMetricName metricName = makeMetricName(fullGaugeName);
+ if (!_gaugeValues.containsKey(fullGaugeName)) { // first value for this name: register the gauge; later calls only update the stored value
+ _gaugeValues.put(fullGaugeName, value);
+ PinotMetricUtils.makeGauge(_metricsRegistry, metricName,
gaugeFactory.buildGauge(fullGaugeName));
_metricNames.add(metricName);
} else {
- _gaugeValues.put(gaugeName, value);
+ _gaugeValues.put(fullGaugeName, value);
}
}
+ private void removeGauge(final String resource, final ValidationMetricName
validationMetricName) {
+ final String fullGaugeName = makeGaugeName(resource,
validationMetricName.getMetricName());
+ PinotMetricName pinotMetricName = makeMetricName(fullGaugeName);
+ PinotMetricUtils.removeMetric(_metricsRegistry, pinotMetricName);
+ _metricNames.remove(pinotMetricName);
+ _gaugeValues.remove(fullGaugeName); // dropping the stored value lets a later makeGauge call re-register the gauge
+ }
+
/**
* Unregisters all validation metrics.
*/
@@ -239,4 +288,26 @@ public class ValidationMetrics {
}
return value;
}
+
+ /**
+ * Names of validation metrics. Each value is the suffix passed to makeGaugeName; the strings match the literals used before this enum existed.
+ */
+ public enum ValidationMetricName {
+ MISSING_SEGMENT_COUNT("missingSegmentCount"),
+ OFFLINE_SEGMENT_DELAY_HOURS("offlineSegmentDelayHours"),
+ LAST_PUSH_TIME_DELAY_HOURS("lastPushTimeDelayHours"),
+ TOTAL_DOCUMENT_COUNT("TotalDocumentCount"),
+ NON_CONSUMING_PARTITION_COUNT("NonConsumingPartitionCount"),
+ SEGMENT_COUNT("SegmentCount");
+
+ private final String _metricName;
+
+ ValidationMetricName(String metricName) {
+ _metricName = metricName;
+ }
+
+ public String getMetricName() {
+ return _metricName;
+ }
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index d7d74f9..93350fe 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -240,6 +240,22 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask<SegmentStatusCh
}
}
+ @Override
+ protected void nonLeaderCleanup(List<String> tableNamesWithType) { // remove per-table gauges for tables this controller does not lead
+ for (String tableNameWithType : tableNamesWithType) {
+ _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.NUMBER_OF_REPLICAS);
+ _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.PERCENT_OF_REPLICAS);
+ _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE);
+
+ _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.IDEALSTATE_ZNODE_SIZE);
+ _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE);
+ _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.SEGMENT_COUNT);
+
+ _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.SEGMENTS_IN_ERROR_STATE);
+ _controllerMetrics.removeTableGauge(tableNameWithType,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE); // NOTE(review): duplicates the PERCENT_SEGMENTS_AVAILABLE removal above; redundant, or was a different gauge intended?
+ }
+ }
+
private void setStatusToDefault() {
List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 9d7a676..439f8be 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -65,11 +65,14 @@ public abstract class ControllerPeriodicTask<C> extends
BasePeriodicTask {
// Process the tables that are managed by this controller
List<String> tablesToProcess = new ArrayList<>();
+ List<String> nonLeaderForTables = new ArrayList<>(); // only filled when no specific table is requested (all-tables branch below)
if (propTableNameWithType == null) {
// Table name is not available, so task should run on all tables for
which this controller is the lead.
for (String tableNameWithType :
_pinotHelixResourceManager.getAllTables()) {
if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
tablesToProcess.add(tableNameWithType);
+ } else {
+ nonLeaderForTables.add(tableNameWithType);
}
}
} else {
@@ -82,6 +85,9 @@ public abstract class ControllerPeriodicTask<C> extends
BasePeriodicTask {
if (!tablesToProcess.isEmpty()) {
processTables(tablesToProcess);
}
+ if (!nonLeaderForTables.isEmpty()) { // exceptions from the cleanup reach the catch below and are counted as task errors
+ nonLeaderCleanup(nonLeaderForTables);
+ }
} catch (Exception e) {
LOGGER.error("Caught exception while running task: {}", _taskName, e);
_controllerMetrics.addMeteredTableValue(_taskName,
ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
@@ -156,4 +162,12 @@ public abstract class ControllerPeriodicTask<C> extends
BasePeriodicTask {
*/
protected void postprocess() {
}
+
+ /**
+ * Can be overridden to perform cleanups for tables that the current
controller isn't the leader for. The default implementation does nothing.
+ *
+ * @param tableNamesWithType the table names that the current controller
isn't the leader for
+ */
+ protected void nonLeaderCleanup(List<String> tableNamesWithType) {
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 1e11d1e..1b08c01 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -136,6 +136,22 @@ public class OfflineSegmentIntervalChecker extends
ControllerPeriodicTask<Void>
_validationMetrics.updateSegmentCountGauge(offlineTableName, numSegments);
}
+ @Override
+ protected void nonLeaderCleanup(List<String> tableNamesWithType) {
+ for (String tableNameWithType : tableNamesWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == TableType.OFFLINE) { // non-OFFLINE tables are skipped by this task's cleanup
+ // TODO: we can further split the existing ValidationMetricName enum
to OFFLINE and REALTIME,
+ // so that we can simply loop through all the enum values and clean
up the metrics.
+ _validationMetrics.cleanupMissingSegmentCountGauge(tableNameWithType);
+ _validationMetrics.cleanupOfflineSegmentDelayGauge(tableNameWithType);
+ _validationMetrics.cleanupLastPushTimeGauge(tableNameWithType);
+ _validationMetrics.cleanupTotalDocumentCountGauge(tableNameWithType);
+ _validationMetrics.cleanupSegmentCountGauge(tableNameWithType);
+ }
+ }
+ }
+
/**
* Computes the number of missing segments based on the given existing
segment intervals and the expected frequency
* of the intervals.
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 53291b3..237924a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -124,6 +124,16 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
}
}
+ @Override
+ protected void nonLeaderCleanup(List<String> tableNamesWithType) {
+ for (String tableNameWithType : tableNamesWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == TableType.REALTIME) { // OFFLINE tables are cleaned up by OfflineSegmentIntervalChecker
+ _validationMetrics.cleanupTotalDocumentCountGauge(tableNameWithType);
+ }
+ }
+ }
+
@VisibleForTesting
static long computeRealtimeTotalDocumentInSegments(List<SegmentZKMetadata>
segmentsZKMetadata,
boolean countHLCSegments) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]