This is an automated email from the ASF dual-hosted git repository.
Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 14bc147d88d Add metrics to track server startup time spent on
different status callbacks (#18493)
14bc147d88d is described below
commit 14bc147d88d78ed3ffa2009959b93b061605af9e
Author: Jhow <[email protected]>
AuthorDate: Sun Jun 21 19:16:47 2026 -0700
Add metrics to track server startup time spent on different status
callbacks (#18493)
---
.../apache/pinot/common/metrics/ServerGauge.java | 6 +++
.../apache/pinot/common/utils/ServiceStatus.java | 10 ++--
.../server/starter/helix/BaseServerStarter.java | 58 +++++++++++++++++++---
3 files changed, 62 insertions(+), 12 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 46df92b2714..ac29c9541bd 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -153,6 +153,12 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
HELIX_MESSAGES_COUNT("count", true),
STARTUP_STATUS_CHECK_IN_PROGRESS("state", true,
"Indicates whether the server startup status check is currently in
progress"),
+ STARTUP_CURRENT_STATE_MATCH_TIME_MS("milliseconds", true,
+ "Time in ms from status checker registration until
ideal-state/current-state match first reports GOOD"),
+ STARTUP_EXTERNAL_VIEW_MATCH_TIME_MS("milliseconds", true,
+ "Time in ms from status checker registration until
ideal-state/external-view match first reports GOOD"),
+ STARTUP_REALTIME_CONSUMPTION_CATCHUP_TIME_MS("milliseconds", true,
+ "Time in ms from status checker registration until realtime consumption
catchup first reports GOOD"),
CONSUMER_LOCK_WAIT_TIME_MS("milliseconds", false,
"Indicates the time consumer spends while waiting on the consumer
lock."),
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
index 3731d3b23d3..ba0db262494 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/ServiceStatus.java
@@ -174,16 +174,18 @@ public class ServiceStatus {
@Override
public Status getServiceStatus() {
- // Iterate through all callbacks, returning the first non GOOD one as
the service status
+ // Iterate through all callbacks so each one is invoked (callers may
wrap delegates to observe transitions),
+ // but report the first non-GOOD status encountered, matching the
original contract.
+ Status firstNonGood = null; // first non-GOOD status in callback order; stays null while every callback is GOOD
for (ServiceStatusCallback statusCallback : _statusCallbacks) {
final Status serviceStatus = statusCallback.getServiceStatus();
- if (serviceStatus != Status.GOOD) {
- return serviceStatus;
+ if (serviceStatus != Status.GOOD && firstNonGood == null) { // NOTE(review): no early exit, so every callback (including any expensive ones) runs on each call even while an earlier one is still non-GOOD; confirm the extra per-poll cost is acceptable
+ firstNonGood = serviceStatus; // only the earliest non-GOOD is kept; later non-GOOD results are ignored
}
}
// All callbacks report good, therefore we're good too
- return Status.GOOD;
+ return firstNonGood == null ? Status.GOOD : firstNonGood; // GOOD only when no callback reported a non-GOOD status
}
@Override
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index cfbbc349c88..c5ae95cd3de 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
@@ -421,12 +422,14 @@ public abstract class BaseServerStarter implements
ServiceStartable {
}
List<ServiceStatus.ServiceStatusCallback> serviceStatusCallbackListBuilder
= new ArrayList<>();
- serviceStatusCallbackListBuilder.add(
+ serviceStatusCallbackListBuilder.add(new TimeToHealthyTrackingCallback(
new
ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager,
_helixClusterName,
- _instanceId, resourcesToMonitor, minResourcePercentForStartup));
- serviceStatusCallbackListBuilder.add(
+ _instanceId, resourcesToMonitor, minResourcePercentForStartup),
+ ServerGauge.STARTUP_CURRENT_STATE_MATCH_TIME_MS, _serverMetrics));
+ serviceStatusCallbackListBuilder.add(new TimeToHealthyTrackingCallback(
new
ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager,
_helixClusterName,
- _instanceId, resourcesToMonitor, minResourcePercentForStartup));
+ _instanceId, resourcesToMonitor, minResourcePercentForStartup),
+ ServerGauge.STARTUP_EXTERNAL_VIEW_MATCH_TIME_MS, _serverMetrics));
boolean foundConsuming = !consumingSegments.isEmpty();
if (checkRealtime && foundConsuming) {
// We specifically put the freshness based checker first to ensure it's
the only one setup if both checkers
@@ -443,9 +446,10 @@ public abstract class BaseServerStarter implements
ServiceStartable {
this::getConsumingSegments, realtimeMinFreshnessMs,
idleTimeoutMs);
Supplier<Integer> getNumConsumingSegmentsNotReachedMinFreshness =
freshnessStatusChecker::getNumConsumingSegmentsNotReachedIngestionCriteria;
- serviceStatusCallbackListBuilder.add(
+ serviceStatusCallbackListBuilder.add(new TimeToHealthyTrackingCallback(
new
ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager,
_helixClusterName,
- _instanceId, realtimeConsumptionCatchupWaitMs,
getNumConsumingSegmentsNotReachedMinFreshness));
+ _instanceId, realtimeConsumptionCatchupWaitMs,
getNumConsumingSegmentsNotReachedMinFreshness),
+ ServerGauge.STARTUP_REALTIME_CONSUMPTION_CATCHUP_TIME_MS,
_serverMetrics));
} else if (isOffsetBasedConsumptionStatusCheckerEnabled) {
LOGGER.info("Setting up offset based status checker");
OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
@@ -453,11 +457,14 @@ public abstract class BaseServerStarter implements
ServiceStartable {
this::getConsumingSegments);
Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset =
consumptionStatusChecker::getNumConsumingSegmentsNotReachedIngestionCriteria;
- serviceStatusCallbackListBuilder.add(
+ serviceStatusCallbackListBuilder.add(new TimeToHealthyTrackingCallback(
new
ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager,
_helixClusterName,
- _instanceId, realtimeConsumptionCatchupWaitMs,
getNumConsumingSegmentsNotReachedTheirLatestOffset));
+ _instanceId, realtimeConsumptionCatchupWaitMs,
getNumConsumingSegmentsNotReachedTheirLatestOffset),
+ ServerGauge.STARTUP_REALTIME_CONSUMPTION_CATCHUP_TIME_MS,
_serverMetrics));
} else {
LOGGER.info("Setting up static time based status checker");
+ // Not wrapped with a time-to-healthy gauge: this checker turns GOOD
purely on wall-clock elapsed time, so the
+ // resulting metric would just echo realtimeConsumptionCatchupWaitMs
and not reflect actual catchup latency.
serviceStatusCallbackListBuilder.add(
new
ServiceStatus.RealtimeConsumptionCatchupServiceStatusCallback(_helixManager,
_helixClusterName,
_instanceId, realtimeConsumptionCatchupWaitMs, null));
@@ -1308,6 +1315,41 @@ public abstract class BaseServerStarter implements
ServiceStartable {
return new SegmentOnlineOfflineStateModelFactory(instanceDataManager,
transitionThreadPoolManager);
}
+ /**
+ * Wraps a {@link ServiceStatus.ServiceStatusCallback} and records, into the supplied global gauge, the time in
+ * milliseconds from construction of this wrapper until the first call at which the delegate reports
+ * {@link Status#GOOD}.
+ *
+ * <p>{@link #getServiceStatus()} and {@link #getStatusDescription()} return the delegate's results unchanged;
+ * the wrapper only observes. Non-GOOD results never touch the gauge, and once the value has been written, later
+ * calls (GOOD or not) leave it frozen. If the delegate never reports GOOD, this wrapper never sets the gauge.
+ *
+ * <p>The recorded value is the elapsed time at which GOOD was first <em>observed</em> by a caller, not the
+ * instant the delegate's underlying condition became true, so its precision is bounded by how often
+ * {@code getServiceStatus()} is invoked (e.g. the startup poll interval).
+ *
+ * <p>{@code getServiceStatus} can be called concurrently (the startup poll loop and HTTP health/tables
+ * endpoints are the expected callers), so the record-once latch is an {@link AtomicBoolean} compare-and-set:
+ * exactly one caller that observes GOOD writes the gauge.
+ */
+ private static final class TimeToHealthyTrackingCallback implements
ServiceStatus.ServiceStatusCallback {
+ private final ServiceStatus.ServiceStatusCallback _delegate;
+ private final ServerGauge _gauge;
+ private final ServerMetrics _serverMetrics;
+ private final long _startNanos = System.nanoTime(); // measurement window starts when the wrapper is constructed
+ private final AtomicBoolean _recorded = new AtomicBoolean(false); // flips to true exactly once, on the first observed GOOD
+
+ TimeToHealthyTrackingCallback(ServiceStatus.ServiceStatusCallback
delegate, ServerGauge gauge,
+ ServerMetrics serverMetrics) {
+ _delegate = delegate;
+ _gauge = gauge;
+ _serverMetrics = serverMetrics;
+ }
+
+ @Override
+ public Status getServiceStatus() {
+ Status status = _delegate.getServiceStatus();
+ if (status == Status.GOOD && _recorded.compareAndSet(false, true)) { // CAS is only attempted on GOOD (short-circuit); the first winner records
+ _serverMetrics.setValueOfGlobalGauge(_gauge,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - _startNanos)); // monotonic delta, unaffected by wall-clock changes
+ }
+ return status;
+ }
+
+ @Override
+ public String getStatusDescription() {
+ return _delegate.getStatusDescription();
+ }
+ }
+
private void refreshMessageCount() {
try {
HelixDataAccessor dataAccessor = _helixManager.getHelixDataAccessor();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]