http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
new file mode 100644
index 0000000..9faf3fb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Common interface for the runtime {@link ExecutionVertex} and {@link ArchivedExecutionVertex}.
+ */
+public interface AccessExecutionVertex {
+	/**
+	 * Returns the name of this execution vertex in the format "myTask (2/7)".
+	 *
+	 * @return name of this execution vertex
+	 */
+	String getTaskNameWithSubtaskIndex();
+
+	/**
+	 * Returns the subtask index of this execution vertex.
+	 *
+	 * @return subtask index of this execution vertex.
+	 */
+	int getParallelSubtaskIndex();
+
+	/**
+	 * Returns the current execution for this execution vertex.
+	 *
+	 * @return current execution
+	 */
+	AccessExecution getCurrentExecutionAttempt();
+
+	/**
+	 * Returns the current {@link ExecutionState} for this execution vertex.
+	 *
+	 * @return execution state for this execution vertex
+	 */
+	ExecutionState getExecutionState();
+
+	/**
+	 * Returns the timestamp for the given {@link ExecutionState}.
+	 *
+	 * @param state state for which the timestamp should be returned
+	 * @return timestamp for the given state
+	 */
+	long getStateTimestamp(ExecutionState state);
+
+	/**
+	 * Returns the failure cause of this execution vertex in stringified form. As implemented by
+	 * {@link ArchivedExecutionVertex}, this is the cause recorded for the current execution attempt.
+	 *
+	 * @return failure exception as a string, or {@code "(null)"}
+	 */
+	String getFailureCauseAsString();
+
+	/**
+	 * Returns the {@link TaskManagerLocation} for this execution vertex.
+	 *
+	 * @return taskmanager location for this execution vertex.
+	 */
+	TaskManagerLocation getCurrentAssignedResourceLocation();
+
+	/**
+	 * Returns the execution for the given attempt number.
+	 *
+	 * @param attemptNumber attempt number of execution to be returned
+	 * @return execution for the given attempt number
+	 */
+	AccessExecution getPriorExecutionAttempt(int attemptNumber);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
new file mode 100644
index 0000000..0b2992f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Serializable snapshot of an {@link Execution}. Every value is read from the execution once,
+ * in the constructor, and is served from the stored copy by the {@link AccessExecution} accessors.
+ */
+public class ArchivedExecution implements AccessExecution, Serializable {
+
+	private static final long serialVersionUID = 4817108757483345173L;
+
+	/** Value of {@link Execution#getAttemptId()} at archiving time. */
+	private final ExecutionAttemptID attemptId;
+
+	/** Value of {@link Execution#getAttemptNumber()} at archiving time. */
+	private final int attemptNumber;
+
+	/** Subtask index of the vertex the execution belongs to. */
+	private final int parallelSubtaskIndex;
+
+	/** Value of {@link Execution#getState()} at archiving time. */
+	private final ExecutionState state;
+
+	/** State timestamps of the execution, indexed by the ordinal of the {@link ExecutionState}. */
+	private final long[] stateTimestamps;
+
+	/** Failure cause of the execution, stringified at archiving time. */
+	private final String failureCause;
+
+	/** Value of {@link Execution#getAssignedResourceLocation()} at archiving time. */
+	private final TaskManagerLocation assignedResourceLocation;
+
+	/** User-defined accumulators of the execution in stringified form. */
+	private final StringifiedAccumulatorResult[] userAccumulators;
+
+	/** Value of {@link Execution#getFlinkAccumulators()} at archiving time. */
+	private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
+
+	/**
+	 * Creates the archived counterpart of the given execution.
+	 *
+	 * @param execution execution whose current values are stored
+	 */
+	public ArchivedExecution(Execution execution) {
+		userAccumulators = execution.getUserAccumulatorsStringified();
+		flinkAccumulators = execution.getFlinkAccumulators();
+		attemptId = execution.getAttemptId();
+		attemptNumber = execution.getAttemptNumber();
+		stateTimestamps = execution.getStateTimestamps();
+		parallelSubtaskIndex = execution.getVertex().getParallelSubtaskIndex();
+		state = execution.getState();
+		failureCause = ExceptionUtils.stringifyException(execution.getFailureCause());
+		assignedResourceLocation = execution.getAssignedResourceLocation();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Accessors
+	// ------------------------------------------------------------------------
+
+	@Override
+	public ExecutionAttemptID getAttemptId() {
+		return attemptId;
+	}
+
+	@Override
+	public int getAttemptNumber() {
+		return attemptNumber;
+	}
+
+	@Override
+	public int getParallelSubtaskIndex() {
+		return parallelSubtaskIndex;
+	}
+
+	@Override
+	public ExecutionState getState() {
+		return state;
+	}
+
+	@Override
+	public long[] getStateTimestamps() {
+		return stateTimestamps;
+	}
+
+	@Override
+	public long getStateTimestamp(ExecutionState state) {
+		return stateTimestamps[state.ordinal()];
+	}
+
+	@Override
+	public String getFailureCauseAsString() {
+		return failureCause;
+	}
+
+	@Override
+	public TaskManagerLocation getAssignedResourceLocation() {
+		return assignedResourceLocation;
+	}
+
+	@Override
+	public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
+		return userAccumulators;
+	}
+
+	@Override
+	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() {
+		return flinkAccumulators;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
new file mode 100644
index 0000000..493825a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Serializable, read-only counterpart of an execution graph whose values are all handed in
+ * through the constructor. Nothing in this class modifies the stored values afterwards.
+ */
+public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializable {
+
+	private static final long serialVersionUID = 7231383912742578428L;
+
+	/** The ID of the job this graph has been built for. */
+	private final JobID jobID;
+
+	/** The name of the original job graph. */
+	private final String jobName;
+
+	/** All job vertices that are part of this graph. */
+	private final Map<JobVertexID, ArchivedExecutionJobVertex> tasks;
+
+	/** All vertices, in the order in which they were created. */
+	private final List<ArchivedExecutionJobVertex> verticesInCreationOrder;
+
+	/**
+	 * Timestamps (in milliseconds, as returned by {@code System.currentTimeMillis()}) when
+	 * the execution graph transitioned into a certain state. The index into this array is the
+	 * ordinal of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is
+	 * at {@code stateTimestamps[RUNNING.ordinal()]}.
+	 */
+	private final long[] stateTimestamps;
+
+	/** Status of the job execution. */
+	private final JobStatus state;
+
+	/**
+	 * The exception that caused the job to fail, in stringified form. This is set to the first
+	 * root exception that was not recoverable and triggered job failure.
+	 */
+	private final String failureCause;
+
+	// ------ Fields that are only relevant for archived execution graphs ------------
+	private final String jsonPlan;
+	private final StringifiedAccumulatorResult[] archivedUserAccumulators;
+	private final ArchivedExecutionConfig archivedExecutionConfig;
+	private final boolean isStoppable;
+	private final Map<String, SerializedValue<Object>> serializedUserAccumulators;
+	private final ArchivedCheckpointStatsTracker tracker;
+
+	public ArchivedExecutionGraph(
+			JobID jobID,
+			String jobName,
+			Map<JobVertexID, ArchivedExecutionJobVertex> tasks,
+			List<ArchivedExecutionJobVertex> verticesInCreationOrder,
+			long[] stateTimestamps,
+			JobStatus state,
+			String failureCause,
+			String jsonPlan,
+			StringifiedAccumulatorResult[] archivedUserAccumulators,
+			Map<String, SerializedValue<Object>> serializedUserAccumulators,
+			ArchivedExecutionConfig executionConfig,
+			boolean isStoppable,
+			ArchivedCheckpointStatsTracker tracker
+	) {
+		this.jobID = jobID;
+		this.jobName = jobName;
+		this.tasks = tasks;
+		this.verticesInCreationOrder = verticesInCreationOrder;
+		this.stateTimestamps = stateTimestamps;
+		this.state = state;
+		this.failureCause = failureCause;
+		this.jsonPlan = jsonPlan;
+		this.archivedUserAccumulators = archivedUserAccumulators;
+		this.serializedUserAccumulators = serializedUserAccumulators;
+		this.archivedExecutionConfig = executionConfig;
+		this.isStoppable = isStoppable;
+		this.tracker = tracker;
+	}
+
+	@Override
+	public String getJsonPlan() {
+		return jsonPlan;
+	}
+
+	@Override
+	public JobID getJobID() {
+		return jobID;
+	}
+
+	@Override
+	public String getJobName() {
+		return jobName;
+	}
+
+	@Override
+	public JobStatus getState() {
+		return state;
+	}
+
+	@Override
+	public String getFailureCauseAsString() {
+		return failureCause;
+	}
+
+	@Override
+	public ArchivedExecutionJobVertex getJobVertex(JobVertexID id) {
+		return this.tasks.get(id);
+	}
+
+	@Override
+	public Map<JobVertexID, AccessExecutionJobVertex> getAllVertices() {
+		return Collections.<JobVertexID, AccessExecutionJobVertex>unmodifiableMap(this.tasks);
+	}
+
+	@Override
+	public Iterable<ArchivedExecutionJobVertex> getVerticesTopologically() {
+		// hand out a read-only, index-based iterator over the creation-ordered list; the number
+		// of elements it will return is fixed at the time of this call
+		final int numElements = this.verticesInCreationOrder.size();
+
+		return new Iterable<ArchivedExecutionJobVertex>() {
+			@Override
+			public Iterator<ArchivedExecutionJobVertex> iterator() {
+				return new Iterator<ArchivedExecutionJobVertex>() {
+					private int pos = 0;
+
+					@Override
+					public boolean hasNext() {
+						return pos < numElements;
+					}
+
+					@Override
+					public ArchivedExecutionJobVertex next() {
+						if (!hasNext()) {
+							throw new NoSuchElementException();
+						}
+						return verticesInCreationOrder.get(pos++);
+					}
+
+					@Override
+					public void remove() {
+						throw new UnsupportedOperationException();
+					}
+				};
+			}
+		};
+	}
+
+	@Override
+	public Iterable<ArchivedExecutionVertex> getAllExecutionVertices() {
+		return new Iterable<ArchivedExecutionVertex>() {
+			@Override
+			public Iterator<ArchivedExecutionVertex> iterator() {
+				return new AllVerticesIterator(getVerticesTopologically().iterator());
+			}
+		};
+	}
+
+	@Override
+	public long getStatusTimestamp(JobStatus status) {
+		return this.stateTimestamps[status.ordinal()];
+	}
+
+	@Override
+	public CheckpointStatsTracker getCheckpointStatsTracker() {
+		return tracker;
+	}
+
+	/**
+	 * Gets the internal flink accumulator map of maps which contains some metrics.
+	 *
+	 * @return A map of accumulators for every executed task.
+	 */
+	@Override
+	public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> getFlinkAccumulators() {
+		Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators =
+			new HashMap<>();
+
+		for (AccessExecutionVertex vertex : getAllExecutionVertices()) {
+			AccessExecution attempt = vertex.getCurrentExecutionAttempt();
+			flinkAccumulators.put(attempt.getAttemptId(), attempt.getFlinkAccumulators());
+		}
+
+		return flinkAccumulators;
+	}
+
+	@Override
+	public boolean isArchived() {
+		return true;
+	}
+
+	public StringifiedAccumulatorResult[] getUserAccumulators() {
+		return archivedUserAccumulators;
+	}
+
+	public ArchivedExecutionConfig getArchivedExecutionConfig() {
+		return archivedExecutionConfig;
+	}
+
+	@Override
+	public boolean isStoppable() {
+		return isStoppable;
+	}
+
+	@Override
+	public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
+		return archivedUserAccumulators;
+	}
+
+	@Override
+	public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
+		return serializedUserAccumulators;
+	}
+
+	/**
+	 * Iterator over the execution vertices of all job vertices supplied by the wrapped iterator.
+	 * Declared static because it does not use any state of the enclosing graph.
+	 */
+	static class AllVerticesIterator implements Iterator<ArchivedExecutionVertex> {
+
+		private final Iterator<ArchivedExecutionJobVertex> jobVertices;
+
+		private ArchivedExecutionVertex[] currVertices;
+
+		private int currPos;
+
+		public AllVerticesIterator(Iterator<ArchivedExecutionJobVertex> jobVertices) {
+			this.jobVertices = jobVertices;
+		}
+
+		@Override
+		public boolean hasNext() {
+			while (true) {
+				if (currVertices != null) {
+					if (currPos < currVertices.length) {
+						return true;
+					} else {
+						currVertices = null;
+					}
+				} else if (jobVertices.hasNext()) {
+					currVertices = jobVertices.next().getTaskVertices();
+					currPos = 0;
+				} else {
+					return false;
+				}
+			}
+		}
+
+		@Override
+		public ArchivedExecutionVertex next() {
+			if (hasNext()) {
+				return currVertices[currPos++];
+			} else {
+				throw new NoSuchElementException();
+			}
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
new file mode 100644
index 0000000..4857bf5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import scala.Option;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregateJobVertexState;
+
+/**
+ * Serializable snapshot of an {@link ExecutionJobVertex}: its archived subtask vertices, ID, name,
+ * parallelism settings, accumulators and checkpoint statistics, all captured in the constructor.
+ */
+public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Serializable {
+
+	private static final long serialVersionUID = -5768187638639437957L;
+
+	private final ArchivedExecutionVertex[] taskVertices;
+
+	private final JobVertexID id;
+	private final String name;
+	private final int parallelism;
+	private final int maxParallelism;
+
+	private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> aggregatedMetricAccumulators;
+
+	private final Option<OperatorCheckpointStats> checkpointStats;
+
+	private final StringifiedAccumulatorResult[] archivedUserAccumulators;
+
+	/**
+	 * Archives the given job vertex together with all of its subtask vertices.
+	 *
+	 * @param jobVertex job vertex to create the archived counterpart for
+	 */
+	public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex) {
+		ExecutionVertex[] vertices = jobVertex.getTaskVertices();
+		this.taskVertices = new ArchivedExecutionVertex[vertices.length];
+		for (int i = 0; i < vertices.length; i++) {
+			this.taskVertices[i] = vertices[i].archive();
+		}
+
+		this.aggregatedMetricAccumulators = jobVertex.getAggregatedMetricAccumulators();
+		// the stringified aggregate is obtained from the job vertex directly; nothing is merged here
+		this.archivedUserAccumulators = jobVertex.getAggregatedUserAccumulatorsStringified();
+
+		this.id = jobVertex.getJobVertexId();
+		this.name = jobVertex.getJobVertex().getName();
+		this.parallelism = jobVertex.getParallelism();
+		this.maxParallelism = jobVertex.getMaxParallelism();
+
+		// without a tracker on the graph there are no operator stats, so an empty Option is stored
+		CheckpointStatsTracker tracker = jobVertex.getGraph().getCheckpointStatsTracker();
+		this.checkpointStats = tracker != null
+			? tracker.getOperatorStats(this.id)
+			: Option.<OperatorCheckpointStats>empty();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//   Accessors
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public String getName() {
+		return name;
+	}
+
+	@Override
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	@Override
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	@Override
+	public JobVertexID getJobVertexId() {
+		return id;
+	}
+
+	@Override
+	public ArchivedExecutionVertex[] getTaskVertices() {
+		return taskVertices;
+	}
+
+	@Override
+	public ExecutionState getAggregateState() {
+		// count the subtasks per execution state; the aggregation rule lives in ExecutionJobVertex
+		int[] num = new int[ExecutionState.values().length];
+		for (ArchivedExecutionVertex vertex : this.taskVertices) {
+			num[vertex.getExecutionState().ordinal()]++;
+		}
+
+		return getAggregateJobVertexState(num, parallelism);
+	}
+
+	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators() {
+		return this.aggregatedMetricAccumulators;
+	}
+
+	@Override
+	public Option<OperatorCheckpointStats> getCheckpointStats() {
+		return checkpointStats;
+	}
+
+	@Override
+	public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
+		return archivedUserAccumulators;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
new file mode 100644
index 0000000..e1fb11a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serializable snapshot of an {@link ExecutionVertex} with its current and prior execution attempts.
+ */
+public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
+
+	private static final long serialVersionUID = -6708241535015028576L;
+
+	private final int subTaskIndex;
+
+	/** The name in the format "myTask (2/7)", copied from the vertex. */
+	private final String taskNameWithSubtask;
+
+	/** Archived prior attempts, in the order in which the vertex returned them. */
+	private final List<ArchivedExecution> priorExecutions;
+
+	private final ArchivedExecution currentExecution; // this field must never be null
+
+	public ArchivedExecutionVertex(ExecutionVertex vertex) {
+		subTaskIndex = vertex.getParallelSubtaskIndex();
+		priorExecutions = new ArrayList<>();
+		for (Execution attempt : vertex.getPriorExecutions()) {
+			priorExecutions.add(attempt.archive());
+		}
+		taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
+		currentExecution = vertex.getCurrentExecutionAttempt().archive();
+	}
+
+	@Override
+	public String getTaskNameWithSubtaskIndex() {
+		return taskNameWithSubtask;
+	}
+
+	@Override
+	public int getParallelSubtaskIndex() {
+		return subTaskIndex;
+	}
+
+	@Override
+	public ArchivedExecution getCurrentExecutionAttempt() {
+		return currentExecution;
+	}
+
+	@Override
+	public ExecutionState getExecutionState() {
+		return currentExecution.getState();
+	}
+
+	@Override
+	public long getStateTimestamp(ExecutionState state) {
+		return currentExecution.getStateTimestamp(state);
+	}
+
+	@Override
+	public String getFailureCauseAsString() {
+		return currentExecution.getFailureCauseAsString();
+	}
+
+	@Override
+	public TaskManagerLocation getCurrentAssignedResourceLocation() {
+		return currentExecution.getAssignedResourceLocation();
+	}
+
+	@Override
+	public ArchivedExecution getPriorExecutionAttempt(int attemptNumber) {
+		if (attemptNumber < 0 || attemptNumber >= priorExecutions.size()) {
+			throw new IllegalArgumentException("attempt does not exist");
+		}
+		return priorExecutions.get(attemptNumber);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index b92e3af..0b56931 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph; import akka.dispatch.OnComplete; import akka.dispatch.OnFailure; +import org.apache.flink.api.common.Archiveable; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; @@ -102,7 +103,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting * actions if it is not. Many actions are also idempotent (like canceling). 
*/ -public class Execution { +public class Execution implements AccessExecution, Archiveable<ArchivedExecution> { private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state"); @@ -188,14 +189,17 @@ public class Execution { return vertex; } + @Override public ExecutionAttemptID getAttemptId() { return attemptId; } + @Override public int getAttemptNumber() { return attemptNumber; } + @Override public ExecutionState getState() { return state; } @@ -204,6 +208,7 @@ public class Execution { return assignedResource; } + @Override public TaskManagerLocation getAssignedResourceLocation() { return assignedResourceLocation; } @@ -212,10 +217,17 @@ public class Execution { return failureCause; } + @Override + public String getFailureCauseAsString() { + return ExceptionUtils.stringifyException(getFailureCause()); + } + + @Override public long[] getStateTimestamps() { return stateTimestamps; } + @Override public long getStateTimestamp(ExecutionState state) { return this.stateTimestamps[state.ordinal()]; } @@ -237,21 +249,6 @@ public class Execution { } /** - * This method cleans fields that are irrelevant for the archived execution attempt. - */ - public void prepareForArchiving() { - if (assignedResource != null && assignedResource.isAlive()) { - throw new IllegalStateException("Cannot archive Execution while the assigned resource is still running."); - } - assignedResource = null; - - executionContext = null; - - partialInputChannelDeploymentDescriptors.clear(); - partialInputChannelDeploymentDescriptors = null; - } - - /** * Sets the initial state for the execution. The serialized state is then shipped via the * {@link TaskDeploymentDescriptor} to the TaskManagers. 
* @@ -1055,14 +1052,21 @@ public class Execution { return userAccumulators; } + @Override public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() { return StringifiedAccumulatorResult.stringifyAccumulatorResults(userAccumulators); } + @Override public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() { return flinkAccumulators; } + @Override + public int getParallelSubtaskIndex() { + return getVertex().getParallelSubtaskIndex(); + } + // ------------------------------------------------------------------------ // Standard utilities // ------------------------------------------------------------------------ @@ -1072,4 +1076,9 @@ public class Execution { return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(), (assignedResource == null ? "(unassigned)" : assignedResource.toString()), state); } + + @Override + public ArchivedExecution archive() { + return new ArchivedExecution(this); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 10f0e88..aa9406c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; +import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; @@ -32,14 +33,16 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import 
org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats; +import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; -import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader; -import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary; +import org.apache.flink.api.common.Archiveable; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -58,8 +61,10 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.concurrent.ExecutionContext; import scala.concurrent.duration.FiniteDuration; +import scala.Option; import java.io.IOException; import java.net.URL; @@ -102,7 +107,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * address the message receiver.</li> * </ul> */ -public class ExecutionGraph { +public class ExecutionGraph implements AccessExecutionGraph, Archiveable<ArchivedExecutionGraph> { private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state"); @@ -180,9 
+185,6 @@ public class ExecutionGraph { * from results than need to be materialized. */ private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES; - /** Flag to indicate whether the Graph has been archived */ - private boolean isArchived = false; - // ------ Execution status and progress. These values are volatile, and accessed under the lock ------- /** Current status of the job execution */ @@ -222,9 +224,6 @@ public class ExecutionGraph { // ------ Fields that are only relevant for archived execution graphs ------------ private String jsonPlan; - /** Serializable summary of all job config values, e.g. for web interface */ - private ExecutionConfigSummary executionConfigSummary; - // -------------------------------------------------------------------------------------------- // Constructors // -------------------------------------------------------------------------------------------- @@ -304,16 +303,6 @@ public class ExecutionGraph { metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge()); this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices()); - - // create a summary of all relevant data accessed in the web interface's JobConfigHandler - try { - ExecutionConfig executionConfig = serializedConfig.deserializeValue(userClassLoader); - if (executionConfig != null) { - this.executionConfigSummary = new ExecutionConfigSummary(executionConfig); - } - } catch (IOException | ClassNotFoundException e) { - LOG.error("Couldn't create ExecutionConfigSummary for job {} ", jobID, e); - } } // -------------------------------------------------------------------------------------------- @@ -344,8 +333,9 @@ public class ExecutionGraph { return scheduleMode; } + @Override public boolean isArchived() { - return isArchived; + return false; } public void enableSnapshotCheckpointing( @@ -434,6 +424,7 @@ public class ExecutionGraph { return restartStrategy; } + @Override public CheckpointStatsTracker getCheckpointStatsTracker() { 
return checkpointStatsTracker; } @@ -484,6 +475,7 @@ public class ExecutionGraph { this.jsonPlan = jsonPlan; } + @Override public String getJsonPlan() { return jsonPlan; } @@ -492,14 +484,17 @@ public class ExecutionGraph { return slotProvider; } + @Override public JobID getJobID() { return jobID; } + @Override public String getJobName() { return jobName; } + @Override public boolean isStoppable() { return this.isStoppable; } @@ -512,6 +507,7 @@ public class ExecutionGraph { return this.userClassLoader; } + @Override public JobStatus getState() { return state; } @@ -520,14 +516,22 @@ public class ExecutionGraph { return failureCause; } + @Override + public String getFailureCauseAsString() { + return ExceptionUtils.stringifyException(failureCause); + } + + @Override public ExecutionJobVertex getJobVertex(JobVertexID id) { return this.tasks.get(id); } + @Override public Map<JobVertexID, ExecutionJobVertex> getAllVertices() { return Collections.unmodifiableMap(this.tasks); } + @Override public Iterable<ExecutionJobVertex> getVerticesTopologically() { // we return a specific iterator that does not fail with concurrent modifications // the list is append only, so it is safe for that @@ -566,6 +570,7 @@ public class ExecutionGraph { return Collections.unmodifiableMap(this.intermediateResults); } + @Override public Iterable<ExecutionVertex> getAllExecutionVertices() { return new Iterable<ExecutionVertex>() { @Override @@ -575,6 +580,7 @@ public class ExecutionGraph { }; } + @Override public long getStatusTimestamp(JobStatus status) { return this.stateTimestamps[status.ordinal()]; } @@ -592,6 +598,7 @@ public class ExecutionGraph { * Gets the internal flink accumulator map of maps which contains some metrics. * @return A map of accumulators for every executed task. 
*/ + @Override public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> getFlinkAccumulators() { Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators = new HashMap<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>>(); @@ -627,6 +634,7 @@ public class ExecutionGraph { * @return The accumulator map with serialized accumulator values. * @throws IOException */ + @Override public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException { Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators(); @@ -643,6 +651,7 @@ public class ExecutionGraph { * Returns the a stringified version of the user-defined accumulators. * @return an Array containing the StringifiedAccumulatorResult objects */ + @Override public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() { Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators(); return StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap); @@ -926,51 +935,21 @@ public class ExecutionGraph { } /** - * This method cleans fields that are irrelevant for the archived execution attempt. 
+ * Returns the serializable ArchivedExecutionConfig + * @return ArchivedExecutionConfig which may be null in case of errors */ - public void prepareForArchiving() { - if (!state.isGloballyTerminalState()) { - throw new IllegalStateException("Can only archive the job from a terminal state"); - } - - // clear the non-serializable fields - restartStrategy = null; - slotProvider = null; - checkpointCoordinator = null; - executionContext = null; - kvStateLocationRegistry = null; - - for (ExecutionJobVertex vertex : verticesInCreationOrder) { - vertex.prepareForArchiving(); - } - - intermediateResults.clear(); - currentExecutions.clear(); - requiredJarFiles.clear(); - requiredClasspaths.clear(); - jobStatusListeners.clear(); - executionListeners.clear(); - - if (userClassLoader instanceof FlinkUserCodeClassLoader) { - try { - // close the classloader to free space of user jars immediately - // otherwise we have to wait until garbage collection - ((FlinkUserCodeClassLoader) userClassLoader).close(); - } catch (IOException e) { - LOG.warn("Failed to close the user classloader for job {}", jobID, e); + @Override + public ArchivedExecutionConfig getArchivedExecutionConfig() { + // create a summary of all relevant data accessed in the web interface's JobConfigHandler + try { + ExecutionConfig executionConfig = getSerializedExecutionConfig().deserializeValue(userClassLoader); + if (executionConfig != null) { + return executionConfig.archive(); } - } - userClassLoader = null; - - isArchived = true; - } - - /** - * Returns the serializable ExecutionConfigSummary - * @return ExecutionConfigSummary which may be null in case of errors - */ - public ExecutionConfigSummary getExecutionConfigSummary() { - return executionConfigSummary; + } catch (IOException | ClassNotFoundException e) { + LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", jobID, e); + }; + return null; } /** @@ -1282,4 +1261,53 @@ public class ExecutionGraph { } } } + + @Override + public 
ArchivedExecutionGraph archive() { + Map<JobVertexID, OperatorCheckpointStats> operatorStats = new HashMap<>(); + Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = new HashMap<>(); + List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = new ArrayList<>(); + for (ExecutionJobVertex task : verticesInCreationOrder) { + ArchivedExecutionJobVertex archivedTask = task.archive(); + archivedVerticesInCreationOrder.add(archivedTask); + archivedTasks.put(task.getJobVertexId(), archivedTask); + Option<OperatorCheckpointStats> statsOption = task.getCheckpointStats(); + if (statsOption.isDefined()) { + operatorStats.put(task.getJobVertexId(), statsOption.get()); + } + } + + Option<JobCheckpointStats> jobStats; + if (getCheckpointStatsTracker() == null) { + jobStats = Option.empty(); + } else { + jobStats = getCheckpointStatsTracker().getJobStats(); + } + + ArchivedCheckpointStatsTracker statsTracker = new ArchivedCheckpointStatsTracker(jobStats, operatorStats); + + Map<String, SerializedValue<Object>> serializedUserAccumulators; + try { + serializedUserAccumulators = getAccumulatorsSerialized(); + } catch (Exception e) { + LOG.warn("Error occurred while archiving user accumulators.", e); + serializedUserAccumulators = Collections.emptyMap(); + } + + return new ArchivedExecutionGraph( + getJobID(), + getJobName(), + archivedTasks, + archivedVerticesInCreationOrder, + stateTimestamps, + getState(), + getFailureCauseAsString(), + getJsonPlan(), + getAccumulatorResultsStringified(), + serializedUserAccumulators, + getArchivedExecutionConfig(), + isStoppable(), + statsTracker + ); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index ead0852..e7f16a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -28,7 +28,10 @@ import org.apache.flink.core.io.LocatableInputSplit; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.api.common.Archiveable; import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.IntermediateDataSet; @@ -43,6 +46,7 @@ import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; +import scala.Option; import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; @@ -51,7 +55,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class ExecutionJobVertex { +public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable<ArchivedExecutionJobVertex> { /** Use the same log for all ExecutionGraph classes */ private static final Logger LOG = ExecutionGraph.LOG; @@ -197,10 +201,17 @@ public class ExecutionJobVertex { return jobVertex; } + @Override + public String getName() { + return getJobVertex().getName(); + } + + @Override public int getParallelism() { return parallelism; } + @Override public int getMaxParallelism() { return maxParallelism; } @@ -209,10 +220,12 @@ public class ExecutionJobVertex { return graph.getJobID(); } + @Override public JobVertexID 
getJobVertexId() { return jobVertex.getID(); } + @Override public ExecutionVertex[] getTaskVertices() { return taskVertices; } @@ -241,6 +254,7 @@ public class ExecutionJobVertex { return numSubtasksInFinalState == parallelism; } + @Override public ExecutionState getAggregateState() { int[] num = new int[ExecutionState.values().length]; for (ExecutionVertex vertex : this.taskVertices) { @@ -250,6 +264,16 @@ public class ExecutionJobVertex { return getAggregateJobVertexState(num, parallelism); } + @Override + public Option<OperatorCheckpointStats> getCheckpointStats() { + CheckpointStatsTracker tracker = getGraph().getCheckpointStatsTracker(); + if (tracker == null) { + return Option.empty(); + } else { + return tracker.getOperatorStats(getJobVertexId()); + } + } + //--------------------------------------------------------------------------------------------- public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException { @@ -371,36 +395,6 @@ public class ExecutionJobVertex { } } - /** - * This method cleans fields that are irrelevant for the archived execution attempt. 
- */ - public void prepareForArchiving() { - - for (ExecutionVertex vertex : taskVertices) { - vertex.prepareForArchiving(); - } - - // clear intermediate results - inputs.clear(); - producedDataSets = null; - - // reset shared groups - if (slotSharingGroup != null) { - slotSharingGroup.clearTaskAssignment(); - } - if (coLocationGroup != null) { - coLocationGroup.resetConstraints(); - } - - // reset splits and split assigner - splitAssigner = null; - if (inputSplits != null) { - for (int i = 0; i < inputSplits.length; i++) { - inputSplits[i] = null; - } - } - } - //--------------------------------------------------------------------------------------------- // Notifications //--------------------------------------------------------------------------------------------- @@ -627,4 +621,9 @@ public class ExecutionJobVertex { return ExecutionState.CREATED; } } + + @Override + public ArchivedExecutionJobVertex archive() { + return new ArchivedExecutionJobVertex(this); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 4837803..96af91e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.api.common.Archiveable; import 
org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -42,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupsStateHandle; @@ -72,7 +74,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; * The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of * which time it spawns an {@link Execution}. */ -public class ExecutionVertex { +public class ExecutionVertex implements AccessExecutionVertex, Archiveable<ArchivedExecutionVertex> { private static final Logger LOG = ExecutionGraph.LOG; @@ -176,6 +178,7 @@ public class ExecutionVertex { return this.jobVertex.getJobVertex().getName(); } + @Override public String getTaskNameWithSubtaskIndex() { return this.taskNameWithSubtask; } @@ -188,6 +191,7 @@ public class ExecutionVertex { return this.jobVertex.getMaxParallelism(); } + @Override public int getParallelSubtaskIndex() { return this.subTaskIndex; } @@ -207,18 +211,26 @@ public class ExecutionVertex { return locationConstraint; } + @Override public Execution getCurrentExecutionAttempt() { return currentExecution; } + @Override public ExecutionState getExecutionState() { return currentExecution.getState(); } + @Override public long getStateTimestamp(ExecutionState state) { return currentExecution.getStateTimestamp(state); } + @Override + public String getFailureCauseAsString() { + return ExceptionUtils.stringifyException(getFailureCause()); + } + 
public Throwable getFailureCause() { return currentExecution.getFailureCause(); } @@ -227,10 +239,12 @@ public class ExecutionVertex { return currentExecution.getAssignedResource(); } + @Override public TaskManagerLocation getCurrentAssignedResourceLocation() { return currentExecution.getAssignedResourceLocation(); } + @Override public Execution getPriorExecutionAttempt(int attemptNumber) { if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) { return priorExecutions.get(attemptNumber); @@ -240,6 +254,10 @@ public class ExecutionVertex { } } + List<Execution> getPriorExecutions() { + return priorExecutions; + } + public ExecutionGraph getExecutionGraph() { return this.jobVertex.getGraph(); } @@ -537,31 +555,6 @@ public class ExecutionVertex { } } - /** - * This method cleans fields that are irrelevant for the archived execution attempt. - */ - public void prepareForArchiving() throws IllegalStateException { - Execution execution = currentExecution; - - // sanity check - if (!execution.isFinished()) { - throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state."); - } - - // prepare the current execution for archiving - execution.prepareForArchiving(); - - // prepare previous executions for archiving - for (Execution exec : priorExecutions) { - exec.prepareForArchiving(); - } - - // clear the unnecessary fields in this class - this.resultPartitions = null; - this.inputEdges = null; - this.locationConstraint = null; - } - public void cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo){ getCurrentExecutionAttempt().cachePartitionInfo(partitionInfo); } @@ -708,4 +701,9 @@ public class ExecutionVertex { public String toString() { return getSimpleName(); } + + @Override + public ArchivedExecutionVertex archive() { + return new ArchivedExecutionVertex(this); + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java deleted file mode 100644 index ad4677f..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.runtime.executiongraph.archive; - -import org.apache.flink.api.common.ExecutionConfig; - -import java.io.Serializable; -import java.util.Collections; -import java.util.Map; - -/** - * Serializable class which is created when archiving the job. - * It can be used to display job information on the web interface - * without having to keep the classloader around after job completion. 
- */ -public class ExecutionConfigSummary implements Serializable { - - private final String executionMode; - private final String restartStrategyDescription; - private final int parallelism; - private final boolean objectReuseEnabled; - private final Map<String, String> globalJobParameters; - - public ExecutionConfigSummary(ExecutionConfig ec) { - executionMode = ec.getExecutionMode().name(); - if (ec.getRestartStrategy() != null) { - restartStrategyDescription = ec.getRestartStrategy().getDescription(); - } else { - restartStrategyDescription = "default"; - } - parallelism = ec.getParallelism(); - objectReuseEnabled = ec.isObjectReuseEnabled(); - if (ec.getGlobalJobParameters() != null - && ec.getGlobalJobParameters().toMap() != null) { - globalJobParameters = ec.getGlobalJobParameters().toMap(); - } else { - globalJobParameters = Collections.emptyMap(); - } - } - - public String getExecutionMode() { - return executionMode; - } - - public String getRestartStrategyDescription() { - return restartStrategyDescription; - } - - public int getParallelism() { - return parallelism; - } - - public boolean getObjectReuseEnabled() { - return objectReuseEnabled; - } - - public Map<String, String> getGlobalJobParameters() { - return globalJobParameters; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index 37a91b3..87df0d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -27,9 +27,9 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import 
org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.webmonitor.JobDetails; @@ -164,7 +164,7 @@ public final class WebMonitorUtils { } } - public static JobDetails createDetailsForJob(ExecutionGraph job) { + public static JobDetails createDetailsForJob(AccessExecutionGraph job) { JobStatus status = job.getState(); long started = job.getStatusTimestamp(JobStatus.CREATED); @@ -174,11 +174,11 @@ public final class WebMonitorUtils { long lastChanged = 0; int numTotalTasks = 0; - for (ExecutionJobVertex ejv : job.getVerticesTopologically()) { - ExecutionVertex[] vertices = ejv.getTaskVertices(); + for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) { + AccessExecutionVertex[] vertices = ejv.getTaskVertices(); numTotalTasks += vertices.length; - for (ExecutionVertex vertex : vertices) { + for (AccessExecutionVertex vertex : vertices) { ExecutionState state = vertex.getExecutionState(); countsPerStatus[state.ordinal()]++; lastChanged = Math.max(lastChanged, vertex.getStateTimestamp(state)); http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index cca0124..8f3b82a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1685,12 +1685,9 @@ class JobManager( }(context.dispatcher)) try { - eg.prepareForArchiving() - - archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg)) + archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg.archive())) } catch { - case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " + - "archiving.", t) + case t: Throwable => log.error(s"Could not archive the execution graph $eg.", t) } futureOption http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index 2d55b26..7f8fcd3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -28,7 +28,7 @@ import org.apache.flink.runtime.messages.accumulators._ import org.apache.flink.runtime.webmonitor.WebMonitorUtils import org.apache.flink.runtime.{FlinkActor, LogMessages} import org.apache.flink.runtime.messages.webmonitor._ -import org.apache.flink.runtime.executiongraph.ExecutionGraph +import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, ExecutionGraph} import org.apache.flink.runtime.messages.ArchiveMessages._ import org.apache.flink.runtime.messages.JobManagerMessages._ @@ -66,7 +66,7 @@ class 
MemoryArchivist(private val max_entries: Int) * Map of execution graphs belonging to recently started jobs with the time stamp of the last * received job event. The insert order is preserved through a LinkedHashMap. */ - protected val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]() + protected val graphs = mutable.LinkedHashMap[JobID, ArchivedExecutionGraph]() /* Counters for finished, canceled, and failed jobs */ private[this] var finishedCnt: Int = 0 http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala index c4e3f3e..435b736 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala @@ -19,14 +19,14 @@ package org.apache.flink.runtime.messages import org.apache.flink.api.common.JobID -import org.apache.flink.runtime.executiongraph.ExecutionGraph +import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, ExecutionGraph} /** * This object contains the archive specific messages. */ object ArchiveMessages { - case class ArchiveExecutionGraph(jobID: JobID, graph: ExecutionGraph) + case class ArchiveExecutionGraph(jobID: JobID, graph: ArchivedExecutionGraph) /** * Request the currently archived jobs in the archiver. The resulting response is [[ArchivedJobs]] @@ -44,19 +44,19 @@ object ArchiveMessages { */ case class RequestArchivedJob(jobID: JobID) - case class ArchivedJob(job: Option[ExecutionGraph]) + case class ArchivedJob(job: Option[ArchivedExecutionGraph]) /** * Response to [[RequestArchivedJobs]] message. The response contains the archived jobs. 
* @param jobs */ - case class ArchivedJobs(jobs: Iterable[ExecutionGraph]){ - def asJavaIterable: java.lang.Iterable[ExecutionGraph] = { + case class ArchivedJobs(jobs: Iterable[ArchivedExecutionGraph]){ + def asJavaIterable: java.lang.Iterable[ArchivedExecutionGraph] = { import scala.collection.JavaConverters._ jobs.asJava } - def asJavaCollection: java.util.Collection[ExecutionGraph] = { + def asJavaCollection: java.util.Collection[ArchivedExecutionGraph] = { import scala.collection.JavaConverters._ jobs.asJavaCollection } http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index 4cf6a02..3df8c26 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -26,7 +26,7 @@ import org.apache.flink.api.common.JobID import org.apache.flink.runtime.akka.ListeningBehaviour import org.apache.flink.runtime.blob.BlobKey import org.apache.flink.runtime.client.{JobStatusMessage, SerializedJobExecutionResult} -import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph} +import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, ExecutionAttemptID, ExecutionGraph} import org.apache.flink.runtime.instance.{Instance, InstanceID} import org.apache.flink.runtime.io.network.partition.ResultPartitionID import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph, JobStatus, JobVertexID} @@ -371,7 +371,7 @@ object JobManagerMessages { * @param jobID * @param executionGraph */ - case class JobFound(jobID: JobID, executionGraph: ExecutionGraph) extends 
JobResponse + case class JobFound(jobID: JobID, executionGraph: AccessExecutionGraph) extends JobResponse /** * Denotes that there is no job with [[jobID]] retrievable. This message can be the response of http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java index ea4d322..0b2f4f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java @@ -81,7 +81,7 @@ public class CoordinatorShutdownTest { new JobManagerMessages.RequestJob(testGraph.getJobID()), timeout); - ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph(); + ExecutionGraph graph = (ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph(); assertNotNull(graph); graph.waitUntilFinished(); @@ -133,7 +133,7 @@ public class CoordinatorShutdownTest { new JobManagerMessages.RequestJob(testGraph.getJobID()), timeout); - ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph(); + ExecutionGraph graph = (ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph(); assertNotNull(graph); graph.waitUntilFinished();