[FLINK-1672] [runtime] Unify Task and RuntimeEnvironment into one class.

- This simplifies and hardens the failure handling during task startup
- Guarantees that no actor system threads are blocked by task bootstrap or task cancellation
- Corrects some previously erroneous corner-case state transitions
- Adds simple and robust tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e613014 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e613014 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e613014 Branch: refs/heads/master Commit: 8e61301452218e6d279b013beb7bbd02a7c2e3f9 Parents: 1d368a4 Author: Stephan Ewen <se...@apache.org> Authored: Sun May 3 04:41:03 2015 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Mon May 11 21:13:41 2015 +0200 ---------------------------------------------------------------------- .../flink/runtime/execution/Environment.java | 3 + .../flink/runtime/execution/ExecutionState.java | 6 +- .../runtime/execution/RuntimeEnvironment.java | 458 --------- .../runtime/io/network/NetworkEnvironment.java | 11 +- .../io/network/partition/ResultPartition.java | 14 +- .../partition/consumer/SingleInputGate.java | 15 +- .../runtime/operators/RegularPactTask.java | 3 +- .../runtime/taskmanager/RuntimeEnvironment.java | 230 +++++ .../apache/flink/runtime/taskmanager/Task.java | 990 ++++++++++++++----- .../runtime/messages/TaskManagerMessages.scala | 12 +- .../flink/runtime/messages/TaskMessages.scala | 6 +- .../flink/runtime/taskmanager/TaskManager.scala | 321 ++---- .../consumer/LocalInputChannelTest.java | 7 +- .../partition/consumer/SingleInputGateTest.java | 6 +- .../partition/consumer/TestSingleInputGate.java | 4 +- .../partition/consumer/UnionInputGateTest.java | 9 +- .../operators/testutils/MockEnvironment.java | 6 + .../runtime/taskmanager/ForwardingActor.java | 41 + .../taskmanager/TaskExecutionStateTest.java | 33 + .../runtime/taskmanager/TaskManagerTest.java | 152 ++- .../flink/runtime/taskmanager/TaskTest.java | 874 ++++++++++++---- .../testingUtils/TestingTaskManager.scala | 6 +- 22 files changed, 1972 insertions(+), 1235 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index 7ab3bc9..081e3ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.execution; +import akka.actor.ActorRef; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; @@ -159,4 +160,6 @@ public interface Environment { InputGate[] getAllInputGates(); + // this should go away + ActorRef getJobManager(); } http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java index 2fcaea1..9f4a5a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java @@ -35,10 +35,10 @@ package org.apache.flink.runtime.execution; * ... -> FAILED * </pre> * - * It is possible to enter the {@code FAILED} state from any other state. + * <p>It is possible to enter the {@code FAILED} state from any other state.</p> * - * The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are - * considered terminal states. 
+ * <p>The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are + * considered terminal states.</p> */ public enum ExecutionState { http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java deleted file mode 100644 index 081d4bc..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java +++ /dev/null @@ -1,458 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.execution; - -import akka.actor.ActorRef; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.accumulators.AccumulatorEvent; -import org.apache.flink.runtime.broadcast.BroadcastVariableManager; -import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; -import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.partition.ResultPartition; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; -import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.messages.accumulators.ReportAccumulatorResult; -import org.apache.flink.runtime.taskmanager.Task; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.FutureTask; -import java.util.concurrent.atomic.AtomicBoolean; - -import static com.google.common.base.Preconditions.checkElementIndex; -import static com.google.common.base.Preconditions.checkNotNull; - -public 
class RuntimeEnvironment implements Environment, Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(RuntimeEnvironment.class); - - private static final ThreadGroup TASK_THREADS = new ThreadGroup("Task Threads"); - - /** The ActorRef to the job manager */ - private final ActorRef jobManager; - - /** The task that owns this environment */ - private final Task owner; - - /** The job configuration encapsulated in the environment object. */ - private final Configuration jobConfiguration; - - /** The task configuration encapsulated in the environment object. */ - private final Configuration taskConfiguration; - - /** ClassLoader for all user code classes */ - private final ClassLoader userCodeClassLoader; - - /** Instance of the class to be run in this environment. */ - private final AbstractInvokable invokable; - - /** The memory manager of the current environment (currently the one associated with the executing TaskManager). */ - private final MemoryManager memoryManager; - - /** The I/O manager of the current environment (currently the one associated with the executing TaskManager). */ - private final IOManager ioManager; - - /** The input split provider that can be queried for new input splits. */ - private final InputSplitProvider inputSplitProvider; - - /** The thread executing the task in the environment. 
*/ - private Thread executingThread; - - private final BroadcastVariableManager broadcastVariableManager; - - private final Map<String, FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>(); - - private final AtomicBoolean canceled = new AtomicBoolean(); - - private final ResultPartition[] producedPartitions; - private final ResultPartitionWriter[] writers; - - private final SingleInputGate[] inputGates; - - private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>(); - - public RuntimeEnvironment( - ActorRef jobManager, Task owner, TaskDeploymentDescriptor tdd, ClassLoader userCodeClassLoader, - MemoryManager memoryManager, IOManager ioManager, InputSplitProvider inputSplitProvider, - BroadcastVariableManager broadcastVariableManager, NetworkEnvironment networkEnvironment) throws Exception { - - this.owner = checkNotNull(owner); - - this.memoryManager = checkNotNull(memoryManager); - this.ioManager = checkNotNull(ioManager); - this.inputSplitProvider = checkNotNull(inputSplitProvider); - this.jobManager = checkNotNull(jobManager); - - this.broadcastVariableManager = checkNotNull(broadcastVariableManager); - - try { - // Produced intermediate result partitions - final List<ResultPartitionDeploymentDescriptor> partitions = tdd.getProducedPartitions(); - - this.producedPartitions = new ResultPartition[partitions.size()]; - this.writers = new ResultPartitionWriter[partitions.size()]; - - for (int i = 0; i < this.producedPartitions.length; i++) { - ResultPartitionDeploymentDescriptor desc = partitions.get(i); - ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), owner.getExecutionId()); - - this.producedPartitions[i] = new ResultPartition( - this, - owner.getJobID(), - partitionId, - desc.getPartitionType(), - desc.getNumberOfSubpartitions(), - networkEnvironment.getPartitionManager(), - networkEnvironment.getPartitionConsumableNotifier(), - ioManager, - 
networkEnvironment.getDefaultIOMode()); - - writers[i] = new ResultPartitionWriter(this.producedPartitions[i]); - } - - // Consumed intermediate result partitions - final List<InputGateDeploymentDescriptor> consumedPartitions = tdd.getInputGates(); - - this.inputGates = new SingleInputGate[consumedPartitions.size()]; - - for (int i = 0; i < inputGates.length; i++) { - inputGates[i] = SingleInputGate.create( - this, consumedPartitions.get(i), networkEnvironment); - - // The input gates are organized by key for task updates/channel updates at runtime - inputGatesById.put(inputGates[i].getConsumedResultId(), inputGates[i]); - } - - this.jobConfiguration = tdd.getJobConfiguration(); - this.taskConfiguration = tdd.getTaskConfiguration(); - - // ---------------------------------------------------------------- - // Invokable setup - // ---------------------------------------------------------------- - // Note: This has to be done *after* the readers and writers have - // been setup, because the invokable relies on them for I/O. - // ---------------------------------------------------------------- - - // Load and instantiate the invokable class - this.userCodeClassLoader = checkNotNull(userCodeClassLoader); - // Class of the task to run in this environment - Class<? 
extends AbstractInvokable> invokableClass; - try { - final String className = tdd.getInvokableClassName(); - invokableClass = Class.forName(className, true, userCodeClassLoader).asSubclass(AbstractInvokable.class); - } - catch (Throwable t) { - throw new Exception("Could not load invokable class.", t); - } - - try { - this.invokable = invokableClass.newInstance(); - } - catch (Throwable t) { - throw new Exception("Could not instantiate the invokable class.", t); - } - - this.invokable.setEnvironment(this); - this.invokable.registerInputOutput(); - } - catch (Throwable t) { - throw new Exception("Error setting up runtime environment: " + t.getMessage(), t); - } - } - - /** - * Returns the task invokable instance. - */ - public AbstractInvokable getInvokable() { - return this.invokable; - } - - @Override - public JobID getJobID() { - return this.owner.getJobID(); - } - - @Override - public JobVertexID getJobVertexId() { - return this.owner.getVertexID(); - } - - @Override - public void run() { - // quick fail in case the task was cancelled while the thread was started - if (owner.isCanceledOrFailed()) { - owner.cancelingDone(); - return; - } - - try { - Thread.currentThread().setContextClassLoader(userCodeClassLoader); - invokable.invoke(); - - // Make sure, we enter the catch block when the task has been canceled - if (owner.isCanceledOrFailed()) { - throw new CancelTaskException("Task has been canceled or failed"); - } - - // Finish the produced partitions - if (producedPartitions != null) { - for (ResultPartition partition : producedPartitions) { - if (partition != null) { - partition.finish(); - } - } - } - - if (owner.isCanceledOrFailed()) { - throw new CancelTaskException(); - } - - // Finally, switch execution state to FINISHED and report to job manager - if (!owner.markAsFinished()) { - throw new Exception("Could *not* notify job manager that the task is finished."); - } - } - catch (Throwable t) { - if (!owner.isCanceledOrFailed()) { - // Perform clean up 
when the task failed and has been not canceled by the user - try { - invokable.cancel(); - } - catch (Throwable t2) { - LOG.error("Error while canceling the task", t2); - } - } - - // if we are already set as cancelled or failed (when failure is triggered externally), - // mark that the thread is done. - if (owner.isCanceledOrFailed() || t instanceof CancelTaskException) { - owner.cancelingDone(); - } - else { - // failure from inside the task thread. notify the task of the failure - owner.markFailed(t); - } - } - } - - /** - * Returns the thread, which is assigned to execute the user code. - */ - public Thread getExecutingThread() { - synchronized (this) { - if (executingThread == null) { - String name = owner.getTaskNameWithSubtasks(); - - if (LOG.isDebugEnabled()) { - name = name + " (" + owner.getExecutionId() + ")"; - } - - executingThread = new Thread(TASK_THREADS, this, name); - } - - return executingThread; - } - } - - public void cancelExecution() { - if (!canceled.compareAndSet(false, true)) { - return; - } - - LOG.info("Canceling {} ({}).", owner.getTaskNameWithSubtasks(), owner.getExecutionId()); - - // Request user code to shut down - if (invokable != null) { - try { - invokable.cancel(); - } - catch (Throwable e) { - LOG.error("Error while canceling the task.", e); - } - } - - final Thread executingThread = this.executingThread; - if (executingThread != null) { - // interrupt the running thread and wait for it to die - executingThread.interrupt(); - try { - executingThread.join(5000); - } - catch (InterruptedException e) { - } - if (!executingThread.isAlive()) { - return; - } - // Continuously interrupt the user thread until it changed to state CANCELED - while (executingThread != null && executingThread.isAlive()) { - LOG.warn("Task " + owner.getTaskNameWithSubtasks() + " did not react to cancelling signal. 
Sending repeated interrupt."); - if (LOG.isDebugEnabled()) { - StringBuilder bld = new StringBuilder("Task ").append(owner.getTaskNameWithSubtasks()).append(" is stuck in method:\n"); - StackTraceElement[] stack = executingThread.getStackTrace(); - for (StackTraceElement e : stack) { - bld.append(e).append('\n'); - } - LOG.debug(bld.toString()); - } - executingThread.interrupt(); - try { - executingThread.join(1000); - } - catch (InterruptedException e) { - } - } - } - } - - @Override - public ActorRef getJobManager() { - return jobManager; - } - - @Override - public IOManager getIOManager() { - return ioManager; - } - - @Override - public MemoryManager getMemoryManager() { - return memoryManager; - } - - @Override - public BroadcastVariableManager getBroadcastVariableManager() { - return broadcastVariableManager; - } - - @Override - public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) { - AccumulatorEvent evt; - try { - evt = new AccumulatorEvent(getJobID(), accumulators); - } - catch (IOException e) { - throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e); - } - - ReportAccumulatorResult accResult = new ReportAccumulatorResult(getJobID(), owner.getExecutionId(), evt); - jobManager.tell(accResult, ActorRef.noSender()); - } - - @Override - public ResultPartitionWriter getWriter(int index) { - checkElementIndex(index, writers.length, "Illegal environment writer request."); - - return writers[checkElementIndex(index, writers.length)]; - } - - @Override - public ResultPartitionWriter[] getAllWriters() { - return writers; - } - - @Override - public InputGate getInputGate(int index) { - checkElementIndex(index, inputGates.length); - - return inputGates[index]; - } - - @Override - public SingleInputGate[] getAllInputGates() { - return inputGates; - } - - public ResultPartition[] getProducedPartitions() { - return producedPartitions; - } - - public SingleInputGate getInputGateById(IntermediateDataSetID id) { - 
return inputGatesById.get(id); - } - - @Override - public Configuration getTaskConfiguration() { - return taskConfiguration; - } - - @Override - public Configuration getJobConfiguration() { - return jobConfiguration; - } - - @Override - public int getNumberOfSubtasks() { - return owner.getNumberOfSubtasks(); - } - - @Override - public int getIndexInSubtaskGroup() { - return owner.getSubtaskIndex(); - } - - @Override - public String getTaskName() { - return owner.getTaskName(); - } - - @Override - public InputSplitProvider getInputSplitProvider() { - return inputSplitProvider; - } - - @Override - public String getTaskNameWithSubtasks() { - return owner.getTaskNameWithSubtasks(); - } - - @Override - public ClassLoader getUserClassLoader() { - return userCodeClassLoader; - } - - public void addCopyTasksForCacheFile(Map<String, FutureTask<Path>> copyTasks) { - cacheCopyTasks.putAll(copyTasks); - } - - public void addCopyTaskForCacheFile(String name, FutureTask<Path> copyTask) { - cacheCopyTasks.put(name, copyTask); - } - - @Override - public Map<String, FutureTask<Path>> getCopyTask() { - return cacheCopyTasks; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index af55ebf..259ea55 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -243,7 +243,7 @@ public class NetworkEnvironment { public void registerTask(Task task) throws IOException { final ResultPartition[] producedPartitions = task.getProducedPartitions(); - final ResultPartitionWriter[] writers = 
task.getWriters(); + final ResultPartitionWriter[] writers = task.getAllWriters(); if (writers.length != producedPartitions.length) { throw new IllegalStateException("Unequal number of writers and partitions."); @@ -288,7 +288,7 @@ public class NetworkEnvironment { } // Setup the buffer pool for each buffer reader - final SingleInputGate[] inputGates = task.getInputGates(); + final SingleInputGate[] inputGates = task.getAllInputGates(); for (SingleInputGate gate : inputGates) { BufferPool bufferPool = null; @@ -329,10 +329,9 @@ public class NetworkEnvironment { partitionManager.releasePartitionsProducedBy(executionId); } - ResultPartitionWriter[] writers = task.getWriters(); - + ResultPartitionWriter[] writers = task.getAllWriters(); if (writers != null) { - for (ResultPartitionWriter writer : task.getWriters()) { + for (ResultPartitionWriter writer : writers) { taskEventDispatcher.unregisterWriter(writer); } } @@ -344,7 +343,7 @@ public class NetworkEnvironment { } } - final SingleInputGate[] inputGates = task.getInputGates(); + final SingleInputGate[] inputGates = task.getAllInputGates(); if (inputGates != null) { for (SingleInputGate gate : inputGates) { http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index f06c8fb..df1f254 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.runtime.execution.Environment; import 
org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; @@ -76,9 +75,8 @@ import static com.google.common.base.Preconditions.checkState; public class ResultPartition implements BufferPoolOwner { private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class); - - /** The owning environment. Mainly for debug purposes. */ - private final Environment owner; + + private final String owningTaskName; private final JobID jobId; @@ -120,7 +118,7 @@ public class ResultPartition implements BufferPoolOwner { private long totalNumberOfBytes; public ResultPartition( - Environment owner, + String owningTaskName, JobID jobId, ResultPartitionID partitionId, ResultPartitionType partitionType, @@ -130,7 +128,7 @@ public class ResultPartition implements BufferPoolOwner { IOManager ioManager, IOMode defaultIoMode) { - this.owner = checkNotNull(owner); + this.owningTaskName = checkNotNull(owningTaskName); this.jobId = checkNotNull(jobId); this.partitionId = checkNotNull(partitionId); this.partitionType = checkNotNull(partitionType); @@ -162,7 +160,7 @@ public class ResultPartition implements BufferPoolOwner { // Initially, partitions should be consumed once before release. 
pin(); - LOG.debug("{}: Initialized {}", owner.getTaskNameWithSubtasks(), this); + LOG.debug("{}: Initialized {}", owningTaskName, this); } /** @@ -281,7 +279,7 @@ public class ResultPartition implements BufferPoolOwner { */ public void release() { if (isReleased.compareAndSet(false, true)) { - LOG.debug("{}: Releasing {}.", owner.getTaskNameWithSubtasks(), this); + LOG.debug("{}: Releasing {}.", owningTaskName, this); // Release all subpartitions for (ResultSubpartition subpartition : subpartitions) { http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index b0d138a..acda1d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.event.task.AbstractEvent; import org.apache.flink.runtime.event.task.TaskEvent; -import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; @@ -101,8 +100,8 @@ public class SingleInputGate implements InputGate { /** Lock object to guard partition requests and runtime channel updates. */ private final Object requestLock = new Object(); - /** The owning environment. 
Mainly for debug purposes. */ - private final Environment owner; + /** The name of the owning task, for logging purposes. */ + private final String owningTaskName; /** * The ID of the consumed intermediate result. Each input gate consumes partitions of the @@ -153,12 +152,12 @@ public class SingleInputGate implements InputGate { private int numberOfUninitializedChannels; public SingleInputGate( - Environment owner, + String owningTaskName, IntermediateDataSetID consumedResultId, int consumedSubpartitionIndex, int numberOfInputChannels) { - this.owner = checkNotNull(owner); + this.owningTaskName = checkNotNull(owningTaskName); this.consumedResultId = checkNotNull(consumedResultId); checkArgument(consumedSubpartitionIndex >= 0); @@ -265,7 +264,7 @@ public class SingleInputGate implements InputGate { synchronized (requestLock) { if (!isReleased) { try { - LOG.debug("{}: Releasing {}.", owner.getTaskNameWithSubtasks(), this); + LOG.debug("{}: Releasing {}.", owningTaskName, this); for (InputChannel inputChannel : inputChannels.values()) { try { @@ -410,7 +409,7 @@ public class SingleInputGate implements InputGate { * Creates an input gate and all of its input channels. */ public static SingleInputGate create( - Environment owner, + String owningTaskName, InputGateDeploymentDescriptor igdd, NetworkEnvironment networkEnvironment) { @@ -422,7 +421,7 @@ public class SingleInputGate implements InputGate { final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors()); final SingleInputGate inputGate = new SingleInputGate( - owner, consumedResultId, consumedSubpartitionIndex, icdd.length); + owningTaskName, consumedResultId, consumedSubpartitionIndex, icdd.length); // Create the input channels. There is one input channel for each consumed partition. 
final InputChannel[] inputChannels = new InputChannel[icdd.length]; http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java index b528f75..2bee094 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java @@ -1067,7 +1067,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i public DistributedRuntimeUDFContext createRuntimeContext(String taskName) { Environment env = getEnvironment(); return new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(), - env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), env.getCopyTask()); + env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), + env.getDistributedCacheEntries()); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java new file mode 100644 index 0000000..1321336 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorEvent;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.messages.accumulators.ReportAccumulatorResult;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * An implementation of the {@link Environment}. It holds the identifiers, configurations,
+ * services, and I/O structures passed to its constructor and returns them unchanged from
+ * its accessors. Accumulator reports are forwarded to the JobManager actor.
+ */
+public class RuntimeEnvironment implements Environment {
+
+	private final JobID jobId;
+	private final JobVertexID jobVertexId;
+	private final ExecutionAttemptID executionId;
+
+	private final String taskName;
+	private final String taskNameWithSubtasks;
+	private final int subtaskIndex;
+	private final int parallelism;
+
+	private final Configuration jobConfiguration;
+	private final Configuration taskConfiguration;
+
+	private final ClassLoader userCodeClassLoader;
+
+	private final MemoryManager memManager;
+	private final IOManager ioManager;
+	private final BroadcastVariableManager bcVarManager;
+	private final InputSplitProvider splitProvider;
+
+	private final Map<String, Future<Path>> distCacheEntries;
+
+	private final ResultPartitionWriter[] writers;
+	private final InputGate[] inputGates;
+
+	private final ActorRef jobManagerActor;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new environment from the given task identity, configuration, and services.
+	 * The arrays and the distributed-cache map are stored as given (not copied).
+	 *
+	 * @throws IllegalArgumentException if {@code parallelism} is not positive, or if
+	 *         {@code subtaskIndex} is not within {@code [0, parallelism)}.
+	 * @throws NullPointerException if any object argument is null.
+	 */
+	public RuntimeEnvironment(JobID jobId, JobVertexID jobVertexId, ExecutionAttemptID executionId,
+			String taskName, String taskNameWithSubtasks,
+			int subtaskIndex, int parallelism,
+			Configuration jobConfiguration, Configuration taskConfiguration,
+			ClassLoader userCodeClassLoader,
+			MemoryManager memManager, IOManager ioManager,
+			BroadcastVariableManager bcVarManager,
+			InputSplitProvider splitProvider,
+			Map<String, Future<Path>> distCacheEntries,
+			ResultPartitionWriter[] writers,
+			InputGate[] inputGates,
+			ActorRef jobManagerActor) {
+
+		// separate checks so that a failure says which bound was violated
+		checkArgument(parallelism > 0, "Parallelism must be positive, but was %s.", parallelism);
+		checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism,
+				"Subtask index %s is not within [0, %s).", subtaskIndex, parallelism);
+
+		this.jobId = checkNotNull(jobId, "jobId");
+		this.jobVertexId = checkNotNull(jobVertexId, "jobVertexId");
+		this.executionId = checkNotNull(executionId, "executionId");
+		this.taskName = checkNotNull(taskName, "taskName");
+		this.taskNameWithSubtasks = checkNotNull(taskNameWithSubtasks, "taskNameWithSubtasks");
+		this.subtaskIndex = subtaskIndex;
+		this.parallelism = parallelism;
+		this.jobConfiguration = checkNotNull(jobConfiguration, "jobConfiguration");
+		this.taskConfiguration = checkNotNull(taskConfiguration, "taskConfiguration");
+		this.userCodeClassLoader = checkNotNull(userCodeClassLoader, "userCodeClassLoader");
+		this.memManager = checkNotNull(memManager, "memManager");
+		this.ioManager = checkNotNull(ioManager, "ioManager");
+		this.bcVarManager = checkNotNull(bcVarManager, "bcVarManager");
+		this.splitProvider = checkNotNull(splitProvider, "splitProvider");
+		this.distCacheEntries = checkNotNull(distCacheEntries, "distCacheEntries");
+		this.writers = checkNotNull(writers, "writers");
+		this.inputGates = checkNotNull(inputGates, "inputGates");
+		this.jobManagerActor = checkNotNull(jobManagerActor, "jobManagerActor");
+	}
+
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public JobID getJobID() {
+		return jobId;
+	}
+
+	@Override
+	public JobVertexID getJobVertexId() {
+		return jobVertexId;
+	}
+
+	@Override
+	public ExecutionAttemptID getExecutionId() {
+		return executionId;
+	}
+
+	@Override
+	public String getTaskName() {
+		return taskName;
+	}
+
+	@Override
+	public String getTaskNameWithSubtasks() {
+		return taskNameWithSubtasks;
+	}
+
+	@Override
+	public int getNumberOfSubtasks() {
+		return parallelism;
+	}
+
+	@Override
+	public int getIndexInSubtaskGroup() {
+		return subtaskIndex;
+	}
+
+	@Override
+	public Configuration getJobConfiguration() {
+		return jobConfiguration;
+	}
+
+	@Override
+	public Configuration getTaskConfiguration() {
+		return taskConfiguration;
+	}
+
+	@Override
+	public ClassLoader getUserClassLoader() {
+		return userCodeClassLoader;
+	}
+
+	@Override
+	public MemoryManager getMemoryManager() {
+		return memManager;
+	}
+
+	@Override
+	public IOManager getIOManager() {
+		return ioManager;
+	}
+
+	@Override
+	public BroadcastVariableManager getBroadcastVariableManager() {
+		return bcVarManager;
+	}
+
+	@Override
+	public InputSplitProvider getInputSplitProvider() {
+		return splitProvider;
+	}
+
+	@Override
+	public Map<String, Future<Path>> getDistributedCacheEntries() {
+		return distCacheEntries;
+	}
+
+	@Override
+	public ResultPartitionWriter getWriter(int index) {
+		return writers[index];
+	}
+
+	@Override
+	public ResultPartitionWriter[] getAllWriters() {
+		return writers;
+	}
+
+	@Override
+	public InputGate getInputGate(int index) {
+		return inputGates[index];
+	}
+
+	@Override
+	public InputGate[] getAllInputGates() {
+		return inputGates;
+	}
+
+	/**
+	 * Wraps the given accumulators into an {@link AccumulatorEvent} and sends it, as a
+	 * {@link ReportAccumulatorResult} without a sender, to the JobManager actor.
+	 *
+	 * @throws RuntimeException if creating the event fails with an {@link IOException}.
+	 */
+	@Override
+	public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
+		AccumulatorEvent evt;
+		try {
+			// use the field directly, consistent with the message below
+			evt = new AccumulatorEvent(jobId, accumulators);
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e);
+		}
+
+		ReportAccumulatorResult accResult = new ReportAccumulatorResult(jobId, executionId, evt);
+		jobManagerActor.tell(accResult, ActorRef.noSender());
+	}
+
+	@Override
+	public ActorRef getJobManager() {
+		return jobManagerActor;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index e6eee5b..f12344b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -19,382 +19,755 @@ package org.apache.flink.runtime.taskmanager; import akka.actor.ActorRef; +import akka.util.Timeout; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.CancelTaskException; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.execution.RuntimeEnvironment; +import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver; +import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier; import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.messages.ExecutionGraphMessages; -import org.apache.flink.runtime.messages.TaskMessages.UnregisterTask; -import org.apache.flink.runtime.profiling.TaskManagerProfiler; -import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.runtime.messages.TaskMessages; +import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState; +import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError; +import org.apache.flink.runtime.state.StateHandle; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; + +import java.util.HashMap; 
import java.util.List; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -public class Task { +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Task represents one execution of a parallel subtask on a TaskManager. + * A Task wraps a Flink operator (which may be a user function) and + * runs it, providing all service necessary for example to consume input data, + * produce its results (intermediate result partitions) and communicate + * with the JobManager. + * + * <p>The Flink operators (implemented as subclasses of + * {@link org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable} have only data + * readers, -writers, and certain event callbacks. The task connects those to the + * network stack and actor messages, and tracks the state of the execution and + * handles exceptions.</p> + * + * <p>Tasks have no knowledge about how they relate to other tasks, or whether they + * are the first attempt to execute the task, or a repeated attempt. All of that + * is only known to the JobManager. All the task knows are its own runnable code, + * the task's configuration, and the IDs of the intermediate results to consume and + * produce (if any).</p> + * + * <p>Each Task is run by one dedicated thread.</p> + */ +public class Task implements Runnable { + /** The class logger. 
*/ + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + /** The tread group that contains all task threads */ + private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads"); + /** For atomic state updates */ private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState"); - /** The log object used for debugging. */ - private static final Logger LOG = LoggerFactory.getLogger(Task.class); - - // -------------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------ + // Constant fields that are part of the initial Task construction + // ------------------------------------------------------------------------ + /** The job that the task belongs to */ private final JobID jobId; + /** The vertex in the JobGraph whose code the task executes */ private final JobVertexID vertexId; + /** The execution attempt of the parallel subtask */ + private final ExecutionAttemptID executionId; + + /** The index of the parallel subtask, in [0, numberOfSubtasks) */ private final int subtaskIndex; - private final int numberOfSubtasks; - - private final ExecutionAttemptID executionId; + /** The number of parallel subtasks for the JobVertex/ExecutionJobVertex that this task belongs to */ + private final int parallelism; + /** The name of the task */ private final String taskName; + /** The name of the task, including the subtask index and the parallelism */ + private final String taskNameWithSubtask; + + /** The job-wide configuration object */ + private final Configuration jobConfiguration; + + /** The task-specific configuration */ + private final Configuration taskConfiguration; + + /** The jar files used by this task */ + private final List<BlobKey> requiredJarFiles; + + /** The name of the class that holds the invokable 
code */ + private final String nameOfInvokableClass; + + /** The handle to the state that the operator was initialized with */ + private final StateHandle operatorState; + + /** The memory manager to be used by this task */ + private final MemoryManager memoryManager; + + /** The I/O manager to be used by this task */ + private final IOManager ioManager; + + /** The BroadcastVariableManager to be used by this task */ + private final BroadcastVariableManager broadcastVariableManager; + + private final ResultPartition[] producedPartitions; + + private final ResultPartitionWriter[] writers; + + private final SingleInputGate[] inputGates; + + private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById; + + /** The TaskManager actor that spawned this task */ private final ActorRef taskManager; - private final List<ActorRef> executionListenerActors = new CopyOnWriteArrayList<ActorRef>(); + /** The JobManager actor */ + private final ActorRef jobManager; + + /** All actors that want to be notified about changes in the task's execution state */ + private final List<ActorRef> executionListenerActors; + + /** The timeout for all ask operations on actors */ + private final Timeout actorAskTimeout; + + private final LibraryCacheManager libraryCache; + + private final FileCache fileCache; + + private final NetworkEnvironment network; - /** The environment (with the invokable) executed by this task */ - private volatile RuntimeEnvironment environment; + /** The thread that executes the task */ + private final Thread executingThread; + + // ------------------------------------------------------------------------ + // Fields that control the task execution + // ------------------------------------------------------------------------ + + private final AtomicBoolean invokableHasBeenCanceled = new AtomicBoolean(false); + + /** The invokable of this task, if initialized */ + private volatile AbstractInvokable invokable; + /** The current execution state of the task */ - 
private volatile ExecutionState executionState = ExecutionState.DEPLOYING; + private volatile ExecutionState executionState = ExecutionState.CREATED; + /** The observed exception, in case the task execution failed */ private volatile Throwable failureCause; - // -------------------------------------------------------------------------------------------- + + /** + * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to + * be undone in the case of a failing task deployment.</p> + */ + public Task(TaskDeploymentDescriptor tdd, + MemoryManager memManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment, + BroadcastVariableManager bcVarManager, + ActorRef taskManagerActor, + ActorRef jobManagerActor, + FiniteDuration actorAskTimeout, + LibraryCacheManager libraryCache, + FileCache fileCache) + { + checkArgument(tdd.getNumberOfSubtasks() > 0); + checkArgument(tdd.getIndexInSubtaskGroup() >= 0); + checkArgument(tdd.getIndexInSubtaskGroup() < tdd.getNumberOfSubtasks()); + + this.jobId = checkNotNull(tdd.getJobID()); + this.vertexId = checkNotNull(tdd.getVertexID()); + this.executionId = checkNotNull(tdd.getExecutionId()); + this.subtaskIndex = tdd.getIndexInSubtaskGroup(); + this.parallelism = tdd.getNumberOfSubtasks(); + this.taskName = checkNotNull(tdd.getTaskName()); + this.taskNameWithSubtask = getTaskNameWithSubtask(taskName, subtaskIndex, parallelism); + this.jobConfiguration = checkNotNull(tdd.getJobConfiguration()); + this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration()); + this.requiredJarFiles = checkNotNull(tdd.getRequiredJarFiles()); + this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName()); + this.operatorState = tdd.getOperatorStates(); + + this.memoryManager = checkNotNull(memManager); + this.ioManager = checkNotNull(ioManager); + this.broadcastVariableManager =checkNotNull(bcVarManager); + + this.jobManager = checkNotNull(jobManagerActor); + this.taskManager = 
checkNotNull(taskManagerActor); + this.actorAskTimeout = new Timeout(checkNotNull(actorAskTimeout)); + + this.libraryCache = checkNotNull(libraryCache); + this.fileCache = checkNotNull(fileCache); + this.network = checkNotNull(networkEnvironment); + + this.executionListenerActors = new CopyOnWriteArrayList<ActorRef>(); + + // create the reader and writer structures + + final String taskNameWithSubtasksAndId = + Task.getTaskNameWithSubtaskAndID(taskName, subtaskIndex, parallelism, executionId); + + List<ResultPartitionDeploymentDescriptor> partitions = tdd.getProducedPartitions(); + List<InputGateDeploymentDescriptor> consumedPartitions = tdd.getInputGates(); + + // Produced intermediate result partitions + this.producedPartitions = new ResultPartition[partitions.size()]; + this.writers = new ResultPartitionWriter[partitions.size()]; + + for (int i = 0; i < this.producedPartitions.length; i++) { + ResultPartitionDeploymentDescriptor desc = partitions.get(i); + ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId); + + this.producedPartitions[i] = new ResultPartition( + taskNameWithSubtasksAndId, + jobId, + partitionId, + desc.getPartitionType(), + desc.getNumberOfSubpartitions(), + networkEnvironment.getPartitionManager(), + networkEnvironment.getPartitionConsumableNotifier(), + ioManager, + networkEnvironment.getDefaultIOMode()); + + this.writers[i] = new ResultPartitionWriter(this.producedPartitions[i]); + } + + // Consumed intermediate result partitions + this.inputGates = new SingleInputGate[consumedPartitions.size()]; + this.inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>(); - public Task(JobID jobId, JobVertexID vertexId, int taskIndex, int parallelism, - ExecutionAttemptID executionId, String taskName, ActorRef taskManager) { + for (int i = 0; i < this.inputGates.length; i++) { + SingleInputGate gate = SingleInputGate.create( + taskNameWithSubtasksAndId, consumedPartitions.get(i), networkEnvironment); - 
this.jobId = jobId; - this.vertexId = vertexId; - this.subtaskIndex = taskIndex; - this.numberOfSubtasks = parallelism; - this.executionId = executionId; - this.taskName = taskName; - this.taskManager = taskManager; + this.inputGates[i] = gate; + inputGatesById.put(gate.getConsumedResultId(), gate); + } + + // finally, create the executing thread, but do not start it + executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); } - /** - * Returns the ID of the job this task belongs to. - */ + // ------------------------------------------------------------------------ + // Accessors + // ------------------------------------------------------------------------ + public JobID getJobID() { - return this.jobId; + return jobId; } - /** - * Returns the ID of this task vertex. - */ - public JobVertexID getVertexID() { - return this.vertexId; + public JobVertexID getJobVertexId() { + return vertexId; } - /** - * Gets the index of the parallel subtask [0, parallelism). - */ - public int getSubtaskIndex() { + public ExecutionAttemptID getExecutionId() { + return executionId; + } + + public int getIndexInSubtaskGroup() { return subtaskIndex; } - /** - * Gets the total number of subtasks of the task that this subtask belongs to. - */ public int getNumberOfSubtasks() { - return numberOfSubtasks; + return parallelism; } - /** - * Gets the ID of the execution attempt. 
- */ - public ExecutionAttemptID getExecutionId() { - return executionId; + public String getTaskName() { + return taskName; + } + + public String getTaskNameWithSubtasks() { + return taskNameWithSubtask; + } + + public Configuration getJobConfiguration() { + return jobConfiguration; + } + + public Configuration getTaskConfiguration() { + return this.taskConfiguration; + } + + public ResultPartitionWriter[] getAllWriters() { + return writers; + } + + public SingleInputGate[] getAllInputGates() { + return inputGates; + } + + public ResultPartition[] getProducedPartitions() { + return producedPartitions; + } + + public SingleInputGate getInputGateById(IntermediateDataSetID id) { + return inputGatesById.get(id); + } + + public Thread getExecutingThread() { + return executingThread; } + // ------------------------------------------------------------------------ + // Task Execution + // ------------------------------------------------------------------------ + /** * Returns the current execution state of the task. + * @return The current execution state of the task. */ public ExecutionState getExecutionState() { return this.executionState; } - public void setEnvironment(RuntimeEnvironment environment) { - this.environment = environment; - } - - public RuntimeEnvironment getEnvironment() { - return environment; - } - + /** + * Checks whether the task has failed, is canceled, or is being canceled at the moment. + * @return True is the task in state FAILED, CANCELING, or CANCELED, false otherwise. 
+ */ public boolean isCanceledOrFailed() { return executionState == ExecutionState.CANCELING || executionState == ExecutionState.CANCELED || executionState == ExecutionState.FAILED; } - public String getTaskName() { - if (LOG.isDebugEnabled()) { - return taskName + " (" + executionId + ")"; - } else { - return taskName; - } - } - - public String getTaskNameWithSubtasks() { - if (LOG.isDebugEnabled()) { - return this.taskName + " (" + (this.subtaskIndex + 1) + "/" + this.numberOfSubtasks + - ") (" + executionId + ")"; - } else { - return this.taskName + " (" + (this.subtaskIndex + 1) + "/" + this.numberOfSubtasks + ")"; - } - } - + /** + * If the task has failed, this method gets the exception that caused this task to fail. + * Otherwise this method returns null. + * + * @return The exception that caused the task to fail, or null, if the task has not failed. + */ public Throwable getFailureCause() { return failureCause; } - // ---------------------------------------------------------------------------------------------------------------- - // States and Transitions - // ---------------------------------------------------------------------------------------------------------------- - /** - * Marks the task as finished. This succeeds, if the task was previously in the state - * "RUNNING", otherwise it fails. Failure indicates that the task was either - * canceled, or set to failed. - * - * @return True, if the task correctly enters the state FINISHED. + * Starts the task's thread. 
*/ - public boolean markAsFinished() { - if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) { - notifyObservers(ExecutionState.FINISHED, null); - unregisterTask(); - return true; - } - else { - return false; - } + public void startTaskThread() { + executingThread.start(); } - public void markFailed(Throwable error) { + /** + * The core work method that bootstraps the task and executes it code + */ + public void run() { + + // ---------------------------- + // Initial State transition + // ---------------------------- while (true) { ExecutionState current = this.executionState; - - // if canceled, fine. we are done, and the jobmanager has been told - if (current == ExecutionState.CANCELED) { + if (current == ExecutionState.CREATED) { + if (STATE_UPDATER.compareAndSet(this, ExecutionState.CREATED, ExecutionState.DEPLOYING)) { + // success, we can start our work + break; + } + } + else if (current == ExecutionState.FAILED) { + // we were immediately failed. tell the TaskManager that we reached our final state + notifyFinalState(); return; } + else if (current == ExecutionState.CANCELING) { + if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) { + // we were immediately canceled. tell the TaskManager that we reached our final state + notifyFinalState(); + return; + } + } + else { + throw new IllegalStateException("Invalid state for beginning of task operation"); + } + } - // if canceling, we are done, but we cannot be sure that the jobmanager has been told. 
- // after all, we may have recognized our failure state before the cancelling and never sent a canceled - // message back - else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { - this.failureCause = error; + // all resource acquisitions and registrations from here on + // need to be undone in the end - notifyObservers(ExecutionState.FAILED, ExceptionUtils.stringifyException(error)); - unregisterTask(); + Map<String, Future<Path>> distributedCacheEntries = new HashMap<String, Future<Path>>(); - return; - } - } - } + AbstractInvokable invokable = null; - public void cancelExecution() { - while (true) { - ExecutionState current = this.executionState; + try { + // ---------------------------- + // Task Bootstrap - We periodically + // check for canceling as a shortcut + // ---------------------------- - // if the task is already canceled (or canceling) or finished or failed, - // then we need not do anything - if (current == ExecutionState.FINISHED || current == ExecutionState.CANCELED || - current == ExecutionState.CANCELING || current == ExecutionState.FAILED) { - return; - } + // first of all, get a user-code classloader + // this may involve downloading the job's JAR files and/or classes + LOG.info("Loading JAR files for task " + taskNameWithSubtask); + final ClassLoader userCodeClassLoader = createUserCodeClassloader(libraryCache); - if (current == ExecutionState.DEPLOYING) { - // directly set to canceled - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) { + // now load the task's invokable code + invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass); - notifyObservers(ExecutionState.CANCELED, null); - unregisterTask(); - return; - } + if (isCanceledOrFailed()) { + throw new CancelTaskException(); } - else if (current == ExecutionState.RUNNING) { - // go to canceling and perform the actual task canceling - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELING)) { - - 
notifyObservers(ExecutionState.CANCELING, null); - try { - this.environment.cancelExecution(); - } - catch (Throwable e) { - LOG.error("Error while cancelling the task.", e); - } - return; + // ---------------------------------------------------------------- + // register the task with the network stack + // this operation may fail if the system does not have enough + // memory to run the necessary data exchanges + // the registration must also strictly be undone + // ---------------------------------------------------------------- + + LOG.info("Registering task at network: " + this); + network.registerTask(this); + + // next, kick off the background copying of files for the distributed cache + try { + for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : + DistributedCache.readFileInfoFromConfig(jobConfiguration)) + { + LOG.info("Obtaining local cache file for '" + entry.getKey() + '\''); + Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId); + distributedCacheEntries.put(entry.getKey(), cp); } } - else { - throw new RuntimeException("unexpected state for cancelling: " + current); + catch (Exception e) { + throw new Exception("Exception while adding files to distributed cache.", e); } - } - } - /** - * Sets the tasks to be cancelled and reports a failure back to the master. 
- */ - public void failExternally(Throwable cause) { - while (true) { - ExecutionState current = this.executionState; - - // if the task is already canceled (or canceling) or finished or failed, - // then we need not do anything - if (current == ExecutionState.CANCELED || current == ExecutionState.CANCELING || current == ExecutionState.FAILED) { - return; + if (isCanceledOrFailed()) { + throw new CancelTaskException(); } - if (current == ExecutionState.FINISHED) { - // Set state to failed in order to correctly unregister task from network environment - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { - notifyObservers(ExecutionState.FAILED, null); + // ---------------------------------------------------------------- + // call the user code initialization methods + // ---------------------------------------------------------------- - return; + TaskInputSplitProvider splitProvider = new TaskInputSplitProvider(jobManager, + jobId, vertexId, executionId, userCodeClassLoader, actorAskTimeout); + + Environment env = new RuntimeEnvironment(jobId, vertexId, executionId, + taskName, taskNameWithSubtask, subtaskIndex, parallelism, + jobConfiguration, taskConfiguration, + userCodeClassLoader, memoryManager, ioManager, broadcastVariableManager, + splitProvider, distributedCacheEntries, + writers, inputGates, jobManager); + + // let the task code create its readers and writers + invokable.setEnvironment(env); + try { + invokable.registerInputOutput(); + } + catch (Exception e) { + throw new Exception("Call to registerInputOutput() of invokable failed", e); + } + + // the very last thing before the actual execution starts running is to inject + // the state into the task. 
the state is non-empty if this is an execution + // of a task that failed but had backuped state from a checkpoint + if (operatorState != null) { + if (invokable instanceof OperatorStateCarrier) { + ((OperatorStateCarrier) invokable).injectState(operatorState); + } + else { + throw new IllegalStateException("Found operator state for a non-stateful task invokable"); } } - if (current == ExecutionState.DEPLOYING) { - // directly set to canceled - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { - this.failureCause = cause; + // ---------------------------------------------------------------- + // actual task core work + // ---------------------------------------------------------------- - notifyObservers(ExecutionState.FAILED, null); - unregisterTask(); - return; + // we must make strictly sure that the invokable is accessible to teh cancel() call + // by the time we switched to running. + this.invokable = invokable; + + // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime + if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { + throw new CancelTaskException(); + } + + // notify everyone that we switched to running. especially the TaskManager needs + // to know this! 
+ notifyObservers(ExecutionState.RUNNING, null); + taskManager.tell(new TaskMessages.UpdateTaskExecutionState( + new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)), ActorRef.noSender()); + + // make sure the user code classloader is accessible thread-locally + executingThread.setContextClassLoader(userCodeClassLoader); + + // run the invokable + invokable.invoke(); + + // make sure, we enter the catch block if the task leaves the invoke() method due + // to the fact that it has been canceled + if (isCanceledOrFailed()) { + throw new CancelTaskException(); + } + + // ---------------------------------------------------------------- + // finalization of a successful execution + // ---------------------------------------------------------------- + + // finish the produced partitions. if this fails, we consider the execution failed. + for (ResultPartition partition : producedPartitions) { + if (partition != null) { + partition.finish(); } } - else if (current == ExecutionState.RUNNING) { - // go to canceling and perform the actual task canceling - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { - try { - this.environment.cancelExecution(); + + // try to mark the task as finished + // if that fails, the task was canceled/failed in the meantime + if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) { + notifyObservers(ExecutionState.FINISHED, null); + } + else { + throw new CancelTaskException(); + } + } + catch (Throwable t) { + + // ---------------------------------------------------------------- + // the execution failed. either the invokable code properly failed, or + // an exception was thrown as a side effect of cancelling + // ---------------------------------------------------------------- + + try { + // transition into our final state. 
we should be either in RUNNING, CANCELING, or FAILED + // loop for multiple retries during concurrent state changes via calls to cancel() or + // to failExternally() + while (true) { + ExecutionState current = this.executionState; + if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) { + if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { + // proper failure of the task. record the exception as the root cause + failureCause = t; + notifyObservers(ExecutionState.FAILED, t); + + // in case of an exception during execution, we still call "cancel()" on the task + if (invokable != null && this.invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) { + try { + invokable.cancel(); + } + catch (Throwable t2) { + LOG.error("Error while canceling task " + taskNameWithSubtask, t2); + } + } + break; + } } - catch (Throwable e) { - LOG.error("Error while cancelling the task.", e); + else if (current == ExecutionState.CANCELING) { + if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) { + notifyObservers(ExecutionState.CANCELED, null); + break; + } } + else if (current == ExecutionState.FAILED) { + // in state failed already, no transition necessary any more + break; + } + // unexpected state, go to failed + else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { + LOG.error("Unexpected state in Task during an exception: " + current); + break; + } + // else fall through the loop and + } + } + catch (Throwable tt) { + String message = "FATAL - exception in task exception handler"; + LOG.error(message, tt); + notifyFatalError(message, tt); + } + } + finally { + try { + LOG.info("Freeing task resources for " + taskNameWithSubtask); + + // free the network resources + network.unregisterTask(this); + + if (invokable != null) { + memoryManager.releaseAll(invokable); + } - this.failureCause = cause; + // remove all of the tasks library resources + 
libraryCache.unregisterTask(jobId, executionId); - notifyObservers(ExecutionState.FAILED, null); - unregisterTask(); + // remove all files in the distributed cache + removeCachedFiles(distributedCacheEntries, fileCache); - return; - } + notifyFinalState(); } - else { - throw new RuntimeException("unexpected state for failing the task: " + current); + catch (Throwable t) { + // an error in the resource cleanup is fatal + String message = "FATAL - exception in task resource cleanup"; + LOG.error(message, t); + notifyFatalError(message, t); } } } - public void cancelingDone() { - while (true) { - ExecutionState current = this.executionState; + private ClassLoader createUserCodeClassloader(LibraryCacheManager libraryCache) throws Exception { + long startDownloadTime = System.currentTimeMillis(); - if (current == ExecutionState.CANCELED || current == ExecutionState.FAILED) { - return; - } - if (!(current == ExecutionState.RUNNING || current == ExecutionState.CANCELING)) { - LOG.error(String.format("Unexpected state transition in Task: %s -> %s", current, ExecutionState.CANCELED)); - } + // triggers the download of all missing jar files from the job manager + libraryCache.registerTask(jobId, executionId, requiredJarFiles); - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) { - notifyObservers(ExecutionState.CANCELED, null); - unregisterTask(); - return; - } + LOG.debug("Register task {} at library cache manager took {} milliseconds", + executionId, System.currentTimeMillis() - startDownloadTime); + + ClassLoader userCodeClassLoader = libraryCache.getClassLoader(jobId); + if (userCodeClassLoader == null) { + throw new Exception("No user code classloader available."); } + return userCodeClassLoader; } - /** - * Starts the execution of this task. 
- */ - public boolean startExecution() { - LOG.info("Starting execution of task {}", this.getTaskName()); - if (STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { - final Thread thread = this.environment.getExecutingThread(); - thread.start(); - return true; + private AbstractInvokable loadAndInstantiateInvokable(ClassLoader classLoader, String className) throws Exception { + Class<? extends AbstractInvokable> invokableClass; + try { + invokableClass = Class.forName(className, true, classLoader) + .asSubclass(AbstractInvokable.class); } - else { - return false; + catch (Throwable t) { + throw new Exception("Could not load the task's invokable class.", t); + } + try { + return invokableClass.newInstance(); + } + catch (Throwable t) { + throw new Exception("Could not instantiate the task's invokable class.", t); } } - /** - * Unregisters the task from the central memory manager. - */ - public void unregisterMemoryManager(MemoryManager memoryManager) { - RuntimeEnvironment env = this.environment; - if (memoryManager != null && env != null) { - memoryManager.releaseAll(env.getInvokable()); + private void removeCachedFiles(Map<String, Future<Path>> entries, FileCache fileCache) { + // delete the local copies of all distributed cache files; failures are logged, not rethrown + try { + for (Map.Entry<String, Future<Path>> entry : entries.entrySet()) { + String name = entry.getKey(); + try { + fileCache.deleteTmpFile(name, jobId); + } + catch (Exception e) { + // unpleasant, but we continue + LOG.error("Distributed Cache could not remove cached file registered under '" + + name + "'.", e); + } + } + } + catch (Throwable t) { + LOG.error("Error while removing cached local files from distributed cache.", t); } } - protected void unregisterTask() { - taskManager.tell(new UnregisterTask(executionId), ActorRef.noSender()); + private void notifyFinalState() { + taskManager.tell(new TaskInFinalState(executionId), ActorRef.noSender()); } - //
----------------------------------------------------------------------------------------------------------------- - // Task Profiling - // ----------------------------------------------------------------------------------------------------------------- + private void notifyFatalError(String message, Throwable cause) { + taskManager.tell(new FatalError(message, cause), ActorRef.noSender()); + } - /** - * Registers the task manager profiler with the task. - */ - public void registerProfiler(TaskManagerProfiler taskManagerProfiler, Configuration jobConfiguration) { - taskManagerProfiler.registerTask(this, jobConfiguration); + // ---------------------------------------------------------------------------------------------------------------- + // Canceling / Failing the task from the outside + // ---------------------------------------------------------------------------------------------------------------- + + public void cancelExecution() { + LOG.info("Attempting to cancel task " + taskNameWithSubtask); + if (cancelOrFailAndCancelInvokable(ExecutionState.CANCELING)) { + notifyObservers(ExecutionState.CANCELING, null); + } } /** - * Unregisters the task from the task manager profiler. + * Sets the tasks to be cancelled and reports a failure back to the master. */ - public void unregisterProfiler(TaskManagerProfiler taskManagerProfiler) { - if (taskManagerProfiler != null) { - taskManagerProfiler.unregisterTask(this.executionId); + public void failExternally(Throwable cause) { + LOG.info("Attempting to fail task externally " + taskNameWithSubtask); + if (cancelOrFailAndCancelInvokable(ExecutionState.FAILED)) { + failureCause = cause; + notifyObservers(ExecutionState.FAILED, cause); } } - // ------------------------------------------------------------------------ - // Intermediate result partitions - // ------------------------------------------------------------------------ - - public SingleInputGate[] getInputGates() { - return environment != null ? 
environment.getAllInputGates() : null; - } + private boolean cancelOrFailAndCancelInvokable(ExecutionState targetState) { + while (true) { + ExecutionState current = this.executionState; - public ResultPartitionWriter[] getWriters() { - return environment != null ? environment.getAllWriters() : null; - } + // if the task is already canceled (or canceling) or finished or failed, + // then we need not do anything + if (current.isTerminal() || current == ExecutionState.CANCELING) { + return false; + } - public ResultPartition[] getProducedPartitions() { - return environment != null ? environment.getProducedPartitions() : null; + if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) { + if (STATE_UPDATER.compareAndSet(this, current, targetState)) { + // if we manage this state transition, then the invokable gets never called + // we need not call cancel on it + return true; + } + } + else if (current == ExecutionState.RUNNING) { + if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, targetState)) { + // we are canceling / failing out of the running state + // we need to cancel the invokable + if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) { + LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId); + + // because the canceling may block on user code, we cancel from a separate thread + Runnable canceler = new TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask); + Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler, + "Canceler for " + taskNameWithSubtask); + cancelThread.start(); + } + return true; + } + } + else { + throw new IllegalStateException("Unexpected task state: " + current); + } + } } - // -------------------------------------------------------------------------------------------- - // State Listeners - // -------------------------------------------------------------------------------------------- + // 
------------------------------------------------------------------------ + // State Listeners + // ------------------------------------------------------------------------ public void registerExecutionListener(ActorRef listener) { executionListenerActors.add(listener); @@ -404,25 +777,146 @@ public class Task { executionListenerActors.remove(listener); } - private void notifyObservers(ExecutionState newState, String message) { - if (LOG.isInfoEnabled()) { - LOG.info(getTaskNameWithSubtasks() + " switched to " + newState + (message == null ? "" : " : " + message)); + private void notifyObservers(ExecutionState newState, Throwable error) { + if (error == null) { + LOG.info(taskNameWithSubtask + " switched to " + newState); + } + else { + LOG.info(taskNameWithSubtask + " switched to " + newState + " with exception.", error); } + TaskExecutionState stateUpdate = new TaskExecutionState(jobId, executionId, newState, error); + TaskMessages.UpdateTaskExecutionState actorMessage = new + TaskMessages.UpdateTaskExecutionState(stateUpdate); + for (ActorRef listener : executionListenerActors) { - listener.tell(new ExecutionGraphMessages.ExecutionStateChanged( - jobId, vertexId, taskName, numberOfSubtasks, subtaskIndex, - executionId, newState, System.currentTimeMillis(), message), - ActorRef.noSender()); + listener.tell(actorMessage, ActorRef.noSender()); } } - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------ + // Notifications on the invokable + // ------------------------------------------------------------------------ + + public void triggerCheckpointBarrier(final long checkpointID) { + AbstractInvokable invokabe = this.invokable; + + if (executionState == ExecutionState.RUNNING && invokabe != null) { + if (invokabe instanceof 
BarrierTransceiver) { + final BarrierTransceiver barrierTransceiver = (BarrierTransceiver) invokabe; + final Logger logger = LOG; + + Thread caller = new Thread("Barrier emitter") { + @Override + public void run() { + try { + barrierTransceiver.broadcastBarrierFromSource(checkpointID); + } + catch (Throwable t) { + logger.error("Error while triggering checkpoint barriers", t); + } + } + }; + caller.setDaemon(true); + caller.start(); + } + else { + LOG.error("Task received a checkpoint request, but is not a checkpointing task - " + + taskNameWithSubtask); + } + } + else { + LOG.debug("Ignoring request to trigger a checkpoint barrier"); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ @Override public String toString() { return getTaskNameWithSubtasks() + " [" + executionState + ']'; } + + // ------------------------------------------------------------------------ + // Task Names + // ------------------------------------------------------------------------ + + public static String getTaskNameWithSubtask(String name, int subtask, int numSubtasks) { + return name + " (" + (subtask+1) + '/' + numSubtasks + ')'; + } + + public static String getTaskNameWithSubtaskAndID(String name, int subtask, int numSubtasks, ExecutionAttemptID id) { + return name + " (" + (subtask+1) + '/' + numSubtasks + ") (" + id + ')'; + } + + /** + * This runner calls cancel() on the invokable and periodically interrupts the + * thread until it has terminated. 
+ */ + private static class TaskCanceler implements Runnable { + + private final Logger logger; + private final AbstractInvokable invokable; + private final Thread executer; + private final String taskName; + + public TaskCanceler(Logger logger, AbstractInvokable invokable, Thread executer, String taskName) { + this.logger = logger; + this.invokable = invokable; + this.executer = executer; + this.taskName = taskName; + } + + @Override + public void run() { + try { + // the user-defined cancel method may throw errors. + // we need do continue despite that + try { + invokable.cancel(); + } + catch (Throwable t) { + logger.error("Error while canceling the task", t); + } + + // interrupt the running thread initially + executer.interrupt(); + try { + executer.join(10000); + } + catch (InterruptedException e) { + // we can ignore this + } + + // it is possible that the user code does not react immediately. for that + // reason, we spawn a separate thread that repeatedly interrupts the user code until + // it exits + while (executer.isAlive()) { + + // build the stack trace of where the thread is stuck, for the log + StringBuilder bld = new StringBuilder(); + StackTraceElement[] stack = executer.getStackTrace(); + for (StackTraceElement e : stack) { + bld.append(e).append('\n'); + } + + logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}", + taskName, bld.toString()); + + executer.interrupt(); + try { + executer.join(5000); + } + catch (InterruptedException e) { + // we can ignore this + } + } + } + catch (Throwable t) { + logger.error("Error in the task canceler", t); + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala index c81830c..b12f1b5 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala @@ -24,7 +24,15 @@ import org.apache.flink.runtime.instance.InstanceID * Miscellaneous actor messages exchanged with the TaskManager. */ object TaskManagerMessages { - + + /** + * This message informs the TaskManager about a fatal error that prevents + * it from continuing. + * + * @param description The description of the problem + */ + case class FatalError(description: String, cause: Throwable) + /** * Tells the task manager to send a heartbeat message to the job manager. */ @@ -49,7 +57,7 @@ object TaskManagerMessages { // -------------------------------------------------------------------------- - // Utility messages used for notifications during TaskManager startup + // Reporting the current TaskManager stack trace // -------------------------------------------------------------------------- /** http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala index c8c5726..b1a08ca 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala @@ -67,12 +67,12 @@ object TaskMessages { extends TaskMessage /** - * Unregister the task identified by [[executionID]] from the TaskManager. - * Sent to the TaskManager by futures and callbacks. 
+ * Notifies the TaskManager that the task has reached its final state, + * either FINISHED, CANCELED, or FAILED. * * @param executionID The task's execution attempt ID. */ - case class UnregisterTask(executionID: ExecutionAttemptID) + case class TaskInFinalState(executionID: ExecutionAttemptID) extends TaskMessage