[hotfix] Fix checkstyle violations in ExecutionVertex
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75260bfc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75260bfc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75260bfc Branch: refs/heads/release-1.5 Commit: 75260bfcb2801f5020546974d8ded812dcae62c6 Parents: 99d524a Author: Till Rohrmann <trohrm...@apache.org> Authored: Sun Jul 22 20:43:44 2018 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Jul 23 17:22:52 2018 +0200 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionVertex.java | 80 ++++++++++---------- 1 file changed, 39 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/75260bfc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 8b57a7a..e385318 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -33,9 +33,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -46,6 +44,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.EvictingBoundedList; @@ -96,12 +96,12 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi private final Time timeout; - /** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */ + /** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations. */ private final String taskNameWithSubtask; private volatile CoLocationConstraint locationConstraint; - /** The current or latest execution attempt of this vertex's task */ + /** The current or latest execution attempt of this vertex's task. */ private volatile Execution currentExecution; // this field must never be null // -------------------------------------------------------------------------------------------- @@ -117,17 +117,18 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi Time timeout) { this( - jobVertex, - subTaskIndex, - producedDataSets, - timeout, - 1L, - System.currentTimeMillis(), - JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue()); + jobVertex, + subTaskIndex, + producedDataSets, + timeout, + 1L, + System.currentTimeMillis(), + JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue()); } /** - * + * Creates an ExecutionVertex. + * * @param timeout * The RPC timeout to use for deploy / cancel calls * @param initialGlobalModVersion @@ -311,7 +312,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi /** * Gets the location where the latest completed/canceled/failed execution of the vertex's * task happened. - * + * * @return The latest prior execution location, or null, if there is none, yet. */ public TaskManagerLocation getLatestPriorLocation() { @@ -444,36 +445,36 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi /** * Gets the overall preferred execution location for this vertex's current execution. * The preference is determined as follows: - * + * * <ol> * <li>If the task execution has state to load (from a checkpoint), then the location preference * is the location of the previous execution (if there is a previous execution attempt). * <li>If the task execution has no state or no previous location, then the location preference * is based on the task's inputs. * </ol> - * - * These rules should result in the following behavior: - * + * + * <p>These rules should result in the following behavior: + * * <ul> * <li>Stateless tasks are always scheduled based on co-location with inputs. * <li>Stateful tasks are on their initial attempt executed based on co-location with inputs. * <li>Repeated executions of stateful tasks try to co-locate the execution with its state. * </ul> - * - * @return The preferred execution locations for the execution attempt. - * + * * @see #getPreferredLocationsBasedOnState() - * @see #getPreferredLocationsBasedOnInputs() + * @see #getPreferredLocationsBasedOnInputs() + * + * @return The preferred execution locations for the execution attempt. */ public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocations() { Collection<CompletableFuture<TaskManagerLocation>> basedOnState = getPreferredLocationsBasedOnState(); return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs(); } - + /** * Gets the preferred location to execute the current task execution attempt, based on the state * that the execution attempt will resume. - * + * * @return A size-one collection with the location preference, or null, if there is no * location preference based on the state. */ @@ -542,27 +543,25 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi /** * Archives the current Execution and creates a new Execution for this vertex. - * + * * <p>This method atomically checks if the ExecutionGraph is still of an expected * global mod. version and replaces the execution if that is the case. If the ExecutionGraph * has increased its global mod. version in the meantime, this operation fails. - * + * * <p>This mechanism can be used to prevent conflicts between various concurrent recovery and * reconfiguration actions in a similar way as "optimistic concurrency control". - * + * * @param timestamp * The creation timestamp for the new Execution * @param originatingGlobalModVersion - * The - * - * @return Returns the new created Execution. - * + * + * @return Returns the new created Execution. + * * @throws GlobalModVersionMismatch Thrown, if the execution graph has a new global mod * version than the one passed to this message. */ public Execution resetForNewExecution(final long timestamp, final long originatingGlobalModVersion) - throws GlobalModVersionMismatch - { + throws GlobalModVersionMismatch { LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex()); synchronized (priorExecutions) { @@ -642,12 +641,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi } /** - * + * Cancels this ExecutionVertex. + * * @return A future that completes once the execution has reached its final state. */ public CompletableFuture<?> cancel() { // to avoid any case of mixup in the presence of concurrent calls, - // we copy a reference to the stack to make sure both calls go to the same Execution + // we copy a reference to the stack to make sure both calls go to the same Execution final Execution exec = this.currentExecution; exec.cancel(); return exec.getReleaseFuture(); @@ -742,7 +742,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi // -------------------------------------------------------------------------------------------- /** - * Simply forward this notification + * Simply forward this notification. */ void notifyStateTransition(Execution execution, ExecutionState newState, Throwable error) { // only forward this notification if the execution is still the current execution @@ -754,7 +754,6 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi /** * Creates a task deployment descriptor to deploy a subtask to the given target slot. - * * TODO: This should actually be in the EXECUTION */ TaskDeploymentDescriptor createDeploymentDescriptor( @@ -762,13 +761,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi LogicalSlot targetSlot, @Nullable JobManagerTaskRestore taskRestore, int attemptNumber) throws ExecutionGraphException { - + // Produced intermediate results List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<>(resultPartitions.size()); - + // Consumed intermediate results List<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<>(inputEdges.length); - + boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment(); for (IntermediateResultPartition partition : resultPartitions.values()) { @@ -791,8 +790,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, maxParallelism, lazyScheduling)); } } - - + for (ExecutionEdge[] edges : inputEdges) { InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor.fromEdges( edges,