This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fc8ee6a09c Metrics for Segment Upload/Download (#15585)
fc8ee6a09c is described below
commit fc8ee6a09c4084262b2ba31968f677f3409746dd
Author: Praveen <[email protected]>
AuthorDate: Wed Apr 23 13:42:22 2025 -0700
Metrics for Segment Upload/Download (#15585)
* Segment metrics
* Upload gauge
* only controller metrics
* cleanup
* review comments 2
---
.../pinot/common/metrics/ControllerGauge.java | 11 ++-
.../pinot/common/metrics/ControllerMeter.java | 6 +-
.../pinot/common/metrics/ControllerTimer.java | 7 +-
.../resources/LLCSegmentCompletionHandlers.java | 10 +++
.../PinotSegmentUploadDownloadRestletResource.java | 11 ++-
.../controller/api/resources/ResourceUtils.java | 90 +++++++++++++++++++++-
.../pinot/controller/api/upload/ZKOperator.java | 11 ++-
7 files changed, 137 insertions(+), 9 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 3ae4db405a..51ff09387c 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -207,7 +207,16 @@ public enum ControllerGauge implements
AbstractMetrics.Gauge {
PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT("pauselessSegmentsInUnrecoverableErrorCount",
false),
// ZK JUTE max buffer size in bytes
- ZK_JUTE_MAX_BUFFER("zkJuteMaxBuffer", true);
+ ZK_JUTE_MAX_BUFFER("zkJuteMaxBuffer", true),
+
+ // Bytes to be read from deep store
+ DEEP_STORE_READ_BYTES_IN_PROGRESS("deepStoreReadBytesInProgress", true),
+ // Count of deep store segment downloads that are currently in progress
+ DEEP_STORE_READ_OPS_IN_PROGRESS("deepStoreReadOpsInProgress", true),
+ // Bytes to be written to deep store
+ DEEP_STORE_WRITE_BYTES_IN_PROGRESS("deepStoreWriteBytesInProgress", true),
+ // Count of deep store segment writes that are currently in progress
+ DEEP_STORE_WRITE_OPS_IN_PROGRESS("deepStoreWriteOpsInProgress", true);
private final String _gaugeName;
private final String _unit;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 850588cb45..5f934561ad 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -69,7 +69,11 @@ public enum ControllerMeter implements AbstractMetrics.Meter
{
IDEAL_STATE_UPDATE_FAILURE("IdealStateUpdateFailure", false),
IDEAL_STATE_UPDATE_RETRY("IdealStateUpdateRetry", false),
IDEAL_STATE_UPDATE_SUCCESS("IdealStateUpdateSuccess", false),
- SEGMENT_SIZE_AUTO_REDUCTION("SegmentSizeAutoReduction", false);
+ SEGMENT_SIZE_AUTO_REDUCTION("SegmentSizeAutoReduction", false),
+ // Total Bytes read from deep store
+ DEEP_STORE_READ_BYTES_COMPLETED("deepStoreReadBytesCompleted", true),
+ // Total Bytes written to deep store
+ DEEP_STORE_WRITE_BYTES_COMPLETED("deepStoreWriteBytesCompleted", true);
private final String _brokerMeterName;
private final String _unit;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
index c56b46343c..23d4807627 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerTimer.java
@@ -30,7 +30,12 @@ public enum ControllerTimer implements AbstractMetrics.Timer
{
CRON_SCHEDULER_JOB_EXECUTION_TIME_MS("cronSchedulerJobExecutionTimeMs",
false),
IDEAL_STATE_UPDATE_TIME_MS("IdealStateUpdateTimeMs", false),
// How long it took the server to start.
- STARTUP_SUCCESS_DURATION_MS("startupSuccessDurationMs", true);
+ STARTUP_SUCCESS_DURATION_MS("startupSuccessDurationMs", true),
+ // Time taken to read the segment from deep store
+ DEEP_STORE_SEGMENT_READ_TIME_MS("deepStoreSegmentReadTimeMs", true),
+ // Time taken to write the segment to deep store
+ DEEP_STORE_SEGMENT_WRITE_TIME_MS("deepStoreSegmentWriteTimeMs", true);
+
private final String _timerName;
private final boolean _global;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 6e3011e666..6df9c1c63a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -39,6 +39,7 @@ import javax.ws.rs.core.MediaType;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.TarCompressionUtils;
@@ -71,6 +72,9 @@ public class LLCSegmentCompletionHandlers {
@Inject
SegmentCompletionManager _segmentCompletionManager;
+ @Inject
+ ControllerMetrics _controllerMetrics;
+
@VisibleForTesting
public static String getScheme() {
return SCHEME;
@@ -227,7 +231,13 @@ public class LLCSegmentCompletionHandlers {
URI segmentFileURI =
URIUtils.getUri(ControllerFilePathProvider.getInstance().getDataDirURI().toString(),
rawTableName,
URIUtils.encode(SegmentCompletionUtils.generateTmpSegmentFileName(segmentName)));
+ // Emit metrics related to deep-store upload operation
+ long startTimeMs = System.currentTimeMillis();
+ long segmentSizeBytes = localTempFile.length();
+ ResourceUtils.emitPreSegmentUploadMetrics(_controllerMetrics,
rawTableName, segmentSizeBytes);
PinotFSFactory.create(segmentFileURI.getScheme()).copyFromLocalFile(localTempFile,
segmentFileURI);
+ ResourceUtils.emitPostSegmentUploadMetrics(_controllerMetrics,
rawTableName, startTimeMs, segmentSizeBytes);
+
SegmentCompletionProtocol.Response.Params responseParams = new
SegmentCompletionProtocol.Response.Params()
.withStreamPartitionMsgOffset(requestParams.getStreamPartitionMsgOffset())
.withSegmentLocation(segmentFileURI.toString())
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index deada81d92..f160c5c60a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -214,8 +214,15 @@ public class PinotSegmentUploadDownloadRestletResource {
segmentFile =
org.apache.pinot.common.utils.FileUtils.concatAndValidateFile(tableDir,
segmentName + "-" + UUID.randomUUID(),
"Invalid segment name: %s", segmentName);
-
+ String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+ // Emit metrics related to deep-store download operation
+ long deepStoreDownloadStartTimeMs = System.currentTimeMillis();
+ long segmentSizeInBytes = segmentFile.length();
+ ResourceUtils.emitPreSegmentDownloadMetrics(_controllerMetrics,
rawTableName, segmentSizeInBytes);
pinotFS.copyToLocalFile(remoteSegmentFileURI, segmentFile);
+      // Pass the start timestamp, not the elapsed time: emitPostSegmentDownloadMetrics computes
+      // the duration internally as (now - startTimeMs), matching the upload-path callers.
+      ResourceUtils.emitPostSegmentDownloadMetrics(_controllerMetrics, rawTableName,
+          deepStoreDownloadStartTimeMs, segmentSizeInBytes);
+
// Streaming in the tmp file and delete it afterward.
builder.entity((StreamingOutput) output -> {
try {
@@ -233,6 +240,7 @@ public class PinotSegmentUploadDownloadRestletResource {
private SuccessResponse uploadSegment(@Nullable String tableName, TableType
tableType,
@Nullable FormDataMultiPart multiPart, boolean
copySegmentToFinalLocation, boolean enableParallelPushProtection,
boolean allowRefresh, HttpHeaders headers, Request request) {
+ long segmentUploadStartTimeMs = System.currentTimeMillis();
if (StringUtils.isNotEmpty(tableName)) {
TableType tableTypeFromTableName =
TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableTypeFromTableName != null && tableTypeFromTableName !=
tableType) {
@@ -412,7 +420,6 @@ public class PinotSegmentUploadDownloadRestletResource {
zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata,
uploadType, finalSegmentLocationURI,
segmentFile, sourceDownloadURIStr, segmentDownloadURIStr,
crypterName, segmentSizeInBytes,
enableParallelPushProtection, allowRefresh, headers);
-
return new SuccessResponse("Successfully uploaded segment: " +
segmentName + " of table: " + tableNameWithType);
} catch (WebApplicationException e) {
throw e;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ResourceUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ResourceUtils.java
index ab091c8138..3d6fce7448 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ResourceUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ResourceUtils.java
@@ -19,10 +19,16 @@
package org.apache.pinot.controller.api.resources;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.ControllerTimer;
import org.apache.pinot.controller.api.access.AccessControl;
import org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.api.access.AccessControlUtils;
@@ -34,13 +40,20 @@ import org.apache.pinot.spi.config.table.TableType;
import org.glassfish.grizzly.http.server.Request;
import org.slf4j.Logger;
-
public class ResourceUtils {
+
private ResourceUtils() {
}
+ // Shared static variable
+ private static AtomicLong _deepStoreWriteOpsInProgress = new AtomicLong(0);
+ private static AtomicLong _deepStoreWriteBytesInProgress = new AtomicLong(0);
+ private static AtomicLong _deepStoreReadOpsInProgress = new AtomicLong(0);
+ private static AtomicLong _deepStoreReadBytesInProgress = new AtomicLong(0);
+
public static List<String>
getExistingTableNamesWithType(PinotHelixResourceManager
pinotHelixResourceManager,
- String tableName, @Nullable TableType tableType, Logger logger) {
+ String tableName,
@Nullable TableType tableType,
+ Logger logger) {
try {
return
pinotHelixResourceManager.getExistingTableNamesWithType(tableName, tableType);
} catch (TableNotFoundException e) {
@@ -78,4 +91,77 @@ public class ResourceUtils {
throw new ControllerApplicationException(logger, "Permission denied",
Response.Status.FORBIDDEN);
}
}
+
+ public static void emitPreSegmentUploadMetrics(ControllerMetrics
controllerMetrics, String rawTableName,
+ long segmentSizeInBytes) {
+ long writeCount = _deepStoreWriteOpsInProgress.incrementAndGet();
+ controllerMetrics.setOrUpdateTableGauge(rawTableName,
ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS,
+ writeCount);
+
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS,
writeCount);
+
+ long segmentBytesUploading =
_deepStoreWriteBytesInProgress.addAndGet(segmentSizeInBytes);
+ controllerMetrics.setOrUpdateTableGauge(rawTableName,
ControllerGauge.DEEP_STORE_WRITE_BYTES_IN_PROGRESS,
+ segmentBytesUploading);
+
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_WRITE_BYTES_IN_PROGRESS,
segmentBytesUploading);
+ }
+
+ public static void emitPostSegmentUploadMetrics(ControllerMetrics
controllerMetrics, String rawTableName,
+ long startTimeMs, long
segmentSizeInBytes) {
+ long writeCount = _deepStoreWriteOpsInProgress.decrementAndGet();
+ controllerMetrics.setOrUpdateTableGauge(rawTableName,
ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS,
+ writeCount);
+
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS,
writeCount);
+
+ long segmentBytesUploading =
_deepStoreWriteBytesInProgress.addAndGet(-segmentSizeInBytes);
+ controllerMetrics.setOrUpdateTableGauge(rawTableName,
ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS,
+ segmentBytesUploading);
+
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_WRITE_OPS_IN_PROGRESS,
segmentBytesUploading);
+
+ long durationMs = System.currentTimeMillis() - startTimeMs;
+ controllerMetrics.addTimedTableValue(rawTableName,
ControllerTimer.DEEP_STORE_SEGMENT_WRITE_TIME_MS, durationMs,
+ TimeUnit.MILLISECONDS);
+
controllerMetrics.addTimedValue(ControllerTimer.DEEP_STORE_SEGMENT_WRITE_TIME_MS,
durationMs,
+ TimeUnit.MILLISECONDS);
+
+ controllerMetrics.addMeteredTableValue(rawTableName,
ControllerMeter.DEEP_STORE_WRITE_BYTES_COMPLETED,
+ segmentSizeInBytes);
+
controllerMetrics.addMeteredGlobalValue(ControllerMeter.DEEP_STORE_WRITE_BYTES_COMPLETED,
segmentSizeInBytes);
+ }
+
+ public static void emitPreSegmentDownloadMetrics(ControllerMetrics
controllerMetrics, String rawTableName,
+ long segmentSizeInBytes) {
+ long readCount = _deepStoreReadOpsInProgress.incrementAndGet();
+ controllerMetrics.setOrUpdateTableGauge(rawTableName,
ControllerGauge.DEEP_STORE_READ_OPS_IN_PROGRESS,
+ readCount);
+
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_READ_OPS_IN_PROGRESS,
readCount);
+
+ long segmentBytesDownloading =
_deepStoreReadBytesInProgress.addAndGet(segmentSizeInBytes);
+ controllerMetrics.setOrUpdateTableGauge(rawTableName,
ControllerGauge.DEEP_STORE_READ_BYTES_IN_PROGRESS,
+ segmentBytesDownloading);
+
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_READ_BYTES_IN_PROGRESS,
+ segmentBytesDownloading);
+ }
+
+ public static void emitPostSegmentDownloadMetrics(ControllerMetrics
controllerMetrics, String rawTableName,
+ long startTimeMs, long
segmentSizeInBytes) {
+ long readCount = _deepStoreReadOpsInProgress.decrementAndGet();
+ controllerMetrics.setOrUpdateTableGauge(rawTableName,
ControllerGauge.DEEP_STORE_READ_OPS_IN_PROGRESS, readCount);
+
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_READ_OPS_IN_PROGRESS,
readCount);
+
+ long segmentBytesDownloading =
_deepStoreReadBytesInProgress.addAndGet(-segmentSizeInBytes);
+ controllerMetrics.setOrUpdateTableGauge(rawTableName,
ControllerGauge.DEEP_STORE_READ_BYTES_IN_PROGRESS,
+ segmentBytesDownloading);
+
controllerMetrics.setValueOfGlobalGauge(ControllerGauge.DEEP_STORE_READ_BYTES_IN_PROGRESS,
+ segmentBytesDownloading);
+
+ long durationMs = System.currentTimeMillis() - startTimeMs;
+ controllerMetrics.addTimedTableValue(rawTableName,
ControllerTimer.DEEP_STORE_SEGMENT_READ_TIME_MS,
+ durationMs, TimeUnit.MILLISECONDS);
+
controllerMetrics.addTimedValue(ControllerTimer.DEEP_STORE_SEGMENT_READ_TIME_MS,
durationMs,
+ TimeUnit.MILLISECONDS);
+
+ controllerMetrics.addMeteredTableValue(rawTableName,
ControllerMeter.DEEP_STORE_READ_BYTES_COMPLETED,
+ segmentSizeInBytes);
+
controllerMetrics.addMeteredGlobalValue(ControllerMeter.DEEP_STORE_READ_BYTES_COMPLETED,
segmentSizeInBytes);
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 8486853650..8ff39b9def 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -40,10 +40,12 @@ import
org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
import org.apache.pinot.controller.ControllerConf;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.api.resources.ResourceUtils;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -771,16 +773,21 @@ public class ZKOperator {
} else {
// In push types other than METADATA, local segmentFile contains the
complete segment.
// Move local segment to final location
- copyFromSegmentFileToDeepStore(segmentFile, finalSegmentLocationURI);
+ copyFromSegmentFileToDeepStore(segmentFile, finalSegmentLocationURI,
tableNameWithType);
LOGGER.info("Copied segment: {} of table: {} to final location: {}",
segmentName, tableNameWithType,
finalSegmentLocationURI);
}
}
- private void copyFromSegmentFileToDeepStore(File segmentFile, URI
finalSegmentLocationURI)
+ private void copyFromSegmentFileToDeepStore(File segmentFile, URI
finalSegmentLocationURI, String tableNameWithType)
throws Exception {
LOGGER.info("Copying segment from: {} to: {}",
segmentFile.getAbsolutePath(), finalSegmentLocationURI);
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ long segmentSizeInBytes = segmentFile.length();
+ long startTimeMs = System.currentTimeMillis();
+ ResourceUtils.emitPreSegmentUploadMetrics(_controllerMetrics,
rawTableName, segmentSizeInBytes);
PinotFSFactory.create(finalSegmentLocationURI.getScheme()).copyFromLocalFile(segmentFile,
finalSegmentLocationURI);
+ ResourceUtils.emitPostSegmentUploadMetrics(_controllerMetrics,
rawTableName, startTimeMs, segmentSizeInBytes);
}
private void copyFromSegmentURIToDeepStore(URI sourceDownloadURI, URI
finalSegmentLocationURI)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]