[FLINK-1672] [runtime] Unify Task and RuntimeEnvironment into one class.

 - This simplifies and hardens the failure handling during task startup
 - Guarantees that no actor system threads are blocked by task bootstrap or
   task canceling
 - Corrects some previously erroneous corner case state transitions
 - Adds simple and robust tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e613014
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e613014
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e613014

Branch: refs/heads/master
Commit: 8e61301452218e6d279b013beb7bbd02a7c2e3f9
Parents: 1d368a4
Author: Stephan Ewen <se...@apache.org>
Authored: Sun May 3 04:41:03 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon May 11 21:13:41 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/execution/Environment.java    |   3 +
 .../flink/runtime/execution/ExecutionState.java |   6 +-
 .../runtime/execution/RuntimeEnvironment.java   | 458 ---------
 .../runtime/io/network/NetworkEnvironment.java  |  11 +-
 .../io/network/partition/ResultPartition.java   |  14 +-
 .../partition/consumer/SingleInputGate.java     |  15 +-
 .../runtime/operators/RegularPactTask.java      |   3 +-
 .../runtime/taskmanager/RuntimeEnvironment.java | 230 +++++
 .../apache/flink/runtime/taskmanager/Task.java  | 990 ++++++++++++++-----
 .../runtime/messages/TaskManagerMessages.scala  |  12 +-
 .../flink/runtime/messages/TaskMessages.scala   |   6 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 321 ++----
 .../consumer/LocalInputChannelTest.java         |   7 +-
 .../partition/consumer/SingleInputGateTest.java |   6 +-
 .../partition/consumer/TestSingleInputGate.java |   4 +-
 .../partition/consumer/UnionInputGateTest.java  |   9 +-
 .../operators/testutils/MockEnvironment.java    |   6 +
 .../runtime/taskmanager/ForwardingActor.java    |  41 +
 .../taskmanager/TaskExecutionStateTest.java     |  33 +
 .../runtime/taskmanager/TaskManagerTest.java    | 152 ++-
 .../flink/runtime/taskmanager/TaskTest.java     | 874 ++++++++++++----
 .../testingUtils/TestingTaskManager.scala       |   6 +-
 22 files changed, 1972 insertions(+), 1235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 7ab3bc9..081e3ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.execution;
 
