[FLINK-4720] Implement archived ExecutionGraph This closes #2577.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21e8e2dc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21e8e2dc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21e8e2dc Branch: refs/heads/master Commit: 21e8e2dcf77f9d0dc3a74204626b776d87a9cd15 Parents: f6d8668 Author: zentol <ches...@apache.org> Authored: Thu Sep 22 14:02:22 2016 +0200 Committer: zentol <ches...@apache.org> Committed: Fri Oct 14 13:55:28 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/api/common/Archiveable.java | 24 + .../api/common/ArchivedExecutionConfig.java | 73 ++++ .../flink/api/common/ExecutionConfig.java | 7 +- .../webmonitor/ExecutionGraphHolder.java | 9 +- .../AbstractExecutionGraphRequestHandler.java | 6 +- .../AbstractJobVertexRequestHandler.java | 10 +- .../AbstractSubtaskAttemptRequestHandler.java | 12 +- .../handlers/AbstractSubtaskRequestHandler.java | 10 +- .../handlers/JobAccumulatorsHandler.java | 4 +- .../handlers/JobCheckpointsHandler.java | 4 +- .../webmonitor/handlers/JobConfigHandler.java | 10 +- .../webmonitor/handlers/JobDetailsHandler.java | 13 +- .../handlers/JobExceptionsHandler.java | 19 +- .../webmonitor/handlers/JobPlanHandler.java | 4 +- .../handlers/JobVertexAccumulatorsHandler.java | 4 +- .../handlers/JobVertexBackPressureHandler.java | 9 +- .../handlers/JobVertexCheckpointsHandler.java | 42 +- .../handlers/JobVertexDetailsHandler.java | 10 +- .../handlers/JobVertexTaskManagersHandler.java | 21 +- .../SubtaskCurrentAttemptDetailsHandler.java | 4 +- ...taskExecutionAttemptAccumulatorsHandler.java | 6 +- .../SubtaskExecutionAttemptDetailsHandler.java | 6 +- .../SubtasksAllAccumulatorsHandler.java | 8 +- .../handlers/SubtasksTimesHandler.java | 10 +- .../BackPressureStatsTrackerITCase.java | 3 +- .../StackTraceSampleCoordinatorITCase.java | 3 +- .../JobVertexCheckpointsHandlerTest.java | 22 +- 
.../ArchivedCheckpointStatsTracker.java | 53 +++ .../checkpoint/stats/CheckpointStats.java | 4 +- .../checkpoint/stats/JobCheckpointStats.java | 3 +- .../stats/OperatorCheckpointStats.java | 2 + .../stats/SimpleCheckpointStatsTracker.java | 2 + .../runtime/executiongraph/AccessExecution.java | 105 +++++ .../executiongraph/AccessExecutionGraph.java | 161 +++++++ .../AccessExecutionJobVertex.java | 98 +++++ .../executiongraph/AccessExecutionVertex.java | 85 ++++ .../executiongraph/ArchivedExecution.java | 118 +++++ .../executiongraph/ArchivedExecutionGraph.java | 297 +++++++++++++ .../ArchivedExecutionJobVertex.java | 136 ++++++ .../executiongraph/ArchivedExecutionVertex.java | 96 ++++ .../flink/runtime/executiongraph/Execution.java | 41 +- .../runtime/executiongraph/ExecutionGraph.java | 154 ++++--- .../executiongraph/ExecutionJobVertex.java | 61 ++- .../runtime/executiongraph/ExecutionVertex.java | 50 +-- .../archive/ExecutionConfigSummary.java | 75 ---- .../runtime/webmonitor/WebMonitorUtils.java | 14 +- .../flink/runtime/jobmanager/JobManager.scala | 7 +- .../runtime/jobmanager/MemoryArchivist.scala | 4 +- .../runtime/messages/ArchiveMessages.scala | 12 +- .../runtime/messages/JobManagerMessages.scala | 4 +- .../checkpoint/CoordinatorShutdownTest.java | 4 +- .../ArchivedExecutionGraphTest.java | 434 +++++++++++++++++++ .../runtime/jobmanager/JobManagerTest.java | 2 +- .../LeaderChangeJobRecoveryTest.java | 2 +- .../TestingJobManagerMessages.scala | 4 +- .../flink/test/query/QueryableStateITCase.java | 14 +- 56 files changed, 2014 insertions(+), 381 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java 
b/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java new file mode 100644 index 0000000..09a3a0c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common; + +import java.io.Serializable; + +public interface Archiveable<T extends Serializable> { + T archive(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java new file mode 100644 index 0000000..faf920d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Map; + +/** + * Serializable class which is created when archiving the job. + * It can be used to display job information on the web interface + * without having to keep the classloader around after job completion. + */ +public class ArchivedExecutionConfig implements Serializable { + + private final String executionMode; + private final String restartStrategyDescription; + private final int parallelism; + private final boolean objectReuseEnabled; + private final Map<String, String> globalJobParameters; + + public ArchivedExecutionConfig(ExecutionConfig ec) { + executionMode = ec.getExecutionMode().name(); + if (ec.getRestartStrategy() != null) { + restartStrategyDescription = ec.getRestartStrategy().getDescription(); + } else { + restartStrategyDescription = "default"; + } + parallelism = ec.getParallelism(); + objectReuseEnabled = ec.isObjectReuseEnabled(); + if (ec.getGlobalJobParameters() != null + && ec.getGlobalJobParameters().toMap() != null) { + globalJobParameters = ec.getGlobalJobParameters().toMap(); + } else { + globalJobParameters = Collections.emptyMap(); + } + } + + public String getExecutionMode() { + return executionMode; + } + + public String getRestartStrategyDescription() { + return restartStrategyDescription; + } + + public int getParallelism() { 
+ return parallelism; + } + + public boolean getObjectReuseEnabled() { + return objectReuseEnabled; + } + + public Map<String, String> getGlobalJobParameters() { + return globalJobParameters; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index aadf867..a0a63b1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -58,7 +58,7 @@ import java.util.Objects; * </ul> */ @Public -public class ExecutionConfig implements Serializable { +public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecutionConfig> { private static final long serialVersionUID = 1L; @@ -770,6 +770,11 @@ public class ExecutionConfig implements Serializable { public boolean canEqual(Object obj) { return obj instanceof ExecutionConfig; } + + @Override + public ArchivedExecutionConfig archive() { + return new ArchivedExecutionConfig(this); + } // ------------------------------ Utilities ---------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java index 7691874..3d0cfc0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java +++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.JobManagerMessages; @@ -47,7 +48,7 @@ public class ExecutionGraphHolder { private final FiniteDuration timeout; - private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>(); + private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>(); public ExecutionGraphHolder() { this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT); @@ -63,8 +64,8 @@ public class ExecutionGraphHolder { * @param jid jobID of the execution graph to be retrieved * @return the retrieved execution graph or null if it is not retrievable */ - public ExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) { - ExecutionGraph cached = cache.get(jid); + public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) { + AccessExecutionGraph cached = cache.get(jid); if (cached != null) { return cached; } @@ -78,7 +79,7 @@ public class ExecutionGraphHolder { return null; } else if (result instanceof JobManagerMessages.JobFound) { - ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph(); + AccessExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph(); cache.put(jid, eg); return eg; } http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java index 16cfb1a..ff28d4e 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.NotFoundException; @@ -53,7 +53,7 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage()); } - ExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager); + AccessExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager); if (eg == null) { throw new NotFoundException("Could not find job with id " + jid); } @@ -61,5 +61,5 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan return handleRequest(eg, pathParams); } - public abstract String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception; + public abstract String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java index 5b12907..a36f94a 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; @@ -36,7 +36,7 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG } @Override - public final String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception { + public final String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { final String vidString = params.get("vertexid"); if (vidString == null) { throw new IllegalArgumentException("vertexId parameter missing"); @@ -50,7 +50,7 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage()); } - final ExecutionJobVertex jobVertex = graph.getJobVertex(vid); + final AccessExecutionJobVertex jobVertex = graph.getJobVertex(vid); if (jobVertex == null) { throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists."); } @@ -58,5 +58,5 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG return 
handleRequest(jobVertex, params); } - public abstract String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception; + public abstract String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java index 672df16..f3a5059 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import java.util.Map; @@ -37,7 +37,7 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta } @Override - public String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception { final String attemptNumberString = params.get("attempt"); if (attemptNumberString == null) { throw new RuntimeException("Attempt number parameter missing"); @@ -51,12 +51,12 @@ public abstract class 
AbstractSubtaskAttemptRequestHandler extends AbstractSubta throw new RuntimeException("Invalid attempt number parameter"); } - final Execution currentAttempt = vertex.getCurrentExecutionAttempt(); + final AccessExecution currentAttempt = vertex.getCurrentExecutionAttempt(); if (attempt == currentAttempt.getAttemptNumber()) { return handleRequest(currentAttempt, params); } else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) { - Execution exec = vertex.getPriorExecutionAttempt(attempt); + AccessExecution exec = vertex.getPriorExecutionAttempt(attempt); return handleRequest(exec, params); } else { @@ -64,5 +64,5 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta } } - public abstract String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception; + public abstract String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java index 90866c6..d6b279c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java @@ -18,8 +18,8 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import 
org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import java.util.Map; @@ -36,7 +36,7 @@ public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexReq } @Override - public final String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { + public final String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { final String subtaskNumberString = params.get("subtasknum"); if (subtaskNumberString == null) { throw new RuntimeException("Subtask number parameter missing"); @@ -54,9 +54,9 @@ public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexReq throw new RuntimeException("subtask does not exist: " + subtask); } - final ExecutionVertex vertex = jobVertex.getTaskVertices()[subtask]; + final AccessExecutionVertex vertex = jobVertex.getTaskVertices()[subtask]; return handleRequest(vertex, params); } - public abstract String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception; + public abstract String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java index c5418b3..29613a0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java @@ -20,7 +20,7 @@ package 
org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import java.io.StringWriter; @@ -36,7 +36,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler } @Override - public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified(); StringWriter writer = new StringWriter(); http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java index b63ab0e..404a14e 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java @@ -22,7 +22,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.checkpoint.stats.CheckpointStats; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import 
org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import scala.Option; @@ -39,7 +39,7 @@ public class JobCheckpointsHandler extends AbstractExecutionGraphRequestHandler } @Override - public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java index 75389b1..21639ef 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java @@ -22,8 +22,8 @@ import java.io.StringWriter; import java.util.Map; import com.fasterxml.jackson.core.JsonGenerator; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary; +import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; /** @@ -36,7 +36,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { } @Override - public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { StringWriter writer = new 
StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); @@ -45,7 +45,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { gen.writeStringField("jid", graph.getJobID().toString()); gen.writeStringField("name", graph.getJobName()); - final ExecutionConfigSummary summary = graph.getExecutionConfigSummary(); + final ArchivedExecutionConfig summary = graph.getArchivedExecutionConfig(); if (summary != null) { gen.writeObjectFieldStart("execution-config"); @@ -59,7 +59,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { Map<String, String> ucVals = summary.getGlobalJobParameters(); if (ucVals != null) { gen.writeObjectFieldStart("user-config"); - + for (Map.Entry<String, String> ucVal : ucVals.entrySet()) { gen.writeStringField(ucVal.getKey(), ucVal.getValue()); } http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java index 884b859..e7a2a8c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java @@ -24,9 +24,10 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import 
org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; @@ -50,7 +51,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { } @Override - public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { final StringWriter writer = new StringWriter(); final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); @@ -84,13 +85,13 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { int[] jobVerticesPerState = new int[ExecutionState.values().length]; gen.writeArrayFieldStart("vertices"); - for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) { + for (AccessExecutionJobVertex ejv : graph.getVerticesTopologically()) { int[] tasksPerState = new int[ExecutionState.values().length]; long startTime = Long.MAX_VALUE; long endTime = 0; boolean allFinished = true; - for (ExecutionVertex vertex : ejv.getTaskVertices()) { + for (AccessExecutionVertex vertex : ejv.getTaskVertices()) { final ExecutionState state = vertex.getExecutionState(); tasksPerState[state.ordinal()]++; @@ -133,7 +134,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { gen.writeStartObject(); gen.writeStringField("id", ejv.getJobVertexId().toString()); - gen.writeStringField("name", ejv.getJobVertex().getName()); + gen.writeStringField("name", ejv.getName()); gen.writeNumberField("parallelism", ejv.getParallelism()); gen.writeStringField("status", jobVertexState.name()); 
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java index ce154e3..90197d0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java @@ -19,11 +19,10 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.util.ExceptionUtils; import java.io.StringWriter; import java.util.Map; @@ -40,16 +39,16 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { } @Override - public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); gen.writeStartObject(); // most important is the root failure cause - Throwable rootException = graph.getFailureCause(); + String rootException = graph.getFailureCauseAsString(); if (rootException != null) { - 
gen.writeStringField("root-exception", ExceptionUtils.stringifyException(rootException)); + gen.writeStringField("root-exception", rootException); } // we additionally collect all exceptions (up to a limit) that occurred in the individual tasks @@ -58,8 +57,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { int numExceptionsSoFar = 0; boolean truncated = false; - for (ExecutionVertex task : graph.getAllExecutionVertices()) { - Throwable t = task.getFailureCause(); + for (AccessExecutionVertex task : graph.getAllExecutionVertices()) { + String t = task.getFailureCauseAsString(); if (t != null) { if (numExceptionsSoFar >= MAX_NUMBER_EXCEPTION_TO_REPORT) { truncated = true; @@ -71,8 +70,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)"; gen.writeStartObject(); - gen.writeStringField("exception", ExceptionUtils.stringifyException(t)); - gen.writeStringField("task", task.getSimpleName()); + gen.writeStringField("exception", t); + gen.writeStringField("task", task.getTaskNameWithSubtaskIndex()); gen.writeStringField("location", locationString); gen.writeEndObject(); numExceptionsSoFar++; http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java index 0389f5a..64f7000 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; -import 
org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import java.util.Map; @@ -34,7 +34,7 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler { } @Override - public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { return graph.getJsonPlan(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java index 5df565a..ad4e207 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import java.io.StringWriter; @@ -35,7 +35,7 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle } @Override - public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { + public String 
handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified(); StringWriter writer = new StringWriter(); http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java index 65f82a3..c5bacf2 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; @@ -56,9 +58,12 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle @Override public String handleRequest( - ExecutionJobVertex jobVertex, + AccessExecutionJobVertex accessJobVertex, Map<String, String> params) throws Exception { - + if (accessJobVertex instanceof ArchivedExecutionJobVertex) { + return ""; + } + ExecutionJobVertex jobVertex = (ExecutionJobVertex) accessJobVertex; try (StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer)) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java index 6522de5..8a68ffa 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java @@ -19,9 +19,8 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; -import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import scala.Option; @@ -38,36 +37,31 @@ public class JobVertexCheckpointsHandler extends AbstractJobVertexRequestHandler } @Override - public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); gen.writeStartObject(); - CheckpointStatsTracker tracker = jobVertex.getGraph().getCheckpointStatsTracker(); + Option<OperatorCheckpointStats> statsOption = jobVertex.getCheckpointStats(); - if (tracker != null) { - Option<OperatorCheckpointStats> statsOption = tracker - .getOperatorStats(jobVertex.getJobVertexId()); 
+ if (statsOption.isDefined()) { + OperatorCheckpointStats stats = statsOption.get(); - if (statsOption.isDefined()) { - OperatorCheckpointStats stats = statsOption.get(); + gen.writeNumberField("id", stats.getCheckpointId()); + gen.writeNumberField("timestamp", stats.getTriggerTimestamp()); + gen.writeNumberField("duration", stats.getDuration()); + gen.writeNumberField("size", stats.getStateSize()); + gen.writeNumberField("parallelism", stats.getNumberOfSubTasks()); - gen.writeNumberField("id", stats.getCheckpointId()); - gen.writeNumberField("timestamp", stats.getTriggerTimestamp()); - gen.writeNumberField("duration", stats.getDuration()); - gen.writeNumberField("size", stats.getStateSize()); - gen.writeNumberField("parallelism", stats.getNumberOfSubTasks()); - - gen.writeArrayFieldStart("subtasks"); - for (int i = 0; i < stats.getNumberOfSubTasks(); i++) { - gen.writeStartObject(); - gen.writeNumberField("subtask", i); - gen.writeNumberField("duration", stats.getSubTaskDuration(i)); - gen.writeNumberField("size", stats.getSubTaskStateSize(i)); - gen.writeEndObject(); - } - gen.writeEndArray(); + gen.writeArrayFieldStart("subtasks"); + for (int i = 0; i < stats.getNumberOfSubTasks(); i++) { + gen.writeStartObject(); + gen.writeNumberField("subtask", i); + gen.writeNumberField("duration", stats.getSubTaskDuration(i)); + gen.writeNumberField("size", stats.getSubTaskStateSize(i)); + gen.writeEndObject(); } + gen.writeEndArray(); } gen.writeEndObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java index 813ecb8..fbdd86b 100644 --- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java @@ -24,8 +24,8 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; @@ -43,7 +43,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { } @Override - public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { final long now = System.currentTimeMillis(); StringWriter writer = new StringWriter(); @@ -52,13 +52,13 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { gen.writeStartObject(); gen.writeStringField("id", jobVertex.getJobVertexId().toString()); - gen.writeStringField("name", jobVertex.getJobVertex().getName()); + gen.writeStringField("name", jobVertex.getName()); gen.writeNumberField("parallelism", jobVertex.getParallelism()); gen.writeNumberField("now", now); gen.writeArrayFieldStart("subtasks"); int num = 0; - for (ExecutionVertex vertex : jobVertex.getTaskVertices()) { + for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { final ExecutionState status = vertex.getExecutionState(); TaskManagerLocation location = 
vertex.getCurrentAssignedResourceLocation(); http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java index cbdb87f..0e94334 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java @@ -23,8 +23,9 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; @@ -46,18 +47,18 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle } @Override - public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { // Build a map that groups tasks by TaskManager - Map<String, List<ExecutionVertex>> taskManagerVertices = new HashMap<>(); + Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>(); - 
for (ExecutionVertex vertex : jobVertex.getTaskVertices()) { + for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort(); - List<ExecutionVertex> vertices = taskManagerVertices.get(taskManager); + List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager); if (vertices == null) { - vertices = new ArrayList<ExecutionVertex>(); + vertices = new ArrayList<>(); taskManagerVertices.put(taskManager, vertices); } @@ -73,13 +74,13 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle gen.writeStartObject(); gen.writeStringField("id", jobVertex.getJobVertexId().toString()); - gen.writeStringField("name", jobVertex.getJobVertex().getName()); + gen.writeStringField("name", jobVertex.getName()); gen.writeNumberField("now", now); gen.writeArrayFieldStart("taskmanagers"); - for (Entry<String, List<ExecutionVertex>> entry : taskManagerVertices.entrySet()) { + for (Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) { String host = entry.getKey(); - List<ExecutionVertex> taskVertices = entry.getValue(); + List<AccessExecutionVertex> taskVertices = entry.getValue(); int[] tasksPerState = new int[ExecutionState.values().length]; @@ -92,7 +93,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle LongCounter tmReadRecords = new LongCounter(); LongCounter tmWriteRecords = new LongCounter(); - for (ExecutionVertex vertex : taskVertices) { + for (AccessExecutionVertex vertex : taskVertices) { final ExecutionState state = vertex.getExecutionState(); tasksPerState[state.ordinal()]++; http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java 
---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java index d301bd1..811bea6 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import java.util.Map; @@ -33,7 +33,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt } @Override - public String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception { return handleRequest(vertex.getCurrentExecutionAttempt(), params); } } http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java index 14ccc0c..786f5e8 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java +++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import java.io.StringWriter; @@ -38,7 +38,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA } @Override - public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception { final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified(); StringWriter writer = new StringWriter(); @@ -46,7 +46,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA gen.writeStartObject(); - gen.writeNumberField("subtask", execAttempt.getVertex().getParallelSubtaskIndex()); + gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex()); gen.writeNumberField("attempt", execAttempt.getAttemptNumber()); gen.writeStringField("id", execAttempt.getAttemptId().toString()); http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java index a1e6d0e..3cc7376 100644 --- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; @@ -41,7 +41,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp } @Override - public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception { final ExecutionState status = execAttempt.getState(); final long now = System.currentTimeMillis(); @@ -78,7 +78,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); gen.writeStartObject(); - gen.writeNumberField("subtask", execAttempt.getVertex().getParallelSubtaskIndex()); + gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex()); gen.writeStringField("status", status.name()); gen.writeNumberField("attempt", execAttempt.getAttemptNumber()); gen.writeStringField("host", locationString); http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java index 780bd4b..892a606 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java @@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; @@ -39,7 +39,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand } @Override - public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); @@ -50,7 +50,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand gen.writeArrayFieldStart("subtasks"); int num = 0; - for (ExecutionVertex vertex : jobVertex.getTaskVertices()) { + for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation(); String locationString = location == null ? 
"(unassigned)" : location.getHostname(); http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java index 9e6276d..76349ee 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java @@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; @@ -41,7 +41,7 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { } @Override - public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { + public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { final long now = System.currentTimeMillis(); StringWriter writer = new StringWriter(); @@ -50,13 +50,13 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { gen.writeStartObject(); gen.writeStringField("id", jobVertex.getJobVertexId().toString()); - gen.writeStringField("name", jobVertex.getJobVertex().getName()); + 
gen.writeStringField("name", jobVertex.getName()); gen.writeNumberField("now", now); gen.writeArrayFieldStart("subtasks"); int num = 0; - for (ExecutionVertex vertex : jobVertex.getTaskVertices()) { + for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps(); ExecutionState status = vertex.getExecutionState(); http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java index 25dc189..507c977 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemoryType; @@ -153,7 +152,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger { ExecutionGraphFound executionGraphResponse = expectMsgClass(ExecutionGraphFound.class); - ExecutionGraph executionGraph = executionGraphResponse.executionGraph(); + ExecutionGraph executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph(); ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID()); StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator( 
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java index 9b1f608..c4ce9d1 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -125,7 +124,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger { jm.tell(new RequestExecutionGraph(jobGraph.getJobID()), testActor); ExecutionGraphFound executionGraphResponse = expectMsgClass(ExecutionGraphFound.class); - ExecutionGraph executionGraph = executionGraphResponse.executionGraph(); + ExecutionGraph executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph(); ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID()); StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator( http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java index f882663..18aae35 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java @@ -20,9 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; @@ -37,8 +35,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -49,10 +45,9 @@ public class JobVertexCheckpointsHandlerTest { JobVertexCheckpointsHandler handler = new JobVertexCheckpointsHandler( mock(ExecutionGraphHolder.class)); - ExecutionGraph graph = mock(ExecutionGraph.class); ExecutionJobVertex vertex = mock(ExecutionJobVertex.class); - - when(vertex.getGraph()).thenReturn(graph); + when(vertex.getCheckpointStats()) + .thenReturn(Option.<OperatorCheckpointStats>empty()); String response = handler.handleRequest(vertex, Collections.<String, String>emptyMap()); @@ -65,15 +60,10 @@ public class JobVertexCheckpointsHandlerTest { JobVertexCheckpointsHandler handler = new JobVertexCheckpointsHandler( mock(ExecutionGraphHolder.class)); 
- ExecutionGraph graph = mock(ExecutionGraph.class); ExecutionJobVertex vertex = mock(ExecutionJobVertex.class); - CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); - - when(vertex.getGraph()).thenReturn(graph); - when(graph.getCheckpointStatsTracker()).thenReturn(tracker); // No stats - when(tracker.getOperatorStats(any(JobVertexID.class))) + when(vertex.getCheckpointStats()) .thenReturn(Option.<OperatorCheckpointStats>empty()); String response = handler.handleRequest(vertex, Collections.<String, String>emptyMap()); @@ -89,13 +79,9 @@ public class JobVertexCheckpointsHandlerTest { JobVertexID vertexId = new JobVertexID(); - ExecutionGraph graph = mock(ExecutionGraph.class); ExecutionJobVertex vertex = mock(ExecutionJobVertex.class); - CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); when(vertex.getJobVertexId()).thenReturn(vertexId); - when(vertex.getGraph()).thenReturn(graph); - when(graph.getCheckpointStatsTracker()).thenReturn(tracker); long[][] subTaskStats = new long[][] { new long[] { 1, 10 }, @@ -113,7 +99,7 @@ public class JobVertexCheckpointsHandlerTest { OperatorCheckpointStats stats = new OperatorCheckpointStats( 3, 6812, 2800, 1024, subTaskStats); - when(tracker.getOperatorStats(eq(vertexId))) + when(vertex.getCheckpointStats()) .thenReturn(Option.apply(stats)); // Request stats http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java new file mode 100644 index 0000000..92df7d7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java @@ -0,0 +1,53 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats; +import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import scala.Option; + +import java.io.Serializable; +import java.util.Map; +/** {@link CheckpointStatsTracker} that serves the job and operator statistics supplied at construction and ignores completed-checkpoint notifications. */ +public class ArchivedCheckpointStatsTracker implements CheckpointStatsTracker, Serializable { + private static final long serialVersionUID = 1469003563086353555L; + + private final Option<JobCheckpointStats> jobStats; + private final Map<JobVertexID, OperatorCheckpointStats> operatorStats; + + public ArchivedCheckpointStatsTracker(Option<JobCheckpointStats> jobStats, Map<JobVertexID, OperatorCheckpointStats> operatorStats) { + this.jobStats = jobStats; + this.operatorStats = new java.util.HashMap<>(operatorStats); /* Defensive copy: decouples from later changes to the caller's map and always stores a Serializable HashMap. */ + } + + @Override + public void onCompletedCheckpoint(CompletedCheckpoint checkpoint) { /* No-op: this tracker only serves the statistics passed to its constructor. */ + } + + @Override + public Option<JobCheckpointStats> getJobStats() { + return jobStats; + } + + @Override + public Option<OperatorCheckpointStats> getOperatorStats(JobVertexID operatorId) { + return
Option.apply(operatorStats.get(operatorId)); /* Map.get yields null for unknown vertices, which Option.apply turns into None. */ + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java index dc239dd..64f17d4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java @@ -18,10 +18,12 @@ package org.apache.flink.runtime.checkpoint.stats; +import java.io.Serializable; + /** * Statistics for a specific checkpoint. */ -public class CheckpointStats { +public class CheckpointStats implements Serializable { /** ID of the checkpoint. */ private final long checkpointId; http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java index b1d7ff2..e156c8e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java @@ -20,12 +20,13 @@ package org.apache.flink.runtime.checkpoint.stats; import org.apache.flink.configuration.ConfigConstants; +import java.io.Serializable; import java.util.List; /** * Snapshot of checkpoint statistics for a job.
*/ -public interface JobCheckpointStats { +public interface JobCheckpointStats extends Serializable { // ------------------------------------------------------------------------ // General stats http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java index 5b113d8..6c2d497 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java @@ -27,6 +27,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class OperatorCheckpointStats extends CheckpointStats { + private static final long serialVersionUID = -1594736655739376140L; + /** Duration in milliseconds and state sizes in bytes per sub task. 
*/ private final long[][] subTaskStats; http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java index db8a0e0..39fbad5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java @@ -348,6 +348,8 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker { */ private static class JobCheckpointStatsSnapshot implements JobCheckpointStats { + private static final long serialVersionUID = 7558212015099742418L; + // General private final List<CheckpointStats> recentHistory; private final long count; http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java new file mode 100644 index 0000000..aefc17d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.Map; + +/** + * Common interface for the runtime {@link Execution} and {@link ArchivedExecution}. + */ +public interface AccessExecution { + /** + * Returns the {@link ExecutionAttemptID} for this Execution. + * + * @return ExecutionAttemptID for this execution + */ + ExecutionAttemptID getAttemptId(); + + /** + * Returns the attempt number for this execution. + * + * @return attempt number for this execution. + */ + int getAttemptNumber(); + + /** + * Returns the timestamps for every {@link ExecutionState}. + * + * @return timestamps for each state + */ + long[] getStateTimestamps(); + + /** + * Returns the current {@link ExecutionState} for this execution. + * + * @return execution state for this execution + */ + ExecutionState getState(); + + /** + * Returns the {@link TaskManagerLocation} for this execution. + * + * @return taskmanager location for this execution. + */ + TaskManagerLocation getAssignedResourceLocation(); + + /** + * Returns the exception that caused the job to fail.
This is the first root exception + * that was not recoverable and triggered job failure. + * + * @return failure exception as a string, or {@code "(null)"} + */ + String getFailureCauseAsString(); + + /** + * Returns the timestamp for the given {@link ExecutionState}. + * + * @param state state for which the timestamp should be returned + * @return timestamp for the given state + */ + long getStateTimestamp(ExecutionState state); + + /** + * Returns the user-defined accumulators as strings. + * + * @return user-defined accumulators as strings. + */ + StringifiedAccumulatorResult[] getUserAccumulatorsStringified(); + + /** + * Returns the system-defined accumulators. + * + * @return system-defined accumulators. + * @deprecated Will be removed in FLINK-4527 + */ + @Deprecated + Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators(); + + /** + * Returns the subtask index of this execution. + * + * @return subtask index of this execution. + */ + int getParallelSubtaskIndex(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java new file mode 100644 index 0000000..0ff6ace --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; +import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.SerializedValue; + +import java.io.IOException; +import java.util.Map; + +/** + * Common interface for the runtime {@link ExecutionGraph} and {@link ArchivedExecutionGraph}. + */ +public interface AccessExecutionGraph { + /** + * Returns the job plan as a JSON string. + * + * @return job plan as a JSON string + */ + String getJsonPlan(); + + /** + * Returns the {@link JobID} for this execution graph. + * + * @return job ID for this execution graph + */ + JobID getJobID(); + + /** + * Returns the job name for this execution graph. + * + * @return job name for this execution graph + */ + String getJobName(); + + /** + * Returns the current {@link JobStatus} for this execution graph. + * + * @return job status for this execution graph + */ + JobStatus getState(); + + /** + * Returns the exception that caused the job to fail.
This is the first root exception + * that was not recoverable and triggered job failure. + * + * @return failure causing exception as a string, or {@code "(null)"} + */ + String getFailureCauseAsString(); + + /** + * Returns the job vertex for the given {@link JobVertexID}. + * + * @param id id of job vertex to be returned + * @return job vertex for the given id, or null + */ + AccessExecutionJobVertex getJobVertex(JobVertexID id); + + /** + * Returns a map containing all job vertices for this execution graph. + * + * @return map containing all job vertices for this execution graph + */ + Map<JobVertexID, ? extends AccessExecutionJobVertex> getAllVertices(); + + /** + * Returns an iterable containing all job vertices for this execution graph in the order they were created. + * + * @return iterable containing all job vertices for this execution graph in the order they were created + */ + Iterable<? extends AccessExecutionJobVertex> getVerticesTopologically(); + + /** + * Returns an iterable containing all execution vertices for this execution graph. + * + * @return iterable containing all execution vertices for this execution graph + */ + Iterable<? extends AccessExecutionVertex> getAllExecutionVertices(); + + /** + * Returns the timestamp for the given {@link JobStatus}. + * + * @param status status for which the timestamp should be returned + * @return timestamp for the given job status + */ + long getStatusTimestamp(JobStatus status); + + /** + * Returns the {@link CheckpointStatsTracker} for this execution graph. + * + * @return CheckpointStatsTracker for this execution graph + */ + CheckpointStatsTracker getCheckpointStatsTracker(); + + /** + * Returns the {@link ArchivedExecutionConfig} for this execution graph. + * + * @return execution config summary for this execution graph, or null in case of errors + */ + ArchivedExecutionConfig getArchivedExecutionConfig(); + + /** + * Returns whether the job for this execution graph is stoppable.
+ * + * @return true, if all source tasks are stoppable, false otherwise + */ + boolean isStoppable(); + + /** + * Returns the aggregated user-defined accumulators as strings. + * + * @return aggregated user-defined accumulators as strings. + */ + StringifiedAccumulatorResult[] getAccumulatorResultsStringified(); + + /** + * Returns a map containing the serialized values of user-defined accumulators. + * + * @return map containing serialized values of user-defined accumulators + * @throws IOException indicates that the serialization has failed + */ + Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException; + + /** + * Returns the aggregated system-defined accumulators. + * + * @return aggregated system-defined accumulators. + * @deprecated Will be removed in FLINK-4527 + */ + @Deprecated + Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> getFlinkAccumulators(); + + /** + * Returns whether this execution graph was archived. + * + * @return true, if the execution graph was archived, false otherwise + */ + boolean isArchived(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java new file mode 100644 index 0000000..c9bf604 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import scala.Option; + +import java.util.Map; + +/** + * Common interface for the runtime {@link ExecutionJobVertex} and {@link ArchivedExecutionJobVertex}. + */ +public interface AccessExecutionJobVertex { + /** + * Returns the name for this job vertex. + * + * @return name for this job vertex. + */ + String getName(); + + /** + * Returns the parallelism for this job vertex. + * + * @return parallelism for this job vertex. + */ + int getParallelism(); + + /** + * Returns the max parallelism for this job vertex. + * + * @return max parallelism for this job vertex. + */ + int getMaxParallelism(); + + /** + * Returns the {@link JobVertexID} for this job vertex. + * + * @return JobVertexID for this job vertex. + */ + JobVertexID getJobVertexId(); + + /** + * Returns all execution vertices for this job vertex.
+ * + * @return all execution vertices for this job vertex + */ + AccessExecutionVertex[] getTaskVertices(); + + /** + * Returns the aggregated {@link ExecutionState} for this job vertex. + * + * @return aggregated state for this job vertex + */ + ExecutionState getAggregateState(); + + /** + * Returns the {@link OperatorCheckpointStats} for this job vertex. + * + * @return checkpoint stats for this job vertex, or an empty {@link Option} if no stats are available + */ + Option<OperatorCheckpointStats> getCheckpointStats(); + + /** + * Returns the aggregated system-defined accumulators. + * + * @return aggregated system-defined accumulators. + * @deprecated Will be removed in FLINK-4527 + */ + @Deprecated + Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators(); + + /** + * Returns the aggregated user-defined accumulators as strings. + * + * @return aggregated user-defined accumulators as strings. + */ + StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified(); +}