This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1a48fd53bd317ac2102adb00c1209350b57a687e Author: Gen Luo <luogen...@gmail.com> AuthorDate: Tue Jul 26 16:54:10 2022 +0800 [FLINK-28588][rest] MetricStore supports to store and query metrics of multiple execution attempts of a subtask. --- .../history/HistoryServerArchiveFetcher.java | 3 +- .../runtime/messages/webmonitor/JobDetails.java | 98 ++++++++++- .../metrics/dump/MetricDumpSerialization.java | 10 +- .../flink/runtime/metrics/dump/QueryScopeInfo.java | 28 +++- .../groups/InternalOperatorMetricGroup.java | 1 + .../runtime/metrics/groups/TaskMetricGroup.java | 5 +- .../handler/legacy/metrics/MetricFetcherImpl.java | 1 + .../rest/handler/legacy/metrics/MetricStore.java | 186 +++++++++++++++++---- .../rest/handler/util/MutableIOMetrics.java | 7 +- .../messages/webmonitor/JobDetailsTest.java | 31 ++++ .../metrics/dump/MetricDumpSerializerTest.java | 5 +- .../runtime/metrics/dump/QueryScopeInfoTest.java | 10 +- .../job/JobVertexBackPressureHandlerTest.java | 7 +- .../AggregatingSubtasksMetricsHandlerTest.java | 6 +- .../job/metrics/JobVertexMetricsHandlerTest.java | 4 +- .../job/metrics/SubtaskMetricsHandlerTest.java | 4 +- .../handler/legacy/metrics/MetricFetcherTest.java | 5 +- .../handler/legacy/metrics/MetricStoreTest.java | 61 ++++++- 18 files changed, 408 insertions(+), 64 deletions(-) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java index eb5a34b2c2d..8e5aee9c633 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java @@ -425,7 +425,8 @@ class HistoryServerArchiveFetcher { state, lastMod, tasksPerState, - numTasks); + numTasks, + new HashMap<>()); MultipleJobsDetails multipleJobsDetails = new MultipleJobsDetails(Collections.singleton(jobDetails)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java index f9b609ac1cf..74b2964228c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.messages.webmonitor; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.execution.ExecutionState; @@ -39,6 +40,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S import java.io.IOException; import java.io.Serializable; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -57,6 +60,8 @@ public class JobDetails implements Serializable { private static final String FIELD_NAME_STATUS = "state"; private static final String FIELD_NAME_LAST_MODIFICATION = "last-modification"; private static final String FIELD_NAME_TOTAL_NUMBER_TASKS = "total"; + private static final String FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS = + "current-execution-attempts"; private final JobID jobId; @@ -76,6 +81,15 @@ public class JobDetails implements Serializable { private final int numTasks; + /** + * The map holds the attempt number of the current execution attempt in the Execution, which is + * considered as the representing execution for the subtask of the vertex. The keys and values + * are JobVertexID -> SubtaskIndex -> CurrentExecutionAttemptNumber. It is used to accumulate + * the metrics of a subtask in MetricFetcher. + */ + private final Map<String, Map<Integer, Integer>> currentExecutionAttempts; + + @VisibleForTesting public JobDetails( JobID jobId, String jobName, @@ -86,7 +100,30 @@ public class JobDetails implements Serializable { long lastUpdateTime, int[] tasksPerState, int numTasks) { + this( + jobId, + jobName, + startTime, + endTime, + duration, + status, + lastUpdateTime, + tasksPerState, + numTasks, + new HashMap<>()); + } + public JobDetails( + JobID jobId, + String jobName, + long startTime, + long endTime, + long duration, + JobStatus status, + long lastUpdateTime, + int[] tasksPerState, + int numTasks, + Map<String, Map<Integer, Integer>> currentExecutionAttempts) { this.jobId = checkNotNull(jobId); this.jobName = checkNotNull(jobName); this.startTime = startTime; @@ -100,6 +137,7 @@ public class JobDetails implements Serializable { ExecutionState.values().length); this.tasksPerState = checkNotNull(tasksPerState); this.numTasks = numTasks; + this.currentExecutionAttempts = checkNotNull(currentExecutionAttempts); } public static JobDetails createDetailsForJob(AccessExecutionGraph job) { @@ -112,16 +150,27 @@ public class JobDetails implements Serializable { int[] countsPerStatus = new int[ExecutionState.values().length]; long lastChanged = 0; int numTotalTasks = 0; + Map<String, Map<Integer, Integer>> currentExecutionAttempts = new HashMap<>(); for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) { AccessExecutionVertex[] taskVertices = ejv.getTaskVertices(); numTotalTasks += taskVertices.length; + Map<Integer, Integer> vertexAttempts = new HashMap<>(); for (AccessExecutionVertex taskVertex : taskVertices) { + if (taskVertex.getCurrentExecutions().size() > 1) { + vertexAttempts.put( + taskVertex.getParallelSubtaskIndex(), + taskVertex.getCurrentExecutionAttempt().getAttemptNumber()); + } ExecutionState state = taskVertex.getExecutionState(); countsPerStatus[state.ordinal()]++; lastChanged = Math.max(lastChanged, taskVertex.getStateTimestamp(state)); } + + if (!vertexAttempts.isEmpty()) { + currentExecutionAttempts.put(String.valueOf(ejv.getJobVertexId()), vertexAttempts); + } } lastChanged = Math.max(lastChanged, finished); @@ -135,7 +184,8 @@ public class JobDetails implements Serializable { status, lastChanged, countsPerStatus, - numTotalTasks); + numTotalTasks, + currentExecutionAttempts); } // ------------------------------------------------------------------------ @@ -176,6 +226,9 @@ public class JobDetails implements Serializable { return tasksPerState; } + public Map<String, Map<Integer, Integer>> getCurrentExecutionAttempts() { + return currentExecutionAttempts; + } // ------------------------------------------------------------------------ @Override @@ -192,7 +245,8 @@ public class JobDetails implements Serializable { && this.status == that.status && this.jobId.equals(that.jobId) && this.jobName.equals(that.jobName) - && Arrays.equals(this.tasksPerState, that.tasksPerState); + && Arrays.equals(this.tasksPerState, that.tasksPerState) + && this.currentExecutionAttempts.equals(that.currentExecutionAttempts); } else { return false; } @@ -208,6 +262,7 @@ public class JobDetails implements Serializable { result = 31 * result + (int) (lastUpdateTime ^ (lastUpdateTime >>> 32)); result = 31 * result + Arrays.hashCode(tasksPerState); result = 31 * result + numTasks; + result = 31 * result + currentExecutionAttempts.hashCode(); return result; } @@ -271,6 +326,20 @@ public class JobDetails implements Serializable { jsonGenerator.writeEndObject(); + if (!jobDetails.currentExecutionAttempts.isEmpty()) { + jsonGenerator.writeObjectFieldStart(FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS); + for (Map.Entry<String, Map<Integer, Integer>> vertex : + jobDetails.currentExecutionAttempts.entrySet()) { + jsonGenerator.writeObjectFieldStart(vertex.getKey()); + for (Map.Entry<Integer, Integer> attempt : vertex.getValue().entrySet()) { + jsonGenerator.writeNumberField( + String.valueOf(attempt.getKey()), attempt.getValue()); + } + jsonGenerator.writeEndObject(); + } + jsonGenerator.writeEndObject(); + } + jsonGenerator.writeEndObject(); } } @@ -310,6 +379,28 @@ public class JobDetails implements Serializable { jsonNode == null ? 0 : jsonNode.intValue(); } + Map<String, Map<Integer, Integer>> attempts = new HashMap<>(); + JsonNode attemptsNode = rootNode.get(FIELD_NAME_CURRENT_EXECUTION_ATTEMPTS); + if (attemptsNode != null) { + attemptsNode + .fields() + .forEachRemaining( + vertex -> { + String vertexId = vertex.getKey(); + Map<Integer, Integer> vertexAttempts = + attempts.computeIfAbsent( + vertexId, k -> new HashMap<>()); + vertex.getValue() + .fields() + .forEachRemaining( + attempt -> + vertexAttempts.put( + Integer.parseInt( + attempt.getKey()), + attempt.getValue().intValue())); + }); + } + return new JobDetails( jobId, jobName, @@ -319,7 +410,8 @@ public class JobDetails implements Serializable { jobStatus, lastUpdateTime, numVerticesPerExecutionState, - numTasks); + numTasks, + attempts); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java index de423dadf9b..90187fed27f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java @@ -240,6 +240,7 @@ public class MetricDumpSerialization { out.writeUTF(taskInfo.jobID); out.writeUTF(taskInfo.vertexID); out.writeInt(taskInfo.subtaskIndex); + out.writeInt(taskInfo.attemptNumber); break; case INFO_CATEGORY_OPERATOR: QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = @@ -247,6 +248,7 @@ public class MetricDumpSerialization { out.writeUTF(operatorInfo.jobID); out.writeUTF(operatorInfo.vertexID); out.writeInt(operatorInfo.subtaskIndex); + out.writeInt(operatorInfo.attemptNumber); out.writeUTF(operatorInfo.operatorName); break; default: @@ -436,6 +438,7 @@ public class MetricDumpSerialization { String jobID; String vertexID; int subtaskIndex; + int attemptNumber; String scope = dis.readUTF(); byte cat = dis.readByte(); @@ -452,14 +455,17 @@ public class MetricDumpSerialization { jobID = dis.readUTF(); vertexID = dis.readUTF(); subtaskIndex = dis.readInt(); - return new QueryScopeInfo.TaskQueryScopeInfo(jobID, vertexID, subtaskIndex, scope); + attemptNumber = dis.readInt(); + return new QueryScopeInfo.TaskQueryScopeInfo( + jobID, vertexID, subtaskIndex, attemptNumber, scope); case INFO_CATEGORY_OPERATOR: jobID = dis.readUTF(); vertexID = dis.readUTF(); subtaskIndex = dis.readInt(); + attemptNumber = dis.readInt(); String operatorName = dis.readUTF(); return new QueryScopeInfo.OperatorQueryScopeInfo( - jobID, vertexID, subtaskIndex, operatorName, scope); + jobID, vertexID, subtaskIndex, attemptNumber, operatorName, scope); default: throw new IOException("Unknown scope category: " + cat); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java index c61a9d76412..d0d9652f627 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java @@ -142,22 +142,30 @@ public abstract class QueryScopeInfo { public final String jobID; public final String vertexID; public final int subtaskIndex; + public final int attemptNumber; - public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex) { - this(jobID, vertexid, subtaskIndex, ""); + public TaskQueryScopeInfo( + String jobID, String vertexid, int subtaskIndex, int attemptNumber) { + this(jobID, vertexid, subtaskIndex, attemptNumber, ""); } - public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, String scope) { + public TaskQueryScopeInfo( + String jobID, String vertexid, int subtaskIndex, int attemptNumber, String scope) { super(scope); this.jobID = jobID; this.vertexID = vertexid; this.subtaskIndex = subtaskIndex; + this.attemptNumber = attemptNumber; } @Override public TaskQueryScopeInfo copy(String additionalScope) { return new TaskQueryScopeInfo( - this.jobID, this.vertexID, this.subtaskIndex, concatScopes(additionalScope)); + this.jobID, + this.vertexID, + this.subtaskIndex, + this.attemptNumber, + concatScopes(additionalScope)); } @Override @@ -174,23 +182,30 @@ public abstract class QueryScopeInfo { public final String jobID; public final String vertexID; public final int subtaskIndex; + public final int attemptNumber; public final String operatorName; public OperatorQueryScopeInfo( - String jobID, String vertexid, int subtaskIndex, String operatorName) { - this(jobID, vertexid, subtaskIndex, operatorName, ""); + String jobID, + String vertexid, + int subtaskIndex, + int attemptNumber, + String operatorName) { + this(jobID, vertexid, subtaskIndex, attemptNumber, operatorName, ""); } public OperatorQueryScopeInfo( String jobID, String vertexid, int subtaskIndex, + int attemptNumber, String operatorName, String scope) { super(scope); this.jobID = jobID; this.vertexID = vertexid; this.subtaskIndex = subtaskIndex; + this.attemptNumber = attemptNumber; this.operatorName = operatorName; } @@ -200,6 +215,7 @@ public abstract class QueryScopeInfo { this.jobID, this.vertexID, this.subtaskIndex, + this.attemptNumber, this.operatorName, concatScopes(additionalScope)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java index 68a85ba5f2f..d075675b80c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorMetricGroup.java @@ -75,6 +75,7 @@ public class InternalOperatorMetricGroup extends ComponentMetricGroup<TaskMetric this.parent.parent.jobId.toString(), this.parent.vertexId.toString(), this.parent.subtaskIndex, + this.parent.attemptNumber(), filter.filterCharacters(this.operatorName)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java index 0dc0ff9b6c5..afcbbaa44bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java @@ -134,7 +134,10 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr protected QueryScopeInfo.TaskQueryScopeInfo createQueryServiceMetricInfo( CharacterFilter filter) { return new QueryScopeInfo.TaskQueryScopeInfo( - this.parent.jobId.toString(), String.valueOf(this.vertexId), this.subtaskIndex); + this.parent.jobId.toString(), + String.valueOf(this.vertexId), + this.subtaskIndex, + this.attemptNumber); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java index beb652f97c7..65d2e162532 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java @@ -143,6 +143,7 @@ public class MetricFetcherImpl<T extends RestfulGateway> implements MetricFetche toRetain.add(job.getJobId().toString()); } metrics.retainJobs(toRetain); + metrics.updateCurrentExecutionAttempts(jobDetails.getJobs()); } }, executor); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java index 93f289fa036..c86ac763f9c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.metrics.dump.MetricDump; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; @@ -27,8 +28,10 @@ import org.slf4j.LoggerFactory; import javax.annotation.concurrent.ThreadSafe; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -54,6 +57,15 @@ public class MetricStore { private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>(); private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>(); + /** + * The map holds the attempt number of the representing execution for each subtask of each + * vertex. The keys and values are JobID -> JobVertexID -> SubtaskIndex -> + * CurrentExecutionAttemptNumber. When a metric of an execution attempt is added, the metric can + * also be added to the SubtaskMetricStore when it is of the representing execution. + */ + private final Map<String, Map<String, Map<Integer, Integer>>> currentExecutionAttempts = + new ConcurrentHashMap<>(); + /** * Remove inactive task managers. * @@ -70,6 +82,18 @@ public class MetricStore { */ synchronized void retainJobs(List<String> activeJobs) { jobs.keySet().retainAll(activeJobs); + currentExecutionAttempts.keySet().retainAll(activeJobs); + } + + public synchronized void updateCurrentExecutionAttempts(Collection<JobDetails> jobs) { + jobs.forEach( + job -> + currentExecutionAttempts.put( + job.getJobId().toString(), job.getCurrentExecutionAttempts())); + } + + public Map<String, Map<String, Map<Integer, Integer>>> getCurrentExecutionAttempts() { + return currentExecutionAttempts; } /** @@ -153,7 +177,24 @@ public class MetricStore { if (task == null) { return null; } - return ComponentMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex)); + return SubtaskMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex)); + } + + public synchronized ComponentMetricStore getSubtaskAttemptMetricStore( + String jobID, String taskID, int subtaskIndex, int attemptNumber) { + JobMetricStore job = jobID == null ? null : jobs.get(jobID); + if (job == null) { + return null; + } + TaskMetricStore task = job.getTaskMetricStore(taskID); + if (task == null) { + return null; + } + SubtaskMetricStore subtask = task.getSubtaskMetricStore(subtaskIndex); + if (subtask == null) { + return null; + } + return ComponentMetricStore.unmodifiable(subtask.getAttemptsMetricStore(attemptNumber)); } public synchronized Map<String, JobMetricStore> getJobs() { @@ -177,7 +218,9 @@ public class MetricStore { TaskManagerMetricStore tm; JobMetricStore job; TaskMetricStore task; - ComponentMetricStore subtask; + SubtaskMetricStore subtask; + ComponentMetricStore attempt; + boolean isRepresentativeAttempt; String name = info.scope.isEmpty() ? metric.name : info.scope + "." + metric.name; @@ -214,15 +257,34 @@ public class MetricStore { task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> new TaskMetricStore()); subtask = task.subtasks.computeIfAbsent( - taskInfo.subtaskIndex, k -> new ComponentMetricStore()); - /** - * The duplication is intended. Metrics scoped by subtask are useful for several - * job/task handlers, while the WebInterface task metric queries currently do - * not account for subtasks, so we don't divide by subtask and instead use the - * concatenation of subtask index and metric name as the name for those. - */ - addMetric(subtask.metrics, name, metric); - addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric); + taskInfo.subtaskIndex, k -> new SubtaskMetricStore()); + + // The attempt is the representative one if the current execution attempt + // number for the subtask is not present in the currentExecutionAttempts, + // which means there should be only one execution + isRepresentativeAttempt = + isRepresentativeAttempt( + taskInfo.jobID, + taskInfo.vertexID, + taskInfo.subtaskIndex, + taskInfo.attemptNumber); + attempt = + subtask.attempts.computeIfAbsent( + taskInfo.attemptNumber, k -> new ComponentMetricStore()); + addMetric(attempt.metrics, name, metric); + // If the attempt is representative one, its metrics can be updated to the + // subtask and task metric store. + if (isRepresentativeAttempt) { + /** + * The duplication is intended. Metrics scoped by subtask are useful for + * several job/task handlers, while the WebInterface task metric queries + * currently do not account for subtasks, so we don't divide by subtask and + * instead use the concatenation of subtask index and metric name as the + * name for those. + */ + addMetric(subtask.metrics, name, metric); + addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric); + } break; case INFO_CATEGORY_OPERATOR: QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = @@ -233,21 +295,38 @@ public class MetricStore { operatorInfo.vertexID, k -> new TaskMetricStore()); subtask = task.subtasks.computeIfAbsent( - operatorInfo.subtaskIndex, k -> new ComponentMetricStore()); - /** - * As the WebInterface does not account for operators (because it can't) we - * don't divide by operator and instead use the concatenation of subtask index, - * operator name and metric name as the name. - */ - addMetric(subtask.metrics, operatorInfo.operatorName + "." + name, metric); - addMetric( - task.metrics, - operatorInfo.subtaskIndex - + "." - + operatorInfo.operatorName - + "." - + name, - metric); + operatorInfo.subtaskIndex, k -> new SubtaskMetricStore()); + + isRepresentativeAttempt = + isRepresentativeAttempt( + operatorInfo.jobID, + operatorInfo.vertexID, + operatorInfo.subtaskIndex, + operatorInfo.attemptNumber); + + attempt = + subtask.attempts.computeIfAbsent( + operatorInfo.attemptNumber, k -> new ComponentMetricStore()); + addMetric(attempt.metrics, operatorInfo.operatorName + "." + name, metric); + + // If the attempt is representative one, its metrics can be updated to the + // subtask and task metric store. + if (isRepresentativeAttempt) { + /** + * As the WebInterface does not account for operators (because it can't) we + * don't divide by operator and instead use the concatenation of subtask + * index, operator name and metric name as the name. + */ + addMetric(subtask.metrics, operatorInfo.operatorName + "." + name, metric); + addMetric( + task.metrics, + operatorInfo.subtaskIndex + + "." + + operatorInfo.operatorName + + "." + + name, + metric); + } break; default: LOG.debug("Invalid metric dump category: " + info.getCategory()); @@ -257,6 +336,19 @@ public class MetricStore { } } + // Returns whether the attempt is the representative one. It's also true if the current + // execution attempt number for the subtask is not present in the currentExecutionAttempts, + // which means there should be only one execution + private boolean isRepresentativeAttempt( + String jobID, String vertexID, int subtaskIndex, int attemptNumber) { + return Optional.of(currentExecutionAttempts) + .map(m -> m.get(jobID)) + .map(m -> m.get(vertexID)) + .map(m -> m.get(subtaskIndex)) + .orElse(attemptNumber) + == attemptNumber; + } + private void addMetric(Map<String, String> target, String name, MetricDump metric) { switch (metric.getCategory()) { case METRIC_CATEGORY_COUNTER: @@ -363,24 +455,24 @@ public class MetricStore { /** Sub-structure containing metrics of a single Task. */ @ThreadSafe public static class TaskMetricStore extends ComponentMetricStore { - private final Map<Integer, ComponentMetricStore> subtasks; + private final Map<Integer, SubtaskMetricStore> subtasks; private TaskMetricStore() { this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>()); } private TaskMetricStore( - Map<String, String> metrics, Map<Integer, ComponentMetricStore> subtasks) { + Map<String, String> metrics, Map<Integer, SubtaskMetricStore> subtasks) { super(metrics); this.subtasks = checkNotNull(subtasks); } - public ComponentMetricStore getSubtaskMetricStore(int subtaskIndex) { + public SubtaskMetricStore getSubtaskMetricStore(int subtaskIndex) { return subtasks.get(subtaskIndex); } - public Map<Integer, ComponentMetricStore> getAllSubtaskMetricStores() { - return subtasks; + public Map<Integer, SubtaskMetricStore> getAllSubtaskMetricStores() { + return unmodifiableMap(subtasks); } private static TaskMetricStore unmodifiable(TaskMetricStore source) { @@ -391,4 +483,36 @@ public class MetricStore { unmodifiableMap(source.metrics), unmodifiableMap(source.subtasks)); } } + + /** Sub-structure containing metrics of a single subtask. */ + @ThreadSafe + public static class SubtaskMetricStore extends ComponentMetricStore { + private final Map<Integer, ComponentMetricStore> attempts; + + private SubtaskMetricStore() { + this(new ConcurrentHashMap<>(), new ConcurrentHashMap<>()); + } + + private SubtaskMetricStore( + Map<String, String> metrics, Map<Integer, ComponentMetricStore> attempts) { + super(metrics); + this.attempts = checkNotNull(attempts); + } + + public ComponentMetricStore getAttemptsMetricStore(int attemptNumber) { + return attempts.get(attemptNumber); + } + + public Map<Integer, ComponentMetricStore> getAllAttemptsMetricStores() { + return unmodifiableMap(attempts); + } + + private static SubtaskMetricStore unmodifiable(SubtaskMetricStore source) { + if (source == null) { + return null; + } + return new SubtaskMetricStore( + unmodifiableMap(source.metrics), unmodifiableMap(source.attempts)); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java index 7da9061a090..8ba34a10d85 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java @@ -99,8 +99,11 @@ public class MutableIOMetrics extends IOMetrics { fetcher.update(); MetricStore.ComponentMetricStore metrics = fetcher.getMetricStore() - .getSubtaskMetricStore( - jobID, taskID, attempt.getParallelSubtaskIndex()); + .getSubtaskAttemptMetricStore( + jobID, + taskID, + attempt.getParallelSubtaskIndex(), + attempt.getAttemptNumber()); if (metrics != null) { /** * We want to keep track of missing metrics to be able to make a difference diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java index 790ca43ce7d..609ef1f8e0d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java @@ -29,6 +29,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap import org.junit.jupiter.api.Test; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; @@ -102,4 +104,33 @@ class JobDetailsTest { assertThat(unmarshalled).isEqualTo(expected); } + + @Test + void testJobDetailsWithExecutionAttemptsMarshalling() throws JsonProcessingException { + Map<String, Map<Integer, Integer>> currentExecutionAttempts = new HashMap<>(); + currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(1, 2); + currentExecutionAttempts.computeIfAbsent("a", k -> new HashMap<>()).put(2, 4); + currentExecutionAttempts.computeIfAbsent("b", k -> new HashMap<>()).put(3, 1); + + final JobDetails expected = + new JobDetails( + new JobID(), + "foobar", + 1L, + 10L, + 9L, + JobStatus.RUNNING, + 8L, + new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 3}, + 42, + currentExecutionAttempts); + + final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); + + final JsonNode marshalled = objectMapper.valueToTree(expected); + + final JobDetails unmarshalled = objectMapper.treeToValue(marshalled, JobDetails.class); + + assertThat(unmarshalled).isEqualTo(expected); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java index 52eec21ab5d..2876cf1635a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java @@ -159,11 +159,12 @@ class MetricDumpSerializerTest { gauges.put( g1, new Tuple2<QueryScopeInfo, String>( - new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1")); + new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, 0, "D"), "g1")); histograms.put( h1, new Tuple2<QueryScopeInfo, String>( - new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"), + new QueryScopeInfo.OperatorQueryScopeInfo( + "jid", "vid", 2, 0, "opname", "E"), "h1")); MetricDumpSerialization.MetricSerializationResult serialized = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java index 0355d01fcf0..7380cd004b8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java @@ -94,7 +94,7 @@ class QueryScopeInfoTest { @Test void testTaskQueryScopeInfo() { QueryScopeInfo.TaskQueryScopeInfo info = - new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2); + new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, 0); assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK); assertThat(info.scope).isEmpty(); assertThat(info.jobID).isEqualTo("jobid"); @@ -108,7 +108,7 @@ class QueryScopeInfoTest { assertThat(info.vertexID).isEqualTo("taskid"); assertThat(info.subtaskIndex).isEqualTo(2); - info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, "hello"); + info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, 0, "hello"); assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_TASK); assertThat(info.scope).isEqualTo("hello"); assertThat(info.jobID).isEqualTo("jobid"); @@ -126,7 +126,7 @@ class QueryScopeInfoTest { @Test void testOperatorQueryScopeInfo() { QueryScopeInfo.OperatorQueryScopeInfo info = - new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname"); + new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, 0, "opname"); assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR); assertThat(info.scope).isEmpty(); assertThat(info.jobID).isEqualTo("jobid"); @@ -142,7 +142,9 @@ class QueryScopeInfoTest { assertThat(info.operatorName).isEqualTo("opname"); assertThat(info.subtaskIndex).isEqualTo(2); - info = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname", "hello"); + info = + new QueryScopeInfo.OperatorQueryScopeInfo( + "jobid", "taskid", 2, 0, "opname", "hello"); assertThat(info.getCategory()).isEqualTo(QueryScopeInfo.INFO_CATEGORY_OPERATOR); assertThat(info.scope).isEqualTo("hello"); assertThat(info.jobID).isEqualTo("jobid"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java index 77cd5047d3e..85bf3a44cd5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobVertexBackPressureHandlerTest.java @@ -77,6 +77,7 @@ class JobVertexBackPressureHandlerTest { new TaskQueryScopeInfo( TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), + 0, 0); dumps.add(new GaugeDump(task0, MetricNames.TASK_BACK_PRESSURED_TIME, "1000")); dumps.add(new GaugeDump(task0, MetricNames.TASK_IDLE_TIME, "0")); @@ -86,7 +87,8 @@ class JobVertexBackPressureHandlerTest { new TaskQueryScopeInfo( TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), - 1); + 1, + 0); dumps.add(new GaugeDump(task1, MetricNames.TASK_BACK_PRESSURED_TIME, "500")); dumps.add(new GaugeDump(task1, MetricNames.TASK_IDLE_TIME, "100")); dumps.add(new GaugeDump(task1, MetricNames.TASK_BUSY_TIME, "900")); @@ -97,7 +99,8 @@ class JobVertexBackPressureHandlerTest { new TaskQueryScopeInfo( TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), - 3); + 3, + 0); dumps.add(new GaugeDump(task3, MetricNames.TASK_BACK_PRESSURED_TIME, "100")); dumps.add(new GaugeDump(task3, MetricNames.TASK_IDLE_TIME, "200")); dumps.add(new GaugeDump(task3, MetricNames.TASK_BUSY_TIME, "700")); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java index b04020ab411..ed13c7da040 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingSubtasksMetricsHandlerTest.java @@ -65,19 +65,19 @@ public class AggregatingSubtasksMetricsHandlerTest Collection<MetricDump> dumps = new ArrayList<>(3); QueryScopeInfo.TaskQueryScopeInfo task1 = new QueryScopeInfo.TaskQueryScopeInfo( - JOB_ID.toString(), TASK_ID.toString(), 1, "abc"); + JOB_ID.toString(), TASK_ID.toString(), 1, 0, "abc"); MetricDump.CounterDump cd1 = new MetricDump.CounterDump(task1, "metric1", 1); dumps.add(cd1); QueryScopeInfo.TaskQueryScopeInfo task2 = new QueryScopeInfo.TaskQueryScopeInfo( - JOB_ID.toString(), TASK_ID.toString(), 2, "abc"); + JOB_ID.toString(), TASK_ID.toString(), 2, 0, "abc"); MetricDump.CounterDump cd2 = new MetricDump.CounterDump(task2, "metric1", 3); dumps.add(cd2); QueryScopeInfo.TaskQueryScopeInfo task3 = new QueryScopeInfo.TaskQueryScopeInfo( - JOB_ID.toString(), TASK_ID.toString(), 3, "abc"); + JOB_ID.toString(), TASK_ID.toString(), 3, 0, "abc"); MetricDump.CounterDump cd3 = new MetricDump.CounterDump(task3, "metric2", 5); dumps.add(cd3); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java index 66ea8c22c00..b48dbbac38f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandlerTest.java @@ -34,6 +34,8 @@ public class JobVertexMetricsHandlerTest extends MetricsHandlerTestBase<JobVerte private static final int TEST_SUBTASK_INDEX = 1; + private static final int TEST_ATTEMPT_NUMBER = 0; + @Override JobVertexMetricsHandler getMetricsHandler() { return new JobVertexMetricsHandler( @@ -43,7 +45,7 @@ public class JobVertexMetricsHandlerTest extends MetricsHandlerTestBase<JobVerte @Override QueryScopeInfo getQueryScopeInfo() { return new QueryScopeInfo.TaskQueryScopeInfo( - TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX); + TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX, TEST_ATTEMPT_NUMBER); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java index f59569c1d12..30559e56aee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java @@ -37,6 +37,8 @@ public class SubtaskMetricsHandlerTest extends MetricsHandlerTestBase<SubtaskMet private static final int TEST_SUBTASK_INDEX = 0; + private static final int TEST_ATTEMPT_NUMBER = 0; + @Override SubtaskMetricsHandler getMetricsHandler() { return new SubtaskMetricsHandler(leaderRetriever, TIMEOUT, TEST_HEADERS, mockMetricFetcher); @@ -45,7 +47,7 @@ public class SubtaskMetricsHandlerTest extends MetricsHandlerTestBase<SubtaskMet @Override QueryScopeInfo getQueryScopeInfo() { return new QueryScopeInfo.TaskQueryScopeInfo( - TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX); + TEST_JOB_ID, TEST_VERTEX_ID, TEST_SUBTASK_INDEX, TEST_ATTEMPT_NUMBER); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java index 9dc014d8b50..a7259988672 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java @@ -177,12 +177,13 @@ public class MetricFetcherTest extends TestLogger { c1, new Tuple2<>( new QueryScopeInfo.OperatorQueryScopeInfo( - jobID.toString(), "taskid", 2, "opname", "abc"), + jobID.toString(), "taskid", 2, 0, "opname", "abc"), "oc")); counters.put( c2, new Tuple2<>( - new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), + new QueryScopeInfo.TaskQueryScopeInfo( + jobID.toString(), "taskid", 2, 0, "abc"), "tc")); meters.put( new Meter() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java index 97c739df224..c54f50291fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java @@ -24,6 +24,9 @@ import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; @@ -41,7 +44,18 @@ class MetricStoreTest { assertThat(store.getJobMetricStore("jobid").getMetric("abc.metric4", "-1")).isEqualTo("3"); assertThat(store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric5", "-1")) + .isEqualTo("14"); + assertThat(store.getSubtaskMetricStore("jobid", "taskid", 8).getMetric("abc.metric5", "-1")) + .isEqualTo("14"); + assertThat( + store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 1) + .getMetric("abc.metric5", "-1")) .isEqualTo("4"); + assertThat( + store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 2) + .getMetric("abc.metric5", "-1")) + .isEqualTo("14"); + assertThat( store.getTaskMetricStore("jobid", "taskid") .getMetric("8.opname.abc.metric6", "-1")) @@ -50,6 +64,27 @@ class MetricStoreTest { store.getTaskMetricStore("jobid", "taskid") .getMetric("8.opname.abc.metric7", "-1")) .isEqualTo("6"); + assertThat( + store.getTaskMetricStore("jobid", "taskid") + .getMetric("1.opname.abc.metric7", "-1")) + .isEqualTo("6"); + assertThat( + store.getSubtaskMetricStore("jobid", "taskid", 1) + .getMetric("opname.abc.metric7", "-1")) + .isEqualTo("6"); + assertThat(store.getSubtaskAttemptMetricStore("jobid", "taskid", 1, 2)).isNull(); + assertThat( + store.getSubtaskAttemptMetricStore("jobid", "taskid", 1, 3) + .getMetric("opname.abc.metric7", "-1")) + .isEqualTo("6"); + assertThat( + store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 2) + .getMetric("opname.abc.metric7", "-1")) + .isEqualTo("6"); + assertThat( + store.getSubtaskAttemptMetricStore("jobid", "taskid", 8, 4) + .getMetric("opname.abc.metric7", "-1")) + .isEqualTo("16"); } @Test @@ -72,6 +107,11 @@ class MetricStoreTest { } static MetricStore setupStore(MetricStore store) { + Map<Integer, Integer> currentExecutionAttempts = new HashMap<>(); + currentExecutionAttempts.put(8, 2); + store.getCurrentExecutionAttempts() + .put("jobid", Collections.singletonMap("taskid", currentExecutionAttempts)); + QueryScopeInfo.JobManagerQueryScopeInfo jm = new QueryScopeInfo.JobManagerQueryScopeInfo("abc"); MetricDump.CounterDump cd1 = new MetricDump.CounterDump(jm, "metric1", 0); @@ -96,19 +136,30 @@ class MetricStoreTest { MetricDump.CounterDump cd42 = new MetricDump.CounterDump(job2, "metric4", 3); QueryScopeInfo.TaskQueryScopeInfo task = - new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, "abc"); + new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, 1, "abc"); MetricDump.CounterDump cd5 = new MetricDump.CounterDump(task, "metric5", 4); + QueryScopeInfo.TaskQueryScopeInfo speculativeTask = + new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, 2, "abc"); + MetricDump.CounterDump cd52 = new MetricDump.CounterDump(speculativeTask, "metric5", 14); + QueryScopeInfo.OperatorQueryScopeInfo operator = - new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, "opname", "abc"); + new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, 2, "opname", "abc"); MetricDump.CounterDump cd6 = new MetricDump.CounterDump(operator, "metric6", 5); MetricDump.CounterDump cd7 = new MetricDump.CounterDump(operator, "metric7", 6); QueryScopeInfo.OperatorQueryScopeInfo operator2 = - new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 1, "opname", "abc"); + new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 1, 3, "opname", "abc"); MetricDump.CounterDump cd62 = new MetricDump.CounterDump(operator2, "metric6", 5); MetricDump.CounterDump cd72 = new MetricDump.CounterDump(operator2, "metric7", 6); + QueryScopeInfo.OperatorQueryScopeInfo speculativeOperator2 = + new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, 4, "opname", "abc"); + MetricDump.CounterDump cd63 = + new MetricDump.CounterDump(speculativeOperator2, "metric6", 15); + MetricDump.CounterDump cd73 = + new MetricDump.CounterDump(speculativeOperator2, "metric7", 16); + store.add(cd1); store.add(cd2); store.add(cd2a); @@ -125,6 +176,10 @@ class MetricStoreTest { store.add(cd32); store.add(cd42); + store.add(cd52); + store.add(cd63); + store.add(cd73); + return store; } }