+import akka.actor.ActorRef;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -159,4 +160,6 @@ public interface Environment {
 
        InputGate[] getAllInputGates();
 
+       // this should go away
+       ActorRef getJobManager();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
index 2fcaea1..9f4a5a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
@@ -35,10 +35,10 @@ package org.apache.flink.runtime.execution;
  *                                               ... -> FAILED
  * </pre>
  *
- * It is possible to enter the {@code FAILED} state from any other state.
+ * <p>It is possible to enter the {@code FAILED} state from any other 
state.</p>
  *
- * The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are
- * considered terminal states.
+ * <p>The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are
+ * considered terminal states.</p>
  */
 public enum ExecutionState {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
deleted file mode 100644
index 081d4bc..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.execution;
-
-import akka.actor.ActorRef;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.accumulators.AccumulatorEvent;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.messages.accumulators.ReportAccumulatorResult;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static com.google.common.base.Preconditions.checkElementIndex;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-public class RuntimeEnvironment implements Environment, Runnable {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(RuntimeEnvironment.class);
-
-       private static final ThreadGroup TASK_THREADS = new ThreadGroup("Task 
Threads");
-
-       /** The ActorRef to the job manager */
-       private final ActorRef jobManager;
-
-       /** The task that owns this environment */
-       private final Task owner;
-
-       /** The job configuration encapsulated in the environment object. */
-       private final Configuration jobConfiguration;
-
-       /** The task configuration encapsulated in the environment object. */
-       private final Configuration taskConfiguration;
-
-       /** ClassLoader for all user code classes */
-       private final ClassLoader userCodeClassLoader;
-
-       /** Instance of the class to be run in this environment. */
-       private final AbstractInvokable invokable;
-
-       /** The memory manager of the current environment (currently the one 
associated with the executing TaskManager). */
-       private final MemoryManager memoryManager;
-
-       /** The I/O manager of the current environment (currently the one 
associated with the executing TaskManager). */
-       private final IOManager ioManager;
-
-       /** The input split provider that can be queried for new input splits. 
*/
-       private final InputSplitProvider inputSplitProvider;
-
-       /** The thread executing the task in the environment. */
-       private Thread executingThread;
-
-       private final BroadcastVariableManager broadcastVariableManager;
-
-       private final Map<String, FutureTask<Path>> cacheCopyTasks = new 
HashMap<String, FutureTask<Path>>();
-
-       private final AtomicBoolean canceled = new AtomicBoolean();
-
-       private final ResultPartition[] producedPartitions;
-       private final ResultPartitionWriter[] writers;
-
-       private final SingleInputGate[] inputGates;
-
-       private final Map<IntermediateDataSetID, SingleInputGate> 
inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>();
-
-       public RuntimeEnvironment(
-                       ActorRef jobManager, Task owner, 
TaskDeploymentDescriptor tdd, ClassLoader userCodeClassLoader,
-                       MemoryManager memoryManager, IOManager ioManager, 
InputSplitProvider inputSplitProvider,
-                       BroadcastVariableManager broadcastVariableManager, 
NetworkEnvironment networkEnvironment) throws Exception {
-
-               this.owner = checkNotNull(owner);
-
-               this.memoryManager = checkNotNull(memoryManager);
-               this.ioManager = checkNotNull(ioManager);
-               this.inputSplitProvider = checkNotNull(inputSplitProvider);
-               this.jobManager = checkNotNull(jobManager);
-
-               this.broadcastVariableManager = 
checkNotNull(broadcastVariableManager);
-
-               try {
-                       // Produced intermediate result partitions
-                       final List<ResultPartitionDeploymentDescriptor> 
partitions = tdd.getProducedPartitions();
-
-                       this.producedPartitions = new 
ResultPartition[partitions.size()];
-                       this.writers = new 
ResultPartitionWriter[partitions.size()];
-
-                       for (int i = 0; i < this.producedPartitions.length; 
i++) {
-                               ResultPartitionDeploymentDescriptor desc = 
partitions.get(i);
-                               ResultPartitionID partitionId = new 
ResultPartitionID(desc.getPartitionId(), owner.getExecutionId());
-
-                               this.producedPartitions[i] = new 
ResultPartition(
-                                               this,
-                                               owner.getJobID(),
-                                               partitionId,
-                                               desc.getPartitionType(),
-                                               desc.getNumberOfSubpartitions(),
-                                               
networkEnvironment.getPartitionManager(),
-                                               
networkEnvironment.getPartitionConsumableNotifier(),
-                                               ioManager,
-                                               
networkEnvironment.getDefaultIOMode());
-
-                               writers[i] = new 
ResultPartitionWriter(this.producedPartitions[i]);
-                       }
-
-                       // Consumed intermediate result partitions
-                       final List<InputGateDeploymentDescriptor> 
consumedPartitions = tdd.getInputGates();
-
-                       this.inputGates = new 
SingleInputGate[consumedPartitions.size()];
-
-                       for (int i = 0; i < inputGates.length; i++) {
-                               inputGates[i] = SingleInputGate.create(
-                                               this, 
consumedPartitions.get(i), networkEnvironment);
-
-                               // The input gates are organized by key for 
task updates/channel updates at runtime
-                               
inputGatesById.put(inputGates[i].getConsumedResultId(), inputGates[i]);
-                       }
-
-                       this.jobConfiguration = tdd.getJobConfiguration();
-                       this.taskConfiguration = tdd.getTaskConfiguration();
-
-                       // 
----------------------------------------------------------------
-                       // Invokable setup
-                       // 
----------------------------------------------------------------
-                       // Note: This has to be done *after* the readers and 
writers have
-                       // been setup, because the invokable relies on them for 
I/O.
-                       // 
----------------------------------------------------------------
-
-                       // Load and instantiate the invokable class
-                       this.userCodeClassLoader = 
checkNotNull(userCodeClassLoader);
-                       // Class of the task to run in this environment
-                       Class<? extends AbstractInvokable> invokableClass;
-                       try {
-                               final String className = 
tdd.getInvokableClassName();
-                               invokableClass = Class.forName(className, true, 
userCodeClassLoader).asSubclass(AbstractInvokable.class);
-                       }
-                       catch (Throwable t) {
-                               throw new Exception("Could not load invokable 
class.", t);
-                       }
-
-                       try {
-                               this.invokable = invokableClass.newInstance();
-                       }
-                       catch (Throwable t) {
-                               throw new Exception("Could not instantiate the 
invokable class.", t);
-                       }
-
-                       this.invokable.setEnvironment(this);
-                       this.invokable.registerInputOutput();
-               }
-               catch (Throwable t) {
-                       throw new Exception("Error setting up runtime 
environment: " + t.getMessage(), t);
-               }
-       }
-
-       /**
-        * Returns the task invokable instance.
-        */
-       public AbstractInvokable getInvokable() {
-               return this.invokable;
-       }
-
-       @Override
-       public JobID getJobID() {
-               return this.owner.getJobID();
-       }
-
-       @Override
-       public JobVertexID getJobVertexId() {
-               return this.owner.getVertexID();
-       }
-
-       @Override
-       public void run() {
-               // quick fail in case the task was cancelled while the thread 
was started
-               if (owner.isCanceledOrFailed()) {
-                       owner.cancelingDone();
-                       return;
-               }
-
-               try {
-                       
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
-                       invokable.invoke();
-
-                       // Make sure, we enter the catch block when the task 
has been canceled
-                       if (owner.isCanceledOrFailed()) {
-                               throw new CancelTaskException("Task has been 
canceled or failed");
-                       }
-
-                       // Finish the produced partitions
-                       if (producedPartitions != null) {
-                               for (ResultPartition partition : 
producedPartitions) {
-                                       if (partition != null) {
-                                               partition.finish();
-                                       }
-                               }
-                       }
-
-                       if (owner.isCanceledOrFailed()) {
-                               throw new CancelTaskException();
-                       }
-
-                       // Finally, switch execution state to FINISHED and 
report to job manager
-                       if (!owner.markAsFinished()) {
-                               throw new Exception("Could *not* notify job 
manager that the task is finished.");
-                       }
-               }
-               catch (Throwable t) {
-                       if (!owner.isCanceledOrFailed()) {
-                               // Perform clean up when the task failed and 
has been not canceled by the user
-                               try {
-                                       invokable.cancel();
-                               }
-                               catch (Throwable t2) {
-                                       LOG.error("Error while canceling the 
task", t2);
-                               }
-                       }
-
-                       // if we are already set as cancelled or failed (when 
failure is triggered externally),
-                       // mark that the thread is done.
-                       if (owner.isCanceledOrFailed() || t instanceof 
CancelTaskException) {
-                               owner.cancelingDone();
-                       }
-                       else {
-                               // failure from inside the task thread. notify 
the task of the failure
-                               owner.markFailed(t);
-                       }
-               }
-       }
-
-       /**
-        * Returns the thread, which is assigned to execute the user code.
-        */
-       public Thread getExecutingThread() {
-               synchronized (this) {
-                       if (executingThread == null) {
-                               String name = owner.getTaskNameWithSubtasks();
-
-                               if (LOG.isDebugEnabled()) {
-                                       name = name + " (" + 
owner.getExecutionId() + ")";
-                               }
-
-                               executingThread = new Thread(TASK_THREADS, 
this, name);
-                       }
-
-                       return executingThread;
-               }
-       }
-
-       public void cancelExecution() {
-               if (!canceled.compareAndSet(false, true)) {
-                       return;
-               }
-
-               LOG.info("Canceling {} ({}).", owner.getTaskNameWithSubtasks(), 
owner.getExecutionId());
-
-               // Request user code to shut down
-               if (invokable != null) {
-                       try {
-                               invokable.cancel();
-                       }
-                       catch (Throwable e) {
-                               LOG.error("Error while canceling the task.", e);
-                       }
-               }
-
-               final Thread executingThread = this.executingThread;
-               if (executingThread != null) {
-                       // interrupt the running thread and wait for it to die
-                       executingThread.interrupt();
-                       try {
-                               executingThread.join(5000);
-                       }
-                       catch (InterruptedException e) {
-                       }
-                       if (!executingThread.isAlive()) {
-                               return;
-                       }
-                       // Continuously interrupt the user thread until it 
changed to state CANCELED
-                       while (executingThread != null && 
executingThread.isAlive()) {
-                               LOG.warn("Task " + 
owner.getTaskNameWithSubtasks() + " did not react to cancelling signal. Sending 
repeated interrupt.");
-                               if (LOG.isDebugEnabled()) {
-                                       StringBuilder bld = new 
StringBuilder("Task ").append(owner.getTaskNameWithSubtasks()).append(" is 
stuck in method:\n");
-                                       StackTraceElement[] stack = 
executingThread.getStackTrace();
-                                       for (StackTraceElement e : stack) {
-                                               bld.append(e).append('\n');
-                                       }
-                                       LOG.debug(bld.toString());
-                               }
-                               executingThread.interrupt();
-                               try {
-                                       executingThread.join(1000);
-                               }
-                               catch (InterruptedException e) {
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public ActorRef getJobManager() {
-               return jobManager;
-       }
-
-       @Override
-       public IOManager getIOManager() {
-               return ioManager;
-       }
-
-       @Override
-       public MemoryManager getMemoryManager() {
-               return memoryManager;
-       }
-
-       @Override
-       public BroadcastVariableManager getBroadcastVariableManager() {
-               return broadcastVariableManager;
-       }
-
-       @Override
-       public void reportAccumulators(Map<String, Accumulator<?, ?>> 
accumulators) {
-               AccumulatorEvent evt;
-               try {
-                       evt = new AccumulatorEvent(getJobID(), accumulators);
-               }
-               catch (IOException e) {
-                       throw new RuntimeException("Cannot serialize 
accumulators to send them to JobManager", e);
-               }
-
-               ReportAccumulatorResult accResult = new 
ReportAccumulatorResult(getJobID(), owner.getExecutionId(), evt);
-               jobManager.tell(accResult, ActorRef.noSender());
-       }
-
-       @Override
-       public ResultPartitionWriter getWriter(int index) {
-               checkElementIndex(index, writers.length, "Illegal environment 
writer request.");
-
-               return writers[checkElementIndex(index, writers.length)];
-       }
-
-       @Override
-       public ResultPartitionWriter[] getAllWriters() {
-               return writers;
-       }
-
-       @Override
-       public InputGate getInputGate(int index) {
-               checkElementIndex(index, inputGates.length);
-
-               return inputGates[index];
-       }
-
-       @Override
-       public SingleInputGate[] getAllInputGates() {
-               return inputGates;
-       }
-
-       public ResultPartition[] getProducedPartitions() {
-               return producedPartitions;
-       }
-
-       public SingleInputGate getInputGateById(IntermediateDataSetID id) {
-               return inputGatesById.get(id);
-       }
-
-       @Override
-       public Configuration getTaskConfiguration() {
-               return taskConfiguration;
-       }
-
-       @Override
-       public Configuration getJobConfiguration() {
-               return jobConfiguration;
-       }
-
-       @Override
-       public int getNumberOfSubtasks() {
-               return owner.getNumberOfSubtasks();
-       }
-
-       @Override
-       public int getIndexInSubtaskGroup() {
-               return owner.getSubtaskIndex();
-       }
-
-       @Override
-       public String getTaskName() {
-               return owner.getTaskName();
-       }
-
-       @Override
-       public InputSplitProvider getInputSplitProvider() {
-               return inputSplitProvider;
-       }
-
-       @Override
-       public String getTaskNameWithSubtasks() {
-               return owner.getTaskNameWithSubtasks();
-       }
-
-       @Override
-       public ClassLoader getUserClassLoader() {
-               return userCodeClassLoader;
-       }
-
-       public void addCopyTasksForCacheFile(Map<String, FutureTask<Path>> 
copyTasks) {
-               cacheCopyTasks.putAll(copyTasks);
-       }
-
-       public void addCopyTaskForCacheFile(String name, FutureTask<Path> 
copyTask) {
-               cacheCopyTasks.put(name, copyTask);
-       }
-
-       @Override
-       public Map<String, FutureTask<Path>> getCopyTask() {
-               return cacheCopyTasks;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index af55ebf..259ea55 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -243,7 +243,7 @@ public class NetworkEnvironment {
 
        public void registerTask(Task task) throws IOException {
                final ResultPartition[] producedPartitions = 
task.getProducedPartitions();
-               final ResultPartitionWriter[] writers = task.getWriters();
+               final ResultPartitionWriter[] writers = task.getAllWriters();
 
                if (writers.length != producedPartitions.length) {
                        throw new IllegalStateException("Unequal number of 
writers and partitions.");
@@ -288,7 +288,7 @@ public class NetworkEnvironment {
                        }
 
                        // Setup the buffer pool for each buffer reader
-                       final SingleInputGate[] inputGates = 
task.getInputGates();
+                       final SingleInputGate[] inputGates = 
task.getAllInputGates();
 
                        for (SingleInputGate gate : inputGates) {
                                BufferPool bufferPool = null;
@@ -329,10 +329,9 @@ public class NetworkEnvironment {
                                
partitionManager.releasePartitionsProducedBy(executionId);
                        }
 
-                       ResultPartitionWriter[] writers = task.getWriters();
-
+                       ResultPartitionWriter[] writers = task.getAllWriters();
                        if (writers != null) {
-                               for (ResultPartitionWriter writer : 
task.getWriters()) {
+                               for (ResultPartitionWriter writer : writers) {
                                        
taskEventDispatcher.unregisterWriter(writer);
                                }
                        }
@@ -344,7 +343,7 @@ public class NetworkEnvironment {
                                }
                        }
 
-                       final SingleInputGate[] inputGates = 
task.getInputGates();
+                       final SingleInputGate[] inputGates = 
task.getAllInputGates();
 
                        if (inputGates != null) {
                                for (SingleInputGate gate : inputGates) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index f06c8fb..df1f254 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
@@ -76,9 +75,8 @@ import static com.google.common.base.Preconditions.checkState;
 public class ResultPartition implements BufferPoolOwner {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ResultPartition.class);
-
-       /** The owning environment. Mainly for debug purposes. */
-       private final Environment owner;
+       
+       private final String owningTaskName;
 
        private final JobID jobId;
 
@@ -120,7 +118,7 @@ public class ResultPartition implements BufferPoolOwner {
        private long totalNumberOfBytes;
 
        public ResultPartition(
-                       Environment owner,
+                       String owningTaskName,
                        JobID jobId,
                        ResultPartitionID partitionId,
                        ResultPartitionType partitionType,
@@ -130,7 +128,7 @@ public class ResultPartition implements BufferPoolOwner {
                        IOManager ioManager,
                        IOMode defaultIoMode) {
 
-               this.owner = checkNotNull(owner);
+               this.owningTaskName = checkNotNull(owningTaskName);
                this.jobId = checkNotNull(jobId);
                this.partitionId = checkNotNull(partitionId);
                this.partitionType = checkNotNull(partitionType);
@@ -162,7 +160,7 @@ public class ResultPartition implements BufferPoolOwner {
                // Initially, partitions should be consumed once before release.
                pin();
 
-               LOG.debug("{}: Initialized {}", 
owner.getTaskNameWithSubtasks(), this);
+               LOG.debug("{}: Initialized {}", owningTaskName, this);
        }
 
        /**
@@ -281,7 +279,7 @@ public class ResultPartition implements BufferPoolOwner {
         */
        public void release() {
                if (isReleased.compareAndSet(false, true)) {
-                       LOG.debug("{}: Releasing {}.", 
owner.getTaskNameWithSubtasks(), this);
+                       LOG.debug("{}: Releasing {}.", owningTaskName, this);
 
                        // Release all subpartitions
                        for (ResultSubpartition subpartition : subpartitions) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index b0d138a..acda1d8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
@@ -101,8 +100,8 @@ public class SingleInputGate implements InputGate {
        /** Lock object to guard partition requests and runtime channel 
updates. */
        private final Object requestLock = new Object();
 
-       /** The owning environment. Mainly for debug purposes. */
-       private final Environment owner;
+       /** The name of the owning task, for logging purposes. */
+       private final String owningTaskName;
 
        /**
         * The ID of the consumed intermediate result. Each input gate consumes 
partitions of the
@@ -153,12 +152,12 @@ public class SingleInputGate implements InputGate {
        private int numberOfUninitializedChannels;
 
        public SingleInputGate(
-                       Environment owner,
+                       String owningTaskName,
                        IntermediateDataSetID consumedResultId,
                        int consumedSubpartitionIndex,
                        int numberOfInputChannels) {
 
-               this.owner = checkNotNull(owner);
+               this.owningTaskName = checkNotNull(owningTaskName);
                this.consumedResultId = checkNotNull(consumedResultId);
 
                checkArgument(consumedSubpartitionIndex >= 0);
@@ -265,7 +264,7 @@ public class SingleInputGate implements InputGate {
                synchronized (requestLock) {
                        if (!isReleased) {
                                try {
-                                       LOG.debug("{}: Releasing {}.", 
owner.getTaskNameWithSubtasks(), this);
+                                       LOG.debug("{}: Releasing {}.", 
owningTaskName, this);
 
                                        for (InputChannel inputChannel : 
inputChannels.values()) {
                                                try {
@@ -410,7 +409,7 @@ public class SingleInputGate implements InputGate {
         * Creates an input gate and all of its input channels.
         */
        public static SingleInputGate create(
-                       Environment owner,
+                       String owningTaskName,
                        InputGateDeploymentDescriptor igdd,
                        NetworkEnvironment networkEnvironment) {
 
@@ -422,7 +421,7 @@ public class SingleInputGate implements InputGate {
                final InputChannelDeploymentDescriptor[] icdd = 
checkNotNull(igdd.getInputChannelDeploymentDescriptors());
 
                final SingleInputGate inputGate = new SingleInputGate(
-                               owner, consumedResultId, 
consumedSubpartitionIndex, icdd.length);
+                               owningTaskName, consumedResultId, 
consumedSubpartitionIndex, icdd.length);
 
                // Create the input channels. There is one input channel for 
each consumed partition.
                final InputChannel[] inputChannels = new 
InputChannel[icdd.length];

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index b528f75..2bee094 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -1067,7 +1067,8 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
        public DistributedRuntimeUDFContext createRuntimeContext(String 
taskName) {
                Environment env = getEnvironment();
                return new DistributedRuntimeUDFContext(taskName, 
env.getNumberOfSubtasks(),
-                               env.getIndexInSubtaskGroup(), 
getUserCodeClassLoader(), getExecutionConfig(), env.getCopyTask());
+                               env.getIndexInSubtaskGroup(), 
getUserCodeClassLoader(), getExecutionConfig(),
+                               env.getDistributedCacheEntries());
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
new file mode 100644
index 0000000..1321336
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.actor.ActorRef;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorEvent;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.messages.accumulators.ReportAccumulatorResult;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * An implementation of the {@link Environment}.
+ */
+public class RuntimeEnvironment implements Environment {
+       
+       private final JobID jobId;
+       private final JobVertexID jobVertexId;
+       private final ExecutionAttemptID executionId;
+       
+       private final String taskName;
+       private final String taskNameWithSubtasks;
+       private final int subtaskIndex;
+       private final int parallelism;
+       
+       private final Configuration jobConfiguration;
+       private final Configuration taskConfiguration;
+       
+       private final ClassLoader userCodeClassLoader;
+
+       private final MemoryManager memManager;
+       private final IOManager ioManager;
+       private final BroadcastVariableManager bcVarManager;
+       private final InputSplitProvider splitProvider;
+       
+       private final Map<String, Future<Path>> distCacheEntries;
+
+       private final ResultPartitionWriter[] writers;
+       private final InputGate[] inputGates;
+       
+       private final ActorRef jobManagerActor;
+       
+       // 
------------------------------------------------------------------------
+
+       public RuntimeEnvironment(JobID jobId, JobVertexID jobVertexId, 
ExecutionAttemptID executionId,
+                                                               String 
taskName, String taskNameWithSubtasks,
+                                                               int 
subtaskIndex, int parallelism,
+                                                               Configuration 
jobConfiguration, Configuration taskConfiguration,
+                                                               ClassLoader 
userCodeClassLoader,
+                                                               MemoryManager 
memManager, IOManager ioManager,
+                                                               
BroadcastVariableManager bcVarManager,
+                                                               
InputSplitProvider splitProvider,
+                                                               Map<String, 
Future<Path>> distCacheEntries,
+                                                               
ResultPartitionWriter[] writers,
+                                                               InputGate[] 
inputGates,
+                                                               ActorRef 
jobManagerActor) {
+               
+               checkArgument(parallelism > 0 && subtaskIndex >= 0 && 
subtaskIndex < parallelism);
+               
+               this.jobId = checkNotNull(jobId);
+               this.jobVertexId = checkNotNull(jobVertexId);
+               this.executionId = checkNotNull(executionId);
+               this.taskName = checkNotNull(taskName);
+               this.taskNameWithSubtasks = checkNotNull(taskNameWithSubtasks);
+               this.subtaskIndex = subtaskIndex;
+               this.parallelism = parallelism;
+               this.jobConfiguration = checkNotNull(jobConfiguration);
+               this.taskConfiguration = checkNotNull(taskConfiguration);
+               this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+               this.memManager = checkNotNull(memManager);
+               this.ioManager = checkNotNull(ioManager);
+               this.bcVarManager = checkNotNull(bcVarManager);
+               this.splitProvider = checkNotNull(splitProvider);
+               this.distCacheEntries = checkNotNull(distCacheEntries);
+               this.writers = checkNotNull(writers);
+               this.inputGates = checkNotNull(inputGates);
+               this.jobManagerActor = checkNotNull(jobManagerActor);
+       }
+
+
+       // 
------------------------------------------------------------------------
+       
+       @Override
+       public JobID getJobID() {
+               return jobId;
+       }
+
+       @Override
+       public JobVertexID getJobVertexId() {
+               return jobVertexId;
+       }
+
+       @Override
+       public ExecutionAttemptID getExecutionId() {
+               return executionId;
+       }
+
+       @Override
+       public String getTaskName() {
+               return taskName;
+       }
+
+       @Override
+       public String getTaskNameWithSubtasks() {
+               return taskNameWithSubtasks;
+       }
+
+       @Override
+       public int getNumberOfSubtasks() {
+               return parallelism;
+       }
+
+       @Override
+       public int getIndexInSubtaskGroup() {
+               return subtaskIndex;
+       }
+
+       @Override
+       public Configuration getJobConfiguration() {
+               return jobConfiguration;
+       }
+
+       @Override
+       public Configuration getTaskConfiguration() {
+               return taskConfiguration;
+       }
+       
+       @Override
+       public ClassLoader getUserClassLoader() {
+               return userCodeClassLoader;
+       }
+
+       @Override
+       public MemoryManager getMemoryManager() {
+               return memManager;
+       }
+
+       @Override
+       public IOManager getIOManager() {
+               return ioManager;
+       }
+
+       @Override
+       public BroadcastVariableManager getBroadcastVariableManager() {
+               return bcVarManager;
+       }
+
+       @Override
+       public InputSplitProvider getInputSplitProvider() {
+               return splitProvider;
+       }
+
+       @Override
+       public Map<String, Future<Path>> getDistributedCacheEntries() {
+               return distCacheEntries;
+       }
+
+       @Override
+       public ResultPartitionWriter getWriter(int index) {
+               return writers[index];
+       }
+
+       @Override
+       public ResultPartitionWriter[] getAllWriters() {
+               return writers;
+       }
+
+       @Override
+       public InputGate getInputGate(int index) {
+               return inputGates[index];
+       }
+
+       @Override
+       public InputGate[] getAllInputGates() {
+               return inputGates;
+       }
+
+       @Override
+       public void reportAccumulators(Map<String, Accumulator<?, ?>> 
accumulators) {
+               AccumulatorEvent evt;
+               try {
+                       evt = new AccumulatorEvent(getJobID(), accumulators);
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Cannot serialize 
accumulators to send them to JobManager", e);
+               }
+
+               ReportAccumulatorResult accResult = new 
ReportAccumulatorResult(jobId, executionId, evt);
+               jobManagerActor.tell(accResult, ActorRef.noSender());
+       }
+
+       @Override
+       public ActorRef getJobManager() {
+               return jobManagerActor;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e6eee5b..f12344b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -19,382 +19,755 @@
 package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
+import akka.util.Timeout;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.RuntimeEnvironment;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
+import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.messages.TaskMessages.UnregisterTask;
-import org.apache.flink.runtime.profiling.TaskManagerProfiler;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
+import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
+import org.apache.flink.runtime.state.StateHandle;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-public class Task {
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Task represents one execution of a parallel subtask on a TaskManager.
+ * A Task wraps a Flink operator (which may be a user function) and
+ * runs it, providing all services necessary, for example, to consume input data,
+ * produce its results (intermediate result partitions) and communicate
+ * with the JobManager.
+ *
+ * <p>The Flink operators (implemented as subclasses of
+ * {@link org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable}) have only 
data
+ * readers, -writers, and certain event callbacks. The task connects those to 
the
+ * network stack and actor messages, and tracks the state of the execution and
+ * handles exceptions.</p>
+ *
+ * <p>Tasks have no knowledge about how they relate to other tasks, or whether 
they
+ * are the first attempt to execute the task, or a repeated attempt. All of 
that
+ * is only known to the JobManager. All the task knows are its own runnable 
code,
+ * the task's configuration, and the IDs of the intermediate results to 
consume and
+ * produce (if any).</p>
+ *
+ * <p>Each Task is run by one dedicated thread.</p>
+ */
+public class Task implements Runnable {
 
+       /** The class logger. */
+       private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+       
+       /** The thread group that contains all task threads */
+       private static final ThreadGroup TASK_THREADS_GROUP = new 
ThreadGroup("Flink Task Threads");
+       
        /** For atomic state updates */
        private static final AtomicReferenceFieldUpdater<Task, ExecutionState> 
STATE_UPDATER =
                        AtomicReferenceFieldUpdater.newUpdater(Task.class, 
ExecutionState.class, "executionState");
 
-       /** The log object used for debugging. */
-       private static final Logger LOG = LoggerFactory.getLogger(Task.class);
-
-       // 
--------------------------------------------------------------------------------------------
+       // 
------------------------------------------------------------------------
+       //  Constant fields that are part of the initial Task construction
+       // 
------------------------------------------------------------------------
 
+       /** The job that the task belongs to */
        private final JobID jobId;
 
+       /** The vertex in the JobGraph whose code the task executes */
        private final JobVertexID vertexId;
 
+       /** The execution attempt of the parallel subtask */
+       private final ExecutionAttemptID executionId;
+
+       /** The index of the parallel subtask, in [0, numberOfSubtasks) */
        private final int subtaskIndex;
 
-       private final int numberOfSubtasks;
-
-       private final ExecutionAttemptID executionId;
+       /** The number of parallel subtasks for the 
JobVertex/ExecutionJobVertex that this task belongs to */
+       private final int parallelism;
 
+       /** The name of the task */
        private final String taskName;
 
+       /** The name of the task, including the subtask index and the 
parallelism */
+       private final String taskNameWithSubtask;
+
+       /** The job-wide configuration object */
+       private final Configuration jobConfiguration;
+
+       /** The task-specific configuration */
+       private final Configuration taskConfiguration;
+
+       /** The jar files used by this task */
+       private final List<BlobKey> requiredJarFiles;
+
+       /** The name of the class that holds the invokable code */
+       private final String nameOfInvokableClass;
+
+       /** The handle to the state that the operator was initialized with */
+       private final StateHandle operatorState;
+
+       /** The memory manager to be used by this task */
+       private final MemoryManager memoryManager;
+
+       /** The I/O manager to be used by this task */
+       private final IOManager ioManager;
+
+       /** The BroadcastVariableManager to be used by this task */
+       private final BroadcastVariableManager broadcastVariableManager;
+
+       private final ResultPartition[] producedPartitions;
+
+       private final ResultPartitionWriter[] writers;
+
+       private final SingleInputGate[] inputGates;
+
+       private final Map<IntermediateDataSetID, SingleInputGate> 
inputGatesById;
+
+       /** The TaskManager actor that spawned this task */
        private final ActorRef taskManager;
 
-       private final List<ActorRef> executionListenerActors = new 
CopyOnWriteArrayList<ActorRef>();
+       /** The JobManager actor */
+       private final ActorRef jobManager;
+
+       /** All actors that want to be notified about changes in the task's 
execution state */
+       private final List<ActorRef> executionListenerActors;
+
+       /** The timeout for all ask operations on actors */
+       private final Timeout actorAskTimeout;
+
+       private final LibraryCacheManager libraryCache;
+       
+       private final FileCache fileCache;
+       
+       private final NetworkEnvironment network;
 
-       /** The environment (with the invokable) executed by this task */
-       private volatile RuntimeEnvironment environment;
+       /** The thread that executes the task */
+       private final Thread executingThread;
+       
 
+       // 
------------------------------------------------------------------------
+       //  Fields that control the task execution
+       // 
------------------------------------------------------------------------
+
+       private final AtomicBoolean invokableHasBeenCanceled = new 
AtomicBoolean(false);
+       
+       /** The invokable of this task, if initialized */
+       private volatile AbstractInvokable invokable;
+       
        /** The current execution state of the task */
-       private volatile ExecutionState executionState = 
ExecutionState.DEPLOYING;
+       private volatile ExecutionState executionState = ExecutionState.CREATED;
 
+       /** The observed exception, in case the task execution failed */
        private volatile Throwable failureCause;
 
-       // 
--------------------------------------------------------------------------------------------
 
+       
+       /**
+        * <p><b>IMPORTANT:</b> This constructor may not start any work that 
would need to 
+        * be undone in the case of a failing task deployment.</p>
+        */
+       public Task(TaskDeploymentDescriptor tdd,
+                               MemoryManager memManager,
+                               IOManager ioManager,
+                               NetworkEnvironment networkEnvironment,
+                               BroadcastVariableManager bcVarManager,
+                               ActorRef taskManagerActor,
+                               ActorRef jobManagerActor,
+                               FiniteDuration actorAskTimeout,
+                               LibraryCacheManager libraryCache,
+                               FileCache fileCache)
+       {
+               checkArgument(tdd.getNumberOfSubtasks() > 0);
+               checkArgument(tdd.getIndexInSubtaskGroup() >= 0);
+               checkArgument(tdd.getIndexInSubtaskGroup() < 
tdd.getNumberOfSubtasks());
+
+               this.jobId = checkNotNull(tdd.getJobID());
+               this.vertexId = checkNotNull(tdd.getVertexID());
+               this.executionId  = checkNotNull(tdd.getExecutionId());
+               this.subtaskIndex = tdd.getIndexInSubtaskGroup();
+               this.parallelism = tdd.getNumberOfSubtasks();
+               this.taskName = checkNotNull(tdd.getTaskName());
+               this.taskNameWithSubtask = getTaskNameWithSubtask(taskName, 
subtaskIndex, parallelism);
+               this.jobConfiguration = checkNotNull(tdd.getJobConfiguration());
+               this.taskConfiguration = 
checkNotNull(tdd.getTaskConfiguration());
+               this.requiredJarFiles = checkNotNull(tdd.getRequiredJarFiles());
+               this.nameOfInvokableClass = 
checkNotNull(tdd.getInvokableClassName());
+               this.operatorState = tdd.getOperatorStates();
+
+               this.memoryManager = checkNotNull(memManager);
+               this.ioManager = checkNotNull(ioManager);
+               this.broadcastVariableManager =checkNotNull(bcVarManager);
+
+               this.jobManager = checkNotNull(jobManagerActor);
+               this.taskManager = checkNotNull(taskManagerActor);
+               this.actorAskTimeout = new 
Timeout(checkNotNull(actorAskTimeout));
+               
+               this.libraryCache = checkNotNull(libraryCache);
+               this.fileCache = checkNotNull(fileCache);
+               this.network = checkNotNull(networkEnvironment);
+
+               this.executionListenerActors = new 
CopyOnWriteArrayList<ActorRef>();
+
+               // create the reader and writer structures
+
+               final String taskNameWithSubtasksAndId =
+                               Task.getTaskNameWithSubtaskAndID(taskName, 
subtaskIndex, parallelism, executionId);
+
+               List<ResultPartitionDeploymentDescriptor> partitions = 
tdd.getProducedPartitions();
+               List<InputGateDeploymentDescriptor> consumedPartitions = 
tdd.getInputGates();
+
+               // Produced intermediate result partitions
+               this.producedPartitions = new 
ResultPartition[partitions.size()];
+               this.writers = new ResultPartitionWriter[partitions.size()];
+
+               for (int i = 0; i < this.producedPartitions.length; i++) {
+                       ResultPartitionDeploymentDescriptor desc = 
partitions.get(i);
+                       ResultPartitionID partitionId = new 
ResultPartitionID(desc.getPartitionId(), executionId);
+
+                       this.producedPartitions[i] = new ResultPartition(
+                                       taskNameWithSubtasksAndId,
+                                       jobId,
+                                       partitionId,
+                                       desc.getPartitionType(),
+                                       desc.getNumberOfSubpartitions(),
+                                       
networkEnvironment.getPartitionManager(),
+                                       
networkEnvironment.getPartitionConsumableNotifier(),
+                                       ioManager,
+                                       networkEnvironment.getDefaultIOMode());
+
+                       this.writers[i] = new 
ResultPartitionWriter(this.producedPartitions[i]);
+               }
+
+               // Consumed intermediate result partitions
+               this.inputGates = new 
SingleInputGate[consumedPartitions.size()];
+               this.inputGatesById = new HashMap<IntermediateDataSetID, 
SingleInputGate>();
 
-       public Task(JobID jobId, JobVertexID vertexId, int taskIndex, int 
parallelism,
-                       ExecutionAttemptID executionId, String taskName, 
ActorRef taskManager) {
+               for (int i = 0; i < this.inputGates.length; i++) {
+                       SingleInputGate gate = SingleInputGate.create(
+                                       taskNameWithSubtasksAndId, 
consumedPartitions.get(i), networkEnvironment);
 
-               this.jobId = jobId;
-               this.vertexId = vertexId;
-               this.subtaskIndex = taskIndex;
-               this.numberOfSubtasks = parallelism;
-               this.executionId = executionId;
-               this.taskName = taskName;
-               this.taskManager = taskManager;
+                       this.inputGates[i] = gate;
+                       inputGatesById.put(gate.getConsumedResultId(), gate);
+               }
+               
+               // finally, create the executing thread, but do not start it
+               executingThread = new Thread(TASK_THREADS_GROUP, this, 
taskNameWithSubtask); 
        }
 
-       /**
-        * Returns the ID of the job this task belongs to.
-        */
+       // 
------------------------------------------------------------------------
+       //  Accessors
+       // 
------------------------------------------------------------------------
+
        public JobID getJobID() {
-               return this.jobId;
+               return jobId;
        }
 
-       /**
-        * Returns the ID of this task vertex.
-        */
-       public JobVertexID getVertexID() {
-               return this.vertexId;
+       public JobVertexID getJobVertexId() {
+               return vertexId;
        }
 
-       /**
-        * Gets the index of the parallel subtask [0, parallelism).
-        */
-       public int getSubtaskIndex() {
+       public ExecutionAttemptID getExecutionId() {
+               return executionId;
+       }
+
+       public int getIndexInSubtaskGroup() {
                return subtaskIndex;
        }
 
-       /**
-        * Gets the total number of subtasks of the task that this subtask 
belongs to.
-        */
        public int getNumberOfSubtasks() {
-               return numberOfSubtasks;
+               return parallelism;
        }
 
-       /**
-        * Gets the ID of the execution attempt.
-        */
-       public ExecutionAttemptID getExecutionId() {
-               return executionId;
+       public String getTaskName() {
+               return taskName;
+       }
+
+       public String getTaskNameWithSubtasks() {
+               return taskNameWithSubtask;
+       }
+
+       public Configuration getJobConfiguration() {
+               return jobConfiguration;
+       }
+       
+       public Configuration getTaskConfiguration() {
+               return this.taskConfiguration;
+       }
+       
+       public ResultPartitionWriter[] getAllWriters() {
+               return writers;
+       }
+       
+       public SingleInputGate[] getAllInputGates() {
+               return inputGates;
+       }
+
+       public ResultPartition[] getProducedPartitions() {
+               return producedPartitions;
+       }
+
+       public SingleInputGate getInputGateById(IntermediateDataSetID id) {
+               return inputGatesById.get(id);
+       }
+
+       public Thread getExecutingThread() {
+               return executingThread;
        }
 
+       // 
------------------------------------------------------------------------
+       //  Task Execution
+       // 
------------------------------------------------------------------------
+
        /**
         * Returns the current execution state of the task.
+        * @return The current execution state of the task.
         */
        public ExecutionState getExecutionState() {
                return this.executionState;
        }
 
-       public void setEnvironment(RuntimeEnvironment environment) {
-               this.environment = environment;
-       }
-
-       public RuntimeEnvironment getEnvironment() {
-               return environment;
-       }
-
+       /**
+        * Checks whether the task has failed, is canceled, or is being 
canceled at the moment.
+        * @return True if the task is in state FAILED, CANCELING, or CANCELED, 
false otherwise.
+        */
        public boolean isCanceledOrFailed() {
                return executionState == ExecutionState.CANCELING ||
                                executionState == ExecutionState.CANCELED ||
                                executionState == ExecutionState.FAILED;
        }
 
-       public String getTaskName() {
-               if (LOG.isDebugEnabled()) {
-                       return taskName + " (" + executionId + ")";
-               } else {
-                       return taskName;
-               }
-       }
-
-       public String getTaskNameWithSubtasks() {
-               if (LOG.isDebugEnabled()) {
-                       return this.taskName + " (" + (this.subtaskIndex + 1) + 
"/" + this.numberOfSubtasks +
-                                       ") (" + executionId + ")";
-               } else {
-                       return this.taskName + " (" + (this.subtaskIndex + 1) + 
"/" + this.numberOfSubtasks + ")";
-               }
-       }
-
+       /**
+        * If the task has failed, this method gets the exception that caused 
this task to fail.
+        * Otherwise this method returns null.
+        *
+        * @return The exception that caused the task to fail, or null, if the 
task has not failed.
+        */
        public Throwable getFailureCause() {
                return failureCause;
        }
 
-       // 
----------------------------------------------------------------------------------------------------------------
-       //  States and Transitions
-       // 
----------------------------------------------------------------------------------------------------------------
-
        /**
-        * Marks the task as finished. This succeeds, if the task was 
previously in the state
-        * "RUNNING", otherwise it fails. Failure indicates that the task was 
either
-        * canceled, or set to failed.
-        *
-        * @return True, if the task correctly enters the state FINISHED.
+        * Starts the task's thread.
         */
-       public boolean markAsFinished() {
-               if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, 
ExecutionState.FINISHED)) {
-                       notifyObservers(ExecutionState.FINISHED, null);
-                       unregisterTask();
-                       return true;
-               }
-               else {
-                       return false;
-               }
+       public void startTaskThread() {
+               executingThread.start();
        }
 
-       public void markFailed(Throwable error) {
+       /**
+        * The core work method that bootstraps the task and executes its code.
+        */
+       public void run() {
+
+               // ----------------------------
+               //  Initial State transition
+               // ----------------------------
                while (true) {
                        ExecutionState current = this.executionState;
-
-                       // if canceled, fine. we are done, and the jobmanager 
has been told
-                       if (current == ExecutionState.CANCELED) {
+                       if (current == ExecutionState.CREATED) {
+                               if (STATE_UPDATER.compareAndSet(this, 
ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
+                                       // success, we can start our work
+                                       break;
+                               }
+                       }
+                       else if (current == ExecutionState.FAILED) {
+                               // we were immediately failed. tell the 
TaskManager that we reached our final state
+                               notifyFinalState();
                                return;
                        }
+                       else if (current == ExecutionState.CANCELING) {
+                               if (STATE_UPDATER.compareAndSet(this, 
ExecutionState.CANCELING, ExecutionState.CANCELED)) {
+                                       // we were immediately canceled. tell 
the TaskManager that we reached our final state
+                                       notifyFinalState();
+                                       return;
+                               }
+                       }
+                       else {
+                               throw new IllegalStateException("Invalid state 
for beginning of task operation");
+                       }
+               }
 
-                       // if canceling, we are done, but we cannot be sure 
that the jobmanager has been told.
-                       // after all, we may have recognized our failure state 
before the cancelling and never sent a canceled
-                       // message back
-                       else if (STATE_UPDATER.compareAndSet(this, current, 
ExecutionState.FAILED)) {
-                               this.failureCause = error;
+               // all resource acquisitions and registrations from here on
+               // need to be undone in the end
 
-                               notifyObservers(ExecutionState.FAILED, 
ExceptionUtils.stringifyException(error));
-                               unregisterTask();
+               Map<String, Future<Path>> distributedCacheEntries = new 
HashMap<String, Future<Path>>();
 
-                               return;
-                       }
-               }
-       }
+               AbstractInvokable invokable = null;
 
-       public void cancelExecution() {
-               while (true) {
-                       ExecutionState current = this.executionState;
+               try {
+                       // ----------------------------
+                       //  Task Bootstrap - We periodically 
+                       //  check for canceling as a shortcut
+                       // ----------------------------
 
-                       // if the task is already canceled (or canceling) or 
finished or failed,
-                       // then we need not do anything
-                       if (current == ExecutionState.FINISHED || current == 
ExecutionState.CANCELED ||
-                                       current == ExecutionState.CANCELING || 
current == ExecutionState.FAILED) {
-                               return;
-                       }
+                       // first of all, get a user-code classloader
+                       // this may involve downloading the job's JAR files 
and/or classes
+                       LOG.info("Loading JAR files for task " + 
taskNameWithSubtask);
+                       final ClassLoader userCodeClassLoader = 
createUserCodeClassloader(libraryCache);
 
-                       if (current == ExecutionState.DEPLOYING) {
-                               // directly set to canceled
-                               if (STATE_UPDATER.compareAndSet(this, current, 
ExecutionState.CANCELED)) {
+                       // now load the task's invokable code
+                       invokable = 
loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
 
-                                       
notifyObservers(ExecutionState.CANCELED, null);
-                                       unregisterTask();
-                                       return;
-                               }
+                       if (isCanceledOrFailed()) {
+                               throw new CancelTaskException();
                        }
-                       else if (current == ExecutionState.RUNNING) {
-                               // go to canceling and perform the actual task 
canceling
-                               if (STATE_UPDATER.compareAndSet(this, current, 
ExecutionState.CANCELING)) {
-
-                                       
notifyObservers(ExecutionState.CANCELING, null);
-                                       try {
-                                               
this.environment.cancelExecution();
-                                       }
-                                       catch (Throwable e) {
-                                               LOG.error("Error while 
cancelling the task.", e);
-                                       }
 
-                                       return;
+                       // 
----------------------------------------------------------------
+                       // register the task with the network stack
+                       // this operation may fail if the system does not have 
enough
+                       // memory to run the necessary data exchanges
+                       // the registration must also strictly be undone
+                       // 
----------------------------------------------------------------
+
+                       LOG.info("Registering task at network: " + this);
+                       network.registerTask(this);
+
+                       // next, kick off the background copying of files for 
the distributed cache
+                       try {
+                               for (Map.Entry<String, 
DistributedCache.DistributedCacheEntry> entry :
+                                               
DistributedCache.readFileInfoFromConfig(jobConfiguration))
+                               {
+                                       LOG.info("Obtaining local cache file 
for '" + entry.getKey() + '\'');
+                                       Future<Path> cp = 
fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId);
+                                       
distributedCacheEntries.put(entry.getKey(), cp);
                                }
                        }
-                       else {
-                               throw new RuntimeException("unexpected state 
for cancelling: " + current);
+                       catch (Exception e) {
+                               throw new Exception("Exception while adding 
files to distributed cache.", e);
                        }
-               }
-       }
 
-       /**
-        * Sets the tasks to be cancelled and reports a failure back to the 
master.
-        */
-       public void failExternally(Throwable cause) {
-               while (true) {
-                       ExecutionState current = this.executionState;
-
-                       // if the task is already canceled (or canceling) or 
finished or failed,
-                       // then we need not do anything
-                       if (current == ExecutionState.CANCELED || current == 
ExecutionState.CANCELING || current == ExecutionState.FAILED) {
-                               return;
+                       if (isCanceledOrFailed()) {
+                               throw new CancelTaskException();
                        }
 
-                       if (current == ExecutionState.FINISHED) {
-                               // Set state to failed in order to correctly 
unregister task from network environment
-                               if (STATE_UPDATER.compareAndSet(this, current, 
ExecutionState.FAILED)) {
-                                       notifyObservers(ExecutionState.FAILED, 
null);
+                       // 
----------------------------------------------------------------
+                       //  call the user code initialization methods
+                       // 
----------------------------------------------------------------
 
-                                       return;
+                       TaskInputSplitProvider splitProvider = new 
TaskInputSplitProvider(jobManager,
+                                       jobId, vertexId, executionId, 
userCodeClassLoader, actorAskTimeout);
+
+                       Environment env = new RuntimeEnvironment(jobId, 
vertexId, executionId,
+                                       taskName, taskNameWithSubtask, 
subtaskIndex, parallelism,
+                                       jobConfiguration, taskConfiguration,
+                                       userCodeClassLoader, memoryManager, 
ioManager, broadcastVariableManager,
+                                       splitProvider, distributedCacheEntries,
+                                       writers, inputGates, jobManager);
+
+                       // let the task code create its readers and writers
+                       invokable.setEnvironment(env);
+                       try {
+                               invokable.registerInputOutput();
+                       }
+                       catch (Exception e) {
+                               throw new Exception("Call to 
registerInputOutput() of invokable failed", e);
+                       }
+
+                       // the very last thing before the actual execution 
starts running is to inject
+                       // the state into the task. the state is non-empty if 
this is an execution
+                       // of a task that failed but had backed-up state from a 
checkpoint
+                       if (operatorState != null) {
+                               if (invokable instanceof OperatorStateCarrier) {
+                                       ((OperatorStateCarrier) 
invokable).injectState(operatorState);
+                               }
+                               else {
+                                       throw new IllegalStateException("Found 
operator state for a non-stateful task invokable");
                                }
                        }
 
-                       if (current == ExecutionState.DEPLOYING) {
-                               // directly set to canceled
-                               if (STATE_UPDATER.compareAndSet(this, current, 
ExecutionState.FAILED)) {
-                                       this.failureCause = cause;
+                       // 
----------------------------------------------------------------
+                       //  actual task core work
+                       // 
----------------------------------------------------------------
 
-                                       notifyObservers(ExecutionState.FAILED, 
null);
-                                       unregisterTask();
-                                       return;
+                       // we must make strictly sure that the invokable is 
accessible to the cancel() call
+                       // by the time we switched to running.
+                       this.invokable = invokable;
+
+                       // switch to the RUNNING state, if that fails, we have 
been canceled/failed in the meantime
+                       if (!STATE_UPDATER.compareAndSet(this, 
ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
+                               throw new CancelTaskException();
+                       }
+                       
+                       // notify everyone that we switched to running. 
especially the TaskManager needs
+                       // to know this!
+                       notifyObservers(ExecutionState.RUNNING, null);
+                       taskManager.tell(new 
TaskMessages.UpdateTaskExecutionState(
+                                       new TaskExecutionState(jobId, 
executionId, ExecutionState.RUNNING)), ActorRef.noSender());
+
+                       // make sure the user code classloader is accessible 
thread-locally
+                       
executingThread.setContextClassLoader(userCodeClassLoader);
+
+                       // run the invokable
+                       invokable.invoke();
+
+                       // make sure, we enter the catch block if the task 
leaves the invoke() method due
+                       // to the fact that it has been canceled
+                       if (isCanceledOrFailed()) {
+                               throw new CancelTaskException();
+                       }
+
+                       // 
----------------------------------------------------------------
+                       //  finalization of a successful execution
+                       // 
----------------------------------------------------------------
+
+                       // finish the produced partitions. if this fails, we 
consider the execution failed.
+                       for (ResultPartition partition : producedPartitions) {
+                               if (partition != null) {
+                                       partition.finish();
                                }
                        }
-                       else if (current == ExecutionState.RUNNING) {
-                               // go to canceling and perform the actual task 
canceling
-                               if (STATE_UPDATER.compareAndSet(this, current, 
ExecutionState.FAILED)) {
-                                       try {
-                                               
this.environment.cancelExecution();
+
+                       // try to mark the task as finished
+                       // if that fails, the task was canceled/failed in the 
meantime
+                       if (STATE_UPDATER.compareAndSet(this, 
ExecutionState.RUNNING, ExecutionState.FINISHED)) {
+                               notifyObservers(ExecutionState.FINISHED, null);
+                       }
+                       else {
+                               throw new CancelTaskException();
+                       }
+               }
+               catch (Throwable t) {
+
+                       // 
----------------------------------------------------------------
+                       // the execution failed. either the invokable code 
properly failed, or
+                       // an exception was thrown as a side effect of 
cancelling
+                       // 
----------------------------------------------------------------
+
+                       try {
+                               // transition into our final state. we should 
be either in RUNNING, CANCELING, or FAILED 
+                               // loop for multiple retries during concurrent 
state changes via calls to cancel() or
+                               // to failExternally()
+                               while (true) {
+                                       ExecutionState current = 
this.executionState;
+                                       if (current == ExecutionState.RUNNING 
|| current == ExecutionState.DEPLOYING) {
+                                               if 
(STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+                                                       // proper failure of 
the task. record the exception as the root cause
+                                                       failureCause = t;
+                                                       
notifyObservers(ExecutionState.FAILED, t);
+
+                                                       // in case of an 
exception during execution, we still call "cancel()" on the task
+                                                       if (invokable != null 
&& this.invokable != null && invokableHasBeenCanceled.compareAndSet(false, 
true)) {
+                                                               try {
+                                                                       
invokable.cancel();
+                                                               }
+                                                               catch 
(Throwable t2) {
+                                                                       
LOG.error("Error while canceling task " + taskNameWithSubtask, t2);
+                                                               }
+                                                       }
+                                                       break;
+                                               }
                                        }
-                                       catch (Throwable e) {
-                                               LOG.error("Error while 
cancelling the task.", e);
+                                       else if (current == 
ExecutionState.CANCELING) {
+                                               if 
(STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+                                                       
notifyObservers(ExecutionState.CANCELED, null);
+                                                       break;
+                                               }
                                        }
+                                       else if (current == 
ExecutionState.FAILED) {
+                                               // in state failed already, no 
transition necessary any more
+                                               break;
+                                       }
+                                       // unexpected state, go to failed
+                                       else if 
(STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+                                               LOG.error("Unexpected state in 
Task during an exception: " + current);
+                                               break;
+                                       }
+                                       // else fall through the loop and retry
+                               }
+                       }
+                       catch (Throwable tt) {
+                               String message = "FATAL - exception in task 
exception handler";
+                               LOG.error(message, tt);
+                               notifyFatalError(message, tt);
+                       }
+               }
+               finally {
+                       try {
+                               LOG.info("Freeing task resources for " + 
taskNameWithSubtask);
+                               
+                               // free the network resources
+                               network.unregisterTask(this);
+
+                               if (invokable != null) {
+                                       memoryManager.releaseAll(invokable);
+                               }
 
-                                       this.failureCause = cause;
+                               // remove all of the task's library resources
+                               libraryCache.unregisterTask(jobId, executionId);
 
-                                       notifyObservers(ExecutionState.FAILED, 
null);
-                                       unregisterTask();
+                               // remove all files in the distributed cache
+                               removeCachedFiles(distributedCacheEntries, 
fileCache);
 
-                                       return;
-                               }
+                               notifyFinalState();
                        }
-                       else {
-                               throw new RuntimeException("unexpected state 
for failing the task: " + current);
+                       catch (Throwable t) {
+                               // an error in the resource cleanup is fatal
+                               String message = "FATAL - exception in task 
resource cleanup";
+                               LOG.error(message, t);
+                               notifyFatalError(message, t);
                        }
                }
        }
 
-       public void cancelingDone() {
-               while (true) {
-                       ExecutionState current = this.executionState;
+       private ClassLoader createUserCodeClassloader(LibraryCacheManager 
libraryCache) throws Exception {
+               long startDownloadTime = System.currentTimeMillis();
 
-                       if (current == ExecutionState.CANCELED || current == 
ExecutionState.FAILED) {
-                               return;
-                       }
-                       if (!(current == ExecutionState.RUNNING || current == 
ExecutionState.CANCELING)) {
-                               LOG.error(String.format("Unexpected state 
transition in Task: %s -> %s", current, ExecutionState.CANCELED));
-                       }
+               // triggers the download of all missing jar files from the job 
manager
+               libraryCache.registerTask(jobId, executionId, requiredJarFiles);
 
-                       if (STATE_UPDATER.compareAndSet(this, current, 
ExecutionState.CANCELED)) {
-                               notifyObservers(ExecutionState.CANCELED, null);
-                               unregisterTask();
-                               return;
-                       }
+               LOG.debug("Register task {} at library cache manager took {} 
milliseconds",
+                               executionId, System.currentTimeMillis() - 
startDownloadTime);
+
+               ClassLoader userCodeClassLoader = 
libraryCache.getClassLoader(jobId);
+               if (userCodeClassLoader == null) {
+                       throw new Exception("No user code classloader 
available.");
                }
+               return userCodeClassLoader;
        }
 
-       /**
-        * Starts the execution of this task.
-        */
-       public boolean startExecution() {
-               LOG.info("Starting execution of task {}", this.getTaskName());
-               if (STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, 
ExecutionState.RUNNING)) {
-                       final Thread thread = 
this.environment.getExecutingThread();
-                       thread.start();
-                       return true;
+       private AbstractInvokable loadAndInstantiateInvokable(ClassLoader 
classLoader, String className) throws Exception {
+               Class<? extends AbstractInvokable> invokableClass;
+               try {
+                       invokableClass = Class.forName(className, true, 
classLoader)
+                                       .asSubclass(AbstractInvokable.class);
                }
-               else {
-                       return false;
+               catch (Throwable t) {
+                       throw new Exception("Could not load the task's 
invokable class.", t);
+               }
+               try {
+                       return invokableClass.newInstance();
+               }
+               catch (Throwable t) {
+                       throw new Exception("Could not instantiate the task's 
invokable class.", t);
                }
        }
 
-       /**
-        * Unregisters the task from the central memory manager.
-        */
-       public void unregisterMemoryManager(MemoryManager memoryManager) {
-               RuntimeEnvironment env = this.environment;
-               if (memoryManager != null && env != null) {
-                       memoryManager.releaseAll(env.getInvokable());
+       private void removeCachedFiles(Map<String, Future<Path>> entries, 
FileCache fileCache) {
+               // cancel and release all distributed cache files
+               try {
+                       for (Map.Entry<String, Future<Path>> entry : 
entries.entrySet()) {
+                               String name = entry.getKey();
+                               try {
+                                       fileCache.deleteTmpFile(name, jobId);
+                               }
+                               catch (Exception e) {
+                                       // unpleasant, but we continue
+                                       LOG.error("Distributed Cache could not 
remove cached file registered under '"
+                                                       + name + "'.", e);
+                               }
+                       }
+               }
+               catch (Throwable t) {
+                       LOG.error("Error while removing cached local files from 
distributed cache.");
                }
        }
 
-       protected void unregisterTask() {
-               taskManager.tell(new UnregisterTask(executionId), 
ActorRef.noSender());
+       private void notifyFinalState() {
+               taskManager.tell(new TaskInFinalState(executionId), 
ActorRef.noSender());
        }
 
-       // 
-----------------------------------------------------------------------------------------------------------------
-       //                                        Task Profiling
-       // 
-----------------------------------------------------------------------------------------------------------------
+       private void notifyFatalError(String message, Throwable cause) {
+               taskManager.tell(new FatalError(message, cause), 
ActorRef.noSender());
+       }
 
-       /**
-        * Registers the task manager profiler with the task.
-        */
-       public void registerProfiler(TaskManagerProfiler taskManagerProfiler, 
Configuration jobConfiguration) {
-               taskManagerProfiler.registerTask(this, jobConfiguration);
+       // 
----------------------------------------------------------------------------------------------------------------
+       //  Canceling / Failing the task from the outside
+       // 
----------------------------------------------------------------------------------------------------------------
+
+       public void cancelExecution() {
+               LOG.info("Attempting to cancel task " + taskNameWithSubtask);
+               if (cancelOrFailAndCancelInvokable(ExecutionState.CANCELING)) {
+                       notifyObservers(ExecutionState.CANCELING, null);
+               }
        }
 
        /**
-        * Unregisters the task from the task manager profiler.
+        * Sets the tasks to be cancelled and reports a failure back to the 
master.
         */
-       public void unregisterProfiler(TaskManagerProfiler taskManagerProfiler) 
{
-               if (taskManagerProfiler != null) {
-                       taskManagerProfiler.unregisterTask(this.executionId);
+       public void failExternally(Throwable cause) {
+               LOG.info("Attempting to fail task externally " + 
taskNameWithSubtask);
+               if (cancelOrFailAndCancelInvokable(ExecutionState.FAILED)) {
+                       failureCause = cause;
+                       notifyObservers(ExecutionState.FAILED, cause);
                }
        }
 
-       // 
------------------------------------------------------------------------
-       // Intermediate result partitions
-       // 
------------------------------------------------------------------------
-
-       public SingleInputGate[] getInputGates() {
-               return environment != null ? environment.getAllInputGates() : 
null;
-       }
+       private boolean cancelOrFailAndCancelInvokable(ExecutionState 
targetState) {
+               while (true) {
+                       ExecutionState current = this.executionState;
 
-       public ResultPartitionWriter[] getWriters() {
-               return environment != null ? environment.getAllWriters() : null;
-       }
+                       // if the task is already canceled (or canceling) or 
finished or failed,
+                       // then we need not do anything
+                       if (current.isTerminal() || current == 
ExecutionState.CANCELING) {
+                               return false;
+                       }
 
-       public ResultPartition[] getProducedPartitions() {
-               return environment != null ? 
environment.getProducedPartitions() : null;
+                       if (current == ExecutionState.DEPLOYING || current == 
ExecutionState.CREATED) {
+                               if (STATE_UPDATER.compareAndSet(this, current, 
targetState)) {
+                                       // if we manage this state transition, 
then the invokable never gets called
+                                       // we need not call cancel on it
+                                       return true;
+                               }
+                       }
+                       else if (current == ExecutionState.RUNNING) {
+                               if (STATE_UPDATER.compareAndSet(this, 
ExecutionState.RUNNING, targetState)) {
+                                       // we are canceling / failing out of 
the running state
+                                       // we need to cancel the invokable
+                                       if (invokable != null && 
invokableHasBeenCanceled.compareAndSet(false, true)) {
+                                               LOG.info("Triggering 
cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
+
+                                               // because the canceling may 
block on user code, we cancel from a separate thread
+                                               Runnable canceler = new 
TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask);
+                                               Thread cancelThread = new 
Thread(executingThread.getThreadGroup(), canceler,
+                                                               "Canceler for " 
+ taskNameWithSubtask);
+                                               cancelThread.start();
+                                       }
+                                       return true;
+                               }
+                       }
+                       else {
+                               throw new IllegalStateException("Unexpected 
task state: " + current);
+                       }
+               }
        }
 
-       // 
--------------------------------------------------------------------------------------------
-       //                                     State Listeners
-       // 
--------------------------------------------------------------------------------------------
+       // 
------------------------------------------------------------------------
+       //  State Listeners
+       // 
------------------------------------------------------------------------
 
        public void registerExecutionListener(ActorRef listener) {
                executionListenerActors.add(listener);
@@ -404,25 +777,146 @@ public class Task {
                executionListenerActors.remove(listener);
        }
 
-       private void notifyObservers(ExecutionState newState, String message) {
-               if (LOG.isInfoEnabled()) {
-                       LOG.info(getTaskNameWithSubtasks() + " switched to " + 
newState + (message == null ? "" : " : " + message));
+       private void notifyObservers(ExecutionState newState, Throwable error) {
+               if (error == null) {
+                       LOG.info(taskNameWithSubtask + " switched to " + 
newState);
+               }
+               else {
+                       LOG.info(taskNameWithSubtask + " switched to " + 
newState + " with exception.", error);
                }
 
+               TaskExecutionState stateUpdate = new TaskExecutionState(jobId, 
executionId, newState, error);
+               TaskMessages.UpdateTaskExecutionState actorMessage = new
+                               
TaskMessages.UpdateTaskExecutionState(stateUpdate);
+
                for (ActorRef listener : executionListenerActors) {
-                       listener.tell(new 
ExecutionGraphMessages.ExecutionStateChanged(
-                                                       jobId, vertexId, 
taskName, numberOfSubtasks, subtaskIndex,
-                                                       executionId, newState, 
System.currentTimeMillis(), message),
-                                       ActorRef.noSender());
+                       listener.tell(actorMessage, ActorRef.noSender());
                }
        }
 
-       // 
--------------------------------------------------------------------------------------------
-       //                                       Utilities
-       // 
--------------------------------------------------------------------------------------------
+       // 
------------------------------------------------------------------------
+       //  Notifications on the invokable
+       // 
------------------------------------------------------------------------
+
+       public void triggerCheckpointBarrier(final long checkpointID) {
+               AbstractInvokable invokabe = this.invokable;
+               
+               if (executionState == ExecutionState.RUNNING && invokabe != 
null) {
+                       if (invokabe instanceof BarrierTransceiver) {
+                               final BarrierTransceiver barrierTransceiver = 
(BarrierTransceiver) invokabe;
+                               final Logger logger = LOG;
+                               
+                               Thread caller = new Thread("Barrier emitter") {
+                                       @Override
+                                       public void run() {
+                                               try {
+                                                       
barrierTransceiver.broadcastBarrierFromSource(checkpointID);
+                                               }
+                                               catch (Throwable t) {
+                                                       logger.error("Error 
while triggering checkpoint barriers", t);
+                                               }
+                                       }
+                               };
+                               caller.setDaemon(true);
+                               caller.start();
+                       }
+                       else {
+                               LOG.error("Task received a checkpoint request, 
but is not a checkpointing task - "
+                                               + taskNameWithSubtask);
+                       }
+               }
+               else {
+                       LOG.debug("Ignoring request to trigger a checkpoint 
barrier");
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
 
        @Override
        public String toString() {
                return getTaskNameWithSubtasks() + " [" + executionState + ']';
        }
+       
+       // 
------------------------------------------------------------------------
+       //  Task Names
+       // 
------------------------------------------------------------------------
+
+       public static String getTaskNameWithSubtask(String name, int subtask, 
int numSubtasks) {
+               return name + " (" + (subtask+1) + '/' + numSubtasks + ')';
+       }
+
+       public static String getTaskNameWithSubtaskAndID(String name, int 
subtask, int numSubtasks, ExecutionAttemptID id) {
+               return name + " (" + (subtask+1) + '/' + numSubtasks + ") (" + 
id + ')';
+       }
+
+       /**
+        * This runner calls cancel() on the invokable and periodically 
interrupts the
+        * thread until it has terminated.
+        */
+       private static class TaskCanceler implements Runnable {
+
+               private final Logger logger;
+               private final AbstractInvokable invokable;
+               private final Thread executer;
+               private final String taskName;
+
+               public TaskCanceler(Logger logger, AbstractInvokable invokable, 
Thread executer, String taskName) {
+                       this.logger = logger;
+                       this.invokable = invokable;
+                       this.executer = executer;
+                       this.taskName = taskName;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               // the user-defined cancel method may throw 
errors.
+                               // we need to continue despite that
+                               try {
+                                       invokable.cancel();
+                               }
+                               catch (Throwable t) {
+                                       logger.error("Error while canceling the 
task", t);
+                               }
+
+                               // interrupt the running thread initially 
+                               executer.interrupt();
+                               try {
+                                       executer.join(10000);
+                               }
+                               catch (InterruptedException e) {
+                                       // we can ignore this
+                               }
+
+                               // it is possible that the user code does not 
react immediately. for that
+                               // reason, this canceler thread 
repeatedly interrupts the user code until
+                               // it exits
+                               while (executer.isAlive()) {
+
+                                       // build the stack trace of where the 
thread is stuck, for the log
+                                       StringBuilder bld = new StringBuilder();
+                                       StackTraceElement[] stack = 
executer.getStackTrace();
+                                       for (StackTraceElement e : stack) {
+                                               bld.append(e).append('\n');
+                                       }
+
+                                       logger.warn("Task '{}' did not react to 
cancelling signal, but is stuck in method:\n {}",
+                                                       taskName, 
bld.toString());
+
+                                       executer.interrupt();
+                                       try {
+                                               executer.join(5000);
+                                       }
+                                       catch (InterruptedException e) {
+                                               // we can ignore this
+                                       }
+                               }
+                       }
+                       catch (Throwable t) {
+                               logger.error("Error in the task canceler", t);
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index c81830c..b12f1b5 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -24,7 +24,15 @@ import org.apache.flink.runtime.instance.InstanceID
  * Miscellaneous actor messages exchanged with the TaskManager.
  */
 object TaskManagerMessages {
-
+  
+  /**
+   * This message informs the TaskManager about a fatal error that prevents
+   * it from continuing.
+   * 
+   * @param description The description of the problem
+   */
+  case class FatalError(description: String, cause: Throwable)
+  
   /**
    * Tells the task manager to send a heartbeat message to the job manager.
    */
@@ -49,7 +57,7 @@ object TaskManagerMessages {
 
 
   // --------------------------------------------------------------------------
-  //  Utility messages used for notifications during TaskManager startup
+  //  Reporting the current TaskManager stack trace
   // --------------------------------------------------------------------------
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/8e613014/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
index c8c5726..b1a08ca 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
@@ -67,12 +67,12 @@ object TaskMessages {
     extends TaskMessage
 
   /**
-   * Unregister the task identified by [[executionID]] from the TaskManager.
-   * Sent to the TaskManager by futures and callbacks.
+   * Notifies the TaskManager that the task has reached its final state,
+   * either FINISHED, CANCELED, or FAILED.
    *
    * @param executionID The task's execution attempt ID.
    */
-  case class UnregisterTask(executionID: ExecutionAttemptID)
+  case class TaskInFinalState(executionID: ExecutionAttemptID)
     extends TaskMessage
 
 

Reply via email to