This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 862a931667 Improve minion observer stats to capture more granular
stages of minion task execution (#15118)
862a931667 is described below
commit 862a9316670ccbf48f33af5ce780469d84dd5440
Author: Shounak kulkarni <[email protected]>
AuthorDate: Tue Mar 4 09:36:30 2025 +0530
Improve minion observer stats to capture more granular stages of minion
task execution (#15118)
* Add more stats to MinionTaskBaseObserverStats for better tracking
* Add granular tracking to SegmentProcessorFramework
* lint fixes
* Add minion API to get the subtask stats
* test fix
* fixes
* add comments
* Add comments on _stageTimes
---
.../framework/SegmentProcessorFramework.java | 35 +++++--
.../segment/processing/mapper/SegmentMapper.java | 10 +-
.../api/resources/PinotTaskProgressResource.java | 39 +++++++
.../pinot/minion/event/MinionProgressObserver.java | 73 +++++++++++--
.../spi/tasks/MinionTaskBaseObserverStats.java | 115 +++++++++++++++++++--
.../spi/tasks/MinionTaskBaseObserverStatsTest.java | 43 +++++---
.../resources/observer_stats_test_payload.json | 26 +++++
7 files changed, 302 insertions(+), 39 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index a6d3f74042..596eb3266e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -43,6 +43,7 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
+import org.apache.pinot.spi.tasks.MinionTaskBaseObserverStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +60,9 @@ import org.slf4j.LoggerFactory;
*/
public class SegmentProcessorFramework {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentProcessorFramework.class);
+ public static final String MAP_STAGE = "MAP";
+ public static final String REDUCE_STAGE = "REDUCE";
+ public static final String GENERATE_STAGE = "GENERATE_SEGMENT";
private final List<RecordReaderFileConfig> _recordReaderFileConfigs;
private final List<RecordTransformer> _customRecordTransformers;
@@ -157,9 +161,9 @@ public class SegmentProcessorFramework {
int numRecordReaders = _recordReaderFileConfigs.size();
int nextRecordReaderIndexToBeProcessed = 0;
int iterationCount = 1;
- Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
boolean isMapperOutputSizeThresholdEnabled =
_segmentProcessorConfig.getSegmentConfig().getIntermediateFileSizeThreshold()
!= Long.MAX_VALUE;
+ String logMessage;
while (nextRecordReaderIndexToBeProcessed < numRecordReaders) {
// Initialise the mapper. Eliminate the record readers that have been
processed in the previous iterations.
@@ -168,34 +172,42 @@ public class SegmentProcessorFramework {
// Log start of iteration details only if intermediate file size
threshold is set.
if (isMapperOutputSizeThresholdEnabled) {
- String logMessage =
+ logMessage =
String.format("Starting iteration %d with %d record readers.
Starting index = %d, end index = %d",
iterationCount,
_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed,
numRecordReaders).size(),
nextRecordReaderIndexToBeProcessed + 1, numRecordReaders);
LOGGER.info(logMessage);
- observer.accept(logMessage);
+ logToObserver(MAP_STAGE, logMessage);
}
// Map phase.
long mapStartTimeInMs = System.currentTimeMillis();
+ logToObserver(MAP_STAGE, "Starting Map phase for iteration " +
iterationCount);
Map<String, GenericRowFileManager> partitionToFileManagerMap =
mapper.map();
// Log the time taken to map.
- LOGGER.info("Finished iteration {} in {}ms", iterationCount,
System.currentTimeMillis() - mapStartTimeInMs);
+ logMessage = "Finished Map phase for iteration " + iterationCount + " in
"
+ + (System.currentTimeMillis() - mapStartTimeInMs) + "ms";
+ LOGGER.info(logMessage);
+ logToObserver(MAP_STAGE, logMessage);
// Check for mapper output files, if no files are generated, skip the
reducer phase and move on to the next
// iteration.
if (partitionToFileManagerMap.isEmpty()) {
- LOGGER.info("No mapper output files generated, skipping reduce phase");
+ logMessage = "No mapper output files generated, skipping reduce phase";
+ LOGGER.info(logMessage);
+ logToObserver(MAP_STAGE, logMessage);
nextRecordReaderIndexToBeProcessed =
getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
continue;
}
// Reduce phase.
+ logToObserver(REDUCE_STAGE, "Starting Reduce phase for iteration " +
iterationCount);
doReduce(partitionToFileManagerMap);
// Segment creation phase. Add the created segments to the final list.
+ logToObserver(GENERATE_STAGE, "Generating segments for iteration " +
iterationCount);
outputSegmentDirs.addAll(generateSegment(partitionToFileManagerMap));
// Store the starting index of the record readers that were processed in
this iteration for logging purposes.
@@ -213,7 +225,6 @@ public class SegmentProcessorFramework {
// We are sure that the last RecordReader is completely processed in
the last iteration else it may or may not
// have completed processing. Log it accordingly.
- String logMessage;
if (nextRecordReaderIndexToBeProcessed == numRecordReaders) {
logMessage = String.format("Finished processing all of %d
RecordReaders", numRecordReaders);
} else {
@@ -222,16 +233,22 @@ public class SegmentProcessorFramework {
+ "iteration %d", startingProcessedRecordReaderIndex + 1,
boundaryIndexToLog,
nextRecordReaderIndexToBeProcessed + 1, numRecordReaders,
iterationCount);
}
-
- observer.accept(logMessage);
LOGGER.info(logMessage);
+ logToObserver(GENERATE_STAGE, logMessage);
}
-
iterationCount++;
}
return outputSegmentDirs;
}
+  /**
+   * Publishes a progress message to the configured progress observer, tagged with the processing stage it
+   * belongs to.
+   *
+   * @param stage stage label stored on the entry (callers in this class pass MAP_STAGE, REDUCE_STAGE or
+   *     GENERATE_STAGE)
+   * @param logMessage text stored as the entry's status
+   */
+  private void logToObserver(String stage, String logMessage) {
+    MinionTaskBaseObserverStats.StatusEntry.Builder entryBuilder =
+        new MinionTaskBaseObserverStats.StatusEntry.Builder();
+    entryBuilder.withStage(stage);
+    entryBuilder.withStatus(logMessage);
+    // No timestamp or level is set here, so the builder's defaults apply.
+    _segmentProcessorConfig.getProgressObserver().accept(entryBuilder.build());
+  }
+
protected SegmentMapper getSegmentMapper(List<RecordReaderFileConfig>
recordReaderFileConfigs) {
if (_transformPipeline != null) {
return new SegmentMapper(recordReaderFileConfigs, _transformPipeline,
_segmentProcessorConfig, _mapperOutputDir);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 3ec0a539c7..2697846f79 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
+import org.apache.pinot.spi.tasks.MinionTaskBaseObserverStats;
import org.apache.pinot.spi.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -187,10 +188,15 @@ public class SegmentMapper {
writeRecord(transformedRow);
}
} catch (Exception e) {
+ String logMessage = "Caught exception while reading data.";
+ observer.accept(new MinionTaskBaseObserverStats.StatusEntry.Builder()
+ .withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.ERROR)
+ .withStatus(logMessage + " Reason: " + e.getMessage())
+ .build());
if (!continueOnError) {
- throw new RuntimeException("Caught exception while reading data", e);
+ throw new RuntimeException(logMessage, e);
} else {
- LOGGER.debug("Caught exception while reading data", e);
+ LOGGER.debug(logMessage, e);
continue;
}
}
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
index ac91835c76..95a9a8efce 100644
---
a/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
@@ -45,6 +45,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.minion.event.MinionEventObserver;
import org.apache.pinot.minion.event.MinionEventObservers;
import org.apache.pinot.minion.event.MinionTaskState;
+import org.apache.pinot.spi.tasks.MinionTaskBaseObserverStats;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
@@ -143,4 +144,42 @@ public class PinotTaskProgressResource {
.build());
}
}
+
+ @GET
+ @Path("/tasks/subtask/progressStats")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation("Get task progress stats tracked for the given subtasks")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500,
message = "Internal server error")
+ })
+ public String getSubtaskProgressStats(
+ @ApiParam(value = "Sub task name") @QueryParam("subtaskName") String
subtaskName) {
+ try {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Getting progress stats for subtask: {}", subtaskName);
+ }
+ Map<String, MinionTaskBaseObserverStats> progressStatsMap = new
HashMap<>();
+ MinionEventObserver observer =
MinionEventObservers.getInstance().getMinionEventObserver(subtaskName);
+ if (observer == null) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("MinionEventObserver does not exist for subtask: {}",
subtaskName);
+ }
+ return JsonUtils.objectToString(progressStatsMap);
+ }
+ MinionTaskBaseObserverStats progressStats = observer.getProgressStats();
+ if (progressStats != null) {
+ progressStats.setProgressLogs(null);
+ progressStatsMap.put(subtaskName, progressStats);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Got subtasks progress stats: {}", progressStats);
+ }
+ }
+ return JsonUtils.objectToString(progressStatsMap);
+ } catch (Exception e) {
+ throw new
WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(
+ String.format("Failed to get task progress stats for subtask: %s due
to error: %s",
+ subtaskName, e.getMessage()))
+ .build());
+ }
+ }
}
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
index a68011207c..7f3b11870f 100644
---
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory;
public class MinionProgressObserver extends DefaultMinionEventObserver {
private static final Logger LOGGER =
LoggerFactory.getLogger(MinionProgressObserver.class);
- protected MinionTaskBaseObserverStats _taskProgressStats = new
MinionTaskBaseObserverStats();
+ protected final MinionTaskBaseObserverStats _taskProgressStats = new
MinionTaskBaseObserverStats();
protected String _taskId;
@Override
@@ -50,6 +51,7 @@ public class MinionProgressObserver extends
DefaultMinionEventObserver {
addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder()
.withTs(_taskProgressStats.getStartTimestamp())
.withStatus("Task started")
+ .withStage(MinionTaskState.IN_PROGRESS.name())
.build());
super.notifyTaskStart(pinotTaskConfig);
}
@@ -62,14 +64,29 @@ public class MinionProgressObserver extends
DefaultMinionEventObserver {
*/
@Override
public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig,
@Nullable Object progress) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Update progress: {} for task: {}", progress,
pinotTaskConfig.getTaskId());
- }
+ String progressMessage = null;
+ MinionTaskBaseObserverStats.StatusEntry statusEntry = null;
_taskProgressStats.setCurrentState(MinionTaskState.IN_PROGRESS.name());
- addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder()
- .withTs(System.currentTimeMillis())
- .withStatus((progress == null) ? "" : progress.toString())
- .build());
+ if (progress instanceof MinionTaskBaseObserverStats.StatusEntry) {
+ statusEntry = (MinionTaskBaseObserverStats.StatusEntry) progress;
+ progressMessage = statusEntry.getStatus();
+ } else if (progress instanceof MinionTaskBaseObserverStats) {
+ MinionTaskBaseObserverStats stats = (MinionTaskBaseObserverStats)
progress;
+ // Only one progress log must be recorded at once and should not be
bulked
+ if (stats.getProgressLogs() != null) {
+ statusEntry = stats.getProgressLogs().pollFirst();
+ progressMessage = statusEntry != null ? statusEntry.getStatus() : null;
+ }
+ } else if (progress != null) {
+ progressMessage = progress.toString();
+ statusEntry = new MinionTaskBaseObserverStats.StatusEntry.Builder()
+ .withStatus(progressMessage)
+ .build();
+ }
+ if (LOGGER.isDebugEnabled() && progressMessage != null) {
+ LOGGER.debug("Update progress: {} for task: {}", progressMessage,
pinotTaskConfig.getTaskId());
+ }
+ addStatus(statusEntry);
super.notifyProgress(pinotTaskConfig, progress);
}
@@ -93,9 +110,11 @@ public class MinionProgressObserver extends
DefaultMinionEventObserver {
public synchronized void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig,
@Nullable Object executionResult) {
long endTs = System.currentTimeMillis();
_taskProgressStats.setCurrentState(MinionTaskState.SUCCEEDED.name());
+ _taskProgressStats.setEndTimestamp(endTs);
addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder()
.withTs(endTs)
.withStatus("Task succeeded in " + (endTs -
_taskProgressStats.getStartTimestamp()) + "ms")
+ .withStage(MinionTaskState.SUCCEEDED.name())
.build());
super.notifyTaskSuccess(pinotTaskConfig, executionResult);
}
@@ -104,10 +123,12 @@ public class MinionProgressObserver extends
DefaultMinionEventObserver {
public synchronized void notifyTaskCancelled(PinotTaskConfig
pinotTaskConfig) {
long endTs = System.currentTimeMillis();
_taskProgressStats.setCurrentState(MinionTaskState.CANCELLED.name());
+ _taskProgressStats.setEndTimestamp(endTs);
addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder()
.withTs(endTs)
.withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.WARN)
.withStatus("Task got cancelled after " + (endTs -
_taskProgressStats.getStartTimestamp()) + "ms")
+ .withStage(MinionTaskState.CANCELLED.name())
.build());
super.notifyTaskCancelled(pinotTaskConfig);
}
@@ -116,11 +137,13 @@ public class MinionProgressObserver extends
DefaultMinionEventObserver {
public synchronized void notifyTaskError(PinotTaskConfig pinotTaskConfig,
Exception e) {
long endTs = System.currentTimeMillis();
_taskProgressStats.setCurrentState(MinionTaskState.ERROR.name());
+ _taskProgressStats.setEndTimestamp(endTs);
addStatus(new MinionTaskBaseObserverStats.StatusEntry.Builder()
.withTs(endTs)
.withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.ERROR)
.withStatus("Task failed in " + (endTs -
_taskProgressStats.getStartTimestamp()) + "ms with error: "
+ ExceptionUtils.getStackTrace(e))
+ .withStage(MinionTaskState.ERROR.name())
.build());
super.notifyTaskError(pinotTaskConfig, e);
}
@@ -139,6 +162,37 @@ public class MinionProgressObserver extends
DefaultMinionEventObserver {
}
private synchronized void addStatus(MinionTaskBaseObserverStats.StatusEntry
statusEntry) {
+ if (statusEntry == null) {
+ // if no status entry provided, only update the task progress if the
_taskProgressStats has updated values
+ MinionTaskBaseObserverStats minionTaskObserverStats =
_observerStorageManager.getTaskProgress(_taskId);
+ if (minionTaskObserverStats != null &&
!minionTaskObserverStats.equals(_taskProgressStats)) {
+ _observerStorageManager.setTaskProgress(_taskId, new
MinionTaskBaseObserverStats(_taskProgressStats));
+ }
+ return;
+ }
+ String incomingStage = statusEntry.getStage();
+ if (_taskProgressStats.getCurrentStage() == null) {
+ // typically incomingStage won't be null when current stage is also null
as notifyTaskStart is the first
+ // that gets called during task execution.
+ // This handling is mostly for testing purpose
+ _taskProgressStats.setCurrentStage(incomingStage != null ? incomingStage
: MinionTaskState.UNKNOWN.name());
+ }
+ String currentStage = _taskProgressStats.getCurrentStage();
+ Map<String, MinionTaskBaseObserverStats.Timer> stageTimes =
_taskProgressStats.getStageTimes();
+ if (incomingStage != null && !currentStage.equals(incomingStage)) {
+ // stage transition
+ stageTimes.get(currentStage).stop();
+ currentStage = incomingStage;
+ _taskProgressStats.setCurrentStage(currentStage);
+ } else {
+ // carry forward current stage if stage not specified
+ statusEntry.updateStage(currentStage);
+ }
+ if (!stageTimes.containsKey(currentStage)) {
+ stageTimes.put(currentStage, new MinionTaskBaseObserverStats.Timer());
+ stageTimes.get(currentStage).start();
+ }
+
MinionTaskBaseObserverStats minionTaskObserverStats =
_observerStorageManager.getTaskProgress(_taskId);
Deque<MinionTaskBaseObserverStats.StatusEntry> progressLogs;
if (minionTaskObserverStats != null) {
@@ -146,8 +200,7 @@ public class MinionProgressObserver extends
DefaultMinionEventObserver {
} else {
progressLogs = new LinkedList<>();
}
- progressLogs.add(statusEntry);
-
+ progressLogs.offer(statusEntry);
_observerStorageManager.setTaskProgress(_taskId, new
MinionTaskBaseObserverStats(_taskProgressStats)
.setProgressLogs(progressLogs));
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStats.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStats.java
index d21b35fb1c..37580cc08c 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStats.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStats.java
@@ -18,10 +18,14 @@
*/
package org.apache.pinot.spi.tasks;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import java.util.Deque;
+import java.util.HashMap;
import java.util.LinkedList;
+import java.util.Map;
import java.util.Objects;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -34,8 +38,13 @@ import org.apache.pinot.spi.utils.JsonUtils;
*/
public class MinionTaskBaseObserverStats {
protected String _taskId;
+ protected String _currentStage;
protected String _currentState;
protected long _startTimestamp;
+ protected long _endTimestamp;
+ // A map to keep track of the time spent on each stage of a task execution
+ // This stat will be managed by the observer and executor should not worry
about maintaining it
+ protected Map<String, Timer> _stageTimes = new HashMap<>();
protected Deque<StatusEntry> _progressLogs = new LinkedList<>();
public MinionTaskBaseObserverStats() {
@@ -44,7 +53,10 @@ public class MinionTaskBaseObserverStats {
+  /**
+   * Copy constructor. Scalar fields are copied by value; the stage-time map (including each {@link Timer}) and
+   * the progress-log deque are copied into new containers, so later changes to {@code from} do not show up in
+   * this instance and vice versa. The {@link StatusEntry} elements themselves are shared, not cloned.
+   *
+   * @param from stats to copy; a null stage-time map or progress-log deque is treated as empty
+   */
   public MinionTaskBaseObserverStats(MinionTaskBaseObserverStats from) {
     _taskId = from.getTaskId();
     _currentState = from.getCurrentState();
+    _currentStage = from.getCurrentStage();
     _startTimestamp = from.getStartTimestamp();
+    _endTimestamp = from.getEndTimestamp();
+    // Assigning from.getStageTimes() directly would alias the source's mutable map and Timer objects, unlike
+    // the progress logs, which are copied below. Copy both the map and the timers.
+    Map<String, Timer> copiedStageTimes = new HashMap<>();
+    Map<String, Timer> sourceStageTimes = from.getStageTimes();
+    if (sourceStageTimes != null) {
+      for (Map.Entry<String, Timer> entry : sourceStageTimes.entrySet()) {
+        Timer timer = entry.getValue();
+        copiedStageTimes.put(entry.getKey(), timer == null ? null
+            : new Timer(timer.getTotalTimeMs(), timer.getStartTimeMs(), timer.getResumeTimeMs()));
+      }
+    }
+    _stageTimes = copiedStageTimes;
+    // new LinkedList<>(null) throws NullPointerException; fall back to an empty deque instead.
+    Deque<StatusEntry> sourceLogs = from.getProgressLogs();
+    _progressLogs = sourceLogs != null ? new LinkedList<>(sourceLogs) : new LinkedList<>();
   }
@@ -66,6 +78,15 @@ public class MinionTaskBaseObserverStats {
return this;
}
+  /** Returns the end timestamp stored in these stats. */
+  public long getEndTimestamp() {
+    return this._endTimestamp;
+  }
+
+  /**
+   * Stores the end timestamp.
+   *
+   * @param endTimestamp value to store
+   * @return this instance, so calls can be chained
+   */
+  public MinionTaskBaseObserverStats setEndTimestamp(long endTimestamp) {
+    this._endTimestamp = endTimestamp;
+    return this;
+  }
+
public String getCurrentState() {
return _currentState;
}
@@ -75,6 +96,24 @@ public class MinionTaskBaseObserverStats {
return this;
}
+  /** Returns the current stage label, or null if none has been set. */
+  public String getCurrentStage() {
+    return this._currentStage;
+  }
+
+  /**
+   * Stores the current stage label.
+   *
+   * @param currentStage stage label to store
+   * @return this instance, so calls can be chained
+   */
+  public MinionTaskBaseObserverStats setCurrentStage(String currentStage) {
+    this._currentStage = currentStage;
+    return this;
+  }
+
+  /** Returns the stage-name to {@link Timer} map held by these stats (the internal map itself, not a copy). */
+  public Map<String, Timer> getStageTimes() {
+    return this._stageTimes;
+  }
+
+  /**
+   * Replaces the stage-time map with the given reference; the map is not copied.
+   *
+   * @param stageTimes stage-name to {@link Timer} map to store
+   * @return this instance, so calls can be chained
+   */
+  public MinionTaskBaseObserverStats setStageTimes(Map<String, Timer> stageTimes) {
+    this._stageTimes = stageTimes;
+    return this;
+  }
+
public Deque<StatusEntry> getProgressLogs() {
return _progressLogs;
}
@@ -103,13 +142,56 @@ public class MinionTaskBaseObserverStats {
return false;
}
MinionTaskBaseObserverStats stats = (MinionTaskBaseObserverStats) o;
- return _startTimestamp == stats.getStartTimestamp() &&
_taskId.equals(stats.getTaskId())
- && _currentState.equals(stats.getCurrentState());
+ return _startTimestamp == stats.getStartTimestamp() && _endTimestamp ==
stats.getEndTimestamp()
+ && _taskId.equals(stats.getTaskId()) &&
_currentState.equals(stats.getCurrentState())
+ && _currentStage.equals(stats.getCurrentStage());
}
@Override
public int hashCode() {
- return Objects.hash(_taskId, _currentState, _startTimestamp);
+ return Objects.hash(_taskId, _currentStage, _currentState,
_startTimestamp, _endTimestamp);
+ }
+
+ public static class Timer {
+ private long _totalTimeMs = 0;
+ private long _startTimeMs = 0;
+ private long _resumeTimeMs = 0;
+
+ public Timer() {
+ }
+
+ public Timer(@JsonProperty("totalTimeMs") long totalTimeMs,
+ @JsonProperty("startTimeMs") long startTimeMs,
+ @JsonProperty("resumeTimeMs") long resumeTimeMs) {
+ _totalTimeMs = totalTimeMs;
+ _startTimeMs = startTimeMs;
+ _resumeTimeMs = resumeTimeMs;
+ }
+
+ public void start() {
+ _startTimeMs = System.currentTimeMillis();
+ _resumeTimeMs = _startTimeMs;
+ }
+
+ public void stop() {
+ if (_resumeTimeMs != 0) {
+ _totalTimeMs += System.currentTimeMillis() - _resumeTimeMs;
+ _resumeTimeMs = 0;
+ }
+ }
+
+ public long getStartTimeMs() {
+ return _startTimeMs;
+ }
+
+ public long getTotalTimeMs() {
+ return _totalTimeMs;
+ }
+
+ @JsonIgnore
+ public long getResumeTimeMs() {
+ return _resumeTimeMs;
+ }
}
@JsonDeserialize(builder = StatusEntry.Builder.class)
@@ -117,11 +199,13 @@ public class MinionTaskBaseObserverStats {
private final long _ts;
private final LogLevel _level;
private final String _status;
+ private String _stage;
- public StatusEntry(long ts, LogLevel level, String status) {
+ private StatusEntry(long ts, LogLevel level, String status, String stage) {
_ts = ts;
_level = level != null ? level : LogLevel.INFO;
_status = status;
+ _stage = stage;
}
public long getTs() {
@@ -136,14 +220,28 @@ public class MinionTaskBaseObserverStats {
return _level;
}
+    /** Returns the stage label of this entry, or null if none has been assigned. */
+    public String getStage() {
+      return this._stage;
+    }
+
+ public boolean updateStage(String stage) {
+ if (_stage == null) {
+ _stage = stage;
+ return true;
+ }
+ return false;
+ }
+
@Override
public String toString() {
- return "{\"ts\" : " + _ts + ", \"level\" : \"" + _level + "\",
\"status\" : \"" + _status + "\"}";
+ return "{\"ts\" : " + _ts + ", \"level\" : \"" + _level + "\", \"stage\"
: \"" + _stage
+ + "\", \"status\" : \"" + _status + "\"}";
}
public static class Builder {
private long _ts;
private LogLevel _level = LogLevel.INFO;
+ private String _stage;
private String _status;
public Builder withTs(long ts) {
@@ -156,6 +254,11 @@ public class MinionTaskBaseObserverStats {
return this;
}
+      /**
+       * Sets the stage label that {@code build()} passes to the new {@code StatusEntry}.
+       *
+       * @param stage stage label; may be left unset
+       * @return this builder, so calls can be chained
+       */
+      public Builder withStage(String stage) {
+        this._stage = stage;
+        return this;
+      }
+
public Builder withStatus(String status) {
_status = status;
return this;
@@ -165,7 +268,7 @@ public class MinionTaskBaseObserverStats {
if (_ts == 0) {
_ts = System.currentTimeMillis();
}
- return new StatusEntry(_ts, _level, _status);
+ return new StatusEntry(_ts, _level, _status, _stage);
}
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStatsTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStatsTest.java
index 126e9d43e3..f96dc6dfb7 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStatsTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/tasks/MinionTaskBaseObserverStatsTest.java
@@ -19,7 +19,13 @@
package org.apache.pinot.spi.tasks;
import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.File;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
+import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -29,28 +35,41 @@ public class MinionTaskBaseObserverStatsTest {
private static final String TEST_PROPERTY = "some test property";
private static final String TASK_ID = "randomString";
private static final String CURRENT_STATE = "IN_PROGRESS";
- private static final long TS = System.currentTimeMillis();
+ private static final String CURRENT_STAGE = "testStage";
+ private static final long TS = 1740407875728L;
private static final String STATUS = "task status";
@Test
public void testSerDeser()
- throws JsonProcessingException {
+ throws Exception {
+ Map<String, MinionTaskBaseObserverStats.Timer> stageTimes = new
HashMap<>();
+ stageTimes.put(CURRENT_STATE, new MinionTaskBaseObserverStats.Timer());
+ stageTimes.put(CURRENT_STAGE, new MinionTaskBaseObserverStats.Timer());
TestObserverStats stats = (TestObserverStats) new TestObserverStats()
.setTestProperty(TEST_PROPERTY)
.setTaskId(TASK_ID)
.setCurrentState(CURRENT_STATE)
- .setStartTimestamp(TS);
- stats.getProgressLogs().offer(new MinionTaskBaseObserverStats.StatusEntry(
- TS, MinionTaskBaseObserverStats.StatusEntry.LogLevel.INFO, STATUS));
+ .setCurrentStage(CURRENT_STAGE)
+ .setStartTimestamp(TS)
+ .setEndTimestamp(TS)
+ .setStageTimes(stageTimes);
+ stats.getProgressLogs().offer(new
MinionTaskBaseObserverStats.StatusEntry.Builder()
+ .withTs(TS)
+ .withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.INFO)
+ .withStatus(STATUS)
+ .withStage("test")
+ .build());
String statsString = getTestObjectString();
TestObserverStats stats2 = stats.fromJsonString(statsString);
Assert.assertEquals(stats2, stats);
}
- private String getTestObjectString() {
- return "{\"testProperty\":\"" + TEST_PROPERTY + "\",\"startTimestamp\":" +
TS
- + ",\"currentState\":\"" + CURRENT_STATE +
"\",\"progressLogs\":[{\"level\":\"INFO\",\"status\":\"" + STATUS
- + "\",\"ts\":" + TS + "}],\"taskId\":\"" + TASK_ID + "\"}";
+ private String getTestObjectString()
+ throws Exception {
+ URL resource = MinionTaskBaseObserverStatsTest.class.getClassLoader()
+ .getResource("observer_stats_test_payload.json");
+ Assert.assertNotNull(resource);
+ return FileUtils.readFileToString(new File(resource.toURI()),
StandardCharsets.UTF_8);
}
public static class TestObserverStats extends MinionTaskBaseObserverStats {
@@ -74,8 +93,8 @@ public class MinionTaskBaseObserverStatsTest {
@Override
public boolean equals(Object o) {
if (this == o) {
- return true;
- }
+ return true;
+ }
if (o == null || getClass() != o.getClass()) {
return false;
}
@@ -85,7 +104,7 @@ public class MinionTaskBaseObserverStatsTest {
@Override
public int hashCode() {
- return Objects.hash(getTaskId(), getCurrentState(), getStartTimestamp(),
getTestProperty());
+ return Objects.hash(super.hashCode(), _testProperty);
}
}
}
diff --git a/pinot-spi/src/test/resources/observer_stats_test_payload.json
b/pinot-spi/src/test/resources/observer_stats_test_payload.json
new file mode 100644
index 0000000000..fcf968a903
--- /dev/null
+++ b/pinot-spi/src/test/resources/observer_stats_test_payload.json
@@ -0,0 +1,26 @@
+{
+ "testProperty": "some test property",
+ "currentState": "IN_PROGRESS",
+ "startTimestamp": 1740407875728,
+ "endTimestamp": 1740407875728,
+ "currentStage": "testStage",
+ "stageTimes": {
+ "IN_PROGRESS": {
+ "startTimeMs": 0,
+ "totalTimeMs": 0
+ },
+ "testStage": {
+ "startTimeMs": 0,
+ "totalTimeMs": 0
+ }
+ },
+ "progressLogs": [
+ {
+ "ts": 1740407875728,
+ "stage": "test",
+ "status": "task status",
+ "level": "INFO"
+ }
+ ],
+ "taskId": "randomString"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]