Repository: flink
Updated Branches:
  refs/heads/release-1.1 33df945fe -> b046038ae


[FLINK-5278] Improve task and checkpoint related logging

This closes #2690.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b046038a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b046038a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b046038a

Branch: refs/heads/release-1.1
Commit: b046038ae11f7662b6d788c1f005a9a61a45393b
Parents: 33df945
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Dec 7 16:22:23 2016 +0100
Committer: Ufuk Celebi <u...@apache.org>
Committed: Wed Dec 7 18:07:55 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  23 ++-
 .../ZooKeeperCompletedCheckpointStore.java      | 107 ++++++++++--
 .../flink/runtime/executiongraph/Execution.java |   9 +-
 .../apache/flink/runtime/taskmanager/Task.java  | 168 ++++++++++++-------
 .../src/main/resources/log4j.properties         |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  19 ++-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  36 ++++
 .../runtime/taskmanager/TaskManagerTest.java    |   5 +-
 .../streaming/runtime/tasks/StreamTask.java     |  42 +++--
 9 files changed, 302 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index a3e511f..e5675c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -675,6 +675,7 @@ public class CheckpointCoordinator {
                if (shutdown || message == null) {
                        return false;
                }
+
                if (!job.equals(message.getJob())) {
                        LOG.error("Received wrong AcknowledgeCheckpoint message 
for job {}: {}", job, message);
                        return false;
@@ -710,7 +711,7 @@ public class CheckpointCoordinator {
                                                                "the state 
handle to avoid lingering state.", message.getCheckpointId(),
                                                        
message.getTaskExecutionId(), message.getJob());
 
-                                               
discardState(message.getState());
+                                               discardState(message.getJob(), 
message.getTaskExecutionId(), checkpointId, message.getState());
                                                break;
                                        case DISCARDED:
                                                LOG.warn("Could not acknowledge 
the checkpoint {} for task {} of job {}, " +
@@ -718,7 +719,7 @@ public class CheckpointCoordinator {
                                                                "state handle 
tp avoid lingering state.",
                                                        
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
 
-                                               
discardState(message.getState());
+                                               discardState(message.getJob(), 
message.getTaskExecutionId(), checkpointId, message.getState());
                                }
 
                                return true;
@@ -734,13 +735,15 @@ public class CheckpointCoordinator {
                                // message is for an unknown checkpoint, or 
comes too late (checkpoint disposed)
                                if 
(recentPendingCheckpoints.contains(checkpointId)) {
                                        wasPendingCheckpoint = true;
-                                       LOG.warn("Received late message for now 
expired checkpoint attempt {}.", checkpointId);
+                                       LOG.warn("Received late message for now 
expired checkpoint attempt {} from " +
+                                               "task {} and job {}.", 
checkpointId, message.getTaskExecutionId(), message.getJob());
 
                                        // try to discard the state so that we 
don't have lingering state lying around
-                                       discardState(message.getState());
+                                       discardState(message.getJob(), 
message.getTaskExecutionId(), checkpointId, message.getState());
                                }
                                else {
-                                       LOG.debug("Received message for an 
unknown checkpoint {}.", checkpointId);
+                                       LOG.debug("Received message for an 
unknown checkpoint {} from task {} and job" +
+                                               " {}.", checkpointId, 
message.getTaskExecutionId(), message.getJob());
                                        wasPendingCheckpoint = false;
                                }
 
@@ -1112,7 +1115,11 @@ public class CheckpointCoordinator {
                }
        }
 
-       private void discardState(final SerializedValue<StateHandle<?>> 
stateObject) {
+       private void discardState(
+                       final JobID jobId,
+                       final ExecutionAttemptID executionAttemptID,
+                       final long checkpointId,
+                       final SerializedValue<StateHandle<?>> stateObject) {
                if (stateObject != null) {
                        executor.execute(new Runnable() {
                                @Override
@@ -1120,7 +1127,9 @@ public class CheckpointCoordinator {
                                        try {
                                                
stateObject.deserializeValue(userClassLoader).discardState();
                                        } catch (Exception e) {
-                                               LOG.warn("Could not properly 
discard state object.", e);
+                                               LOG.warn("Could not properly 
discard state object for checkpoint {} " +
+                                                       "belonging to task {} 
of job {}.", checkpointId,
+                                                       executionAttemptID, 
jobId, e);
                                        }
                                }
                        });

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 6570d00..9ae6c30 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -167,7 +168,17 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                        Tuple2<StateHandle<CompletedCheckpoint>, String> latest 
= initialCheckpoints
                                        .get(numberOfInitialCheckpoints - 1);
 
-                       CompletedCheckpoint latestCheckpoint = 
latest.f0.getState(userClassLoader);
+                       CompletedCheckpoint latestCheckpoint;
+                       long checkpointId = pathToCheckpointId(latest.f1);
+
+                       LOG.info("Trying to retrieve checkpoint {}.", 
checkpointId);
+
+                       try {
+                               latestCheckpoint = 
latest.f0.getState(userClassLoader);
+                       } catch (Exception e) {
+                               throw new Exception("Could not retrieve the 
completed checkpoint " + checkpointId +
+                               " from the state storage.", e);
+                       }
 
                        checkpointStateHandles.add(latest);
 
@@ -194,7 +205,7 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                checkNotNull(checkpoint, "Checkpoint");
 
                // First add the new one. If it fails, we don't want to loose 
existing data.
-               String path = String.format("/%s", 
checkpoint.getCheckpointID());
+               String path = checkpointIdToPath(checkpoint.getCheckpointID());
 
                final StateHandle<CompletedCheckpoint> stateHandle = 
checkpointsInZooKeeper.add(path, checkpoint);
 
@@ -266,26 +277,57 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
        /**
         * Removes the state handle from ZooKeeper, discards the checkpoints, 
and the state handle.
         */
-       private void removeFromZooKeeperAndDiscardCheckpoint(
-                       final Tuple2<StateHandle<CompletedCheckpoint>, String> 
stateHandleAndPath) throws Exception {
+       private void removeFromZooKeeperAndDiscardCheckpoint(final 
Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws 
Exception {
 
                final BackgroundCallback callback = new BackgroundCallback() {
                        @Override
                        public void processResult(CuratorFramework client, 
CuratorEvent event) throws Exception {
+                               final long checkpointId = 
pathToCheckpointId(stateHandleAndPath.f1);
+
                                try {
                                        if (event.getType() == 
CuratorEventType.DELETE) {
                                                if (event.getResultCode() == 0) 
{
-                                                       // The checkpoint
-                                                       CompletedCheckpoint 
checkpoint = stateHandleAndPath
-                                                                       
.f0.getState(userClassLoader);
+                                                       Exception exception = 
null;
 
-                                                       
checkpoint.discard(userClassLoader);
-
-                                                       // Discard the state 
handle
-                                                       
stateHandleAndPath.f0.discardState();
-
-                                                       // Discard the 
checkpoint
-                                                       LOG.debug("Discarded " 
+ checkpoint);
+                                                       // The checkpoint
+                                                       CompletedCheckpoint 
checkpoint = null;
+
+                                                       try {
+                                                               checkpoint = 
stateHandleAndPath.f0.getState(userClassLoader);
+                                                       } catch (Exception e) {
+                                                               Exception 
newException = new Exception("Could not retrieve the completed checkpoint " +
+                                                                       
checkpointId + " from the state storage.", e);
+
+                                                               exception = 
ExceptionUtils.firstOrSuppressed(newException, exception);
+                                                       }
+
+                                                       if (checkpoint != null) 
{
+                                                               try {
+                                                                       
checkpoint.discard(userClassLoader);
+                                                               } catch 
(Exception e) {
+                                                                       
Exception newException = new Exception("Could not discard the completed 
checkpoint " +
+                                                                               
checkpoint + '.', e);
+
+                                                                       
exception = ExceptionUtils.firstOrSuppressed(newException, exception);
+                                                               }
+                                                       }
+
+                                                       try {
+                                                               // Discard the 
state handle
+                                                               
stateHandleAndPath.f0.discardState();
+                                                       } catch (Exception e) {
+                                                               Exception 
newException = new Exception("Could not discard meta data of completed 
checkpoint " +
+                                                                       
checkpointId + '.', e);
+
+                                                               exception = 
ExceptionUtils.firstOrSuppressed(newException, exception);
+                                                       }
+
+                                                       if (exception != null) {
+                                                               throw exception;
+                                                       } else {
+                                                               // Discard the 
checkpoint
+                                                               
LOG.debug("Discarded {}.", checkpoint);
+                                                       }
                                                }
                                                else {
                                                        throw new 
IllegalStateException("Unexpected result code " +
@@ -298,7 +340,7 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                                        }
                                }
                                catch (Exception e) {
-                                       LOG.error("Failed to discard 
checkpoint.", e);
+                                       LOG.warn("Failed to discard checkpoint 
{}.", checkpointId, e);
                                }
                        }
                };
@@ -308,4 +350,39 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                // inconsistent state.
                checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback);
        }
+
+       /**
+        * Convert a checkpoint id into a ZooKeeper path.
+        *
+        * @param checkpointId to convert to the path
+        * @return Path created from the given checkpoint id
+        */
+       protected static String checkpointIdToPath(long checkpointId) {
+               return String.format("/%s", checkpointId);
+       }
+
+       /**
+        * Converts a path to the checkpoint id.
+        *
+        * @param path in ZooKeeper
+        * @return Checkpoint id parsed from the path
+        */
+       protected static long pathToCheckpointId(String path) {
+               try {
+                       String numberString;
+
+                       // check if we have a leading slash
+                       if ('/' == path.charAt(0) ) {
+                               numberString = path.substring(1);
+                       } else {
+                               numberString = path;
+                       }
+                       return Long.parseLong(numberString);
+               } catch (NumberFormatException e) {
+                       LOG.warn("Could not parse checkpoint id from {}. This 
indicates that the " +
+                               "checkpoint id to path conversion has 
changed.", path);
+
+                       return -1L;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 84b679b..b336e54 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -979,14 +979,17 @@ public class Execution implements Serializable {
        private boolean transitionState(ExecutionState currentState, 
ExecutionState targetState, Throwable error) {
                // sanity check
                if (currentState.isTerminal()) {
-                       throw new IllegalStateException("Cannot leave terminal 
state " + currentState + " to transition to " + targetState + ".");
+                       throw new IllegalStateException("Cannot leave terminal 
state " + currentState + " to transition to " + targetState + '.');
                }
 
                if (STATE_UPDATER.compareAndSet(this, currentState, 
targetState)) {
                        markTimestamp(targetState);
 
-                       LOG.info(getVertex().getTaskNameWithSubtaskIndex() + " 
("  + getAttemptId() + ") switched from "
-                               + currentState + " to " + targetState);
+                       if (error == null) {
+                               LOG.info("{} ({}) switched from {} to {}.", 
getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, 
targetState);
+                       } else {
+                               LOG.info("{} ({}) switched from {} to {}.", 
getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, 
targetState, error);
+                       }
 
                        // make sure that the state transition completes 
normally.
                        // potential errors (in listeners may not affect the 
main logic)

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e4e1b36..8a446d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -480,7 +480,7 @@ public class Task implements Runnable {
                while (true) {
                        ExecutionState current = this.executionState;
                        if (current == ExecutionState.CREATED) {
-                               if (STATE_UPDATER.compareAndSet(this, 
ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
+                               if (transitionState(ExecutionState.CREATED, 
ExecutionState.DEPLOYING)) {
                                        // success, we can start our work
                                        break;
                                }
@@ -491,14 +491,14 @@ public class Task implements Runnable {
                                return;
                        }
                        else if (current == ExecutionState.CANCELING) {
-                               if (STATE_UPDATER.compareAndSet(this, 
ExecutionState.CANCELING, ExecutionState.CANCELED)) {
+                               if (transitionState(ExecutionState.CANCELING, 
ExecutionState.CANCELED)) {
                                        // we were immediately canceled. tell 
the TaskManager that we reached our final state
                                        notifyFinalState();
                                        return;
                                }
                        }
                        else {
-                               throw new IllegalStateException("Invalid state 
for beginning of task operation");
+                               throw new IllegalStateException("Invalid state 
for beginning of operation of task " + this + '.');
                        }
                }
 
@@ -516,7 +516,7 @@ public class Task implements Runnable {
 
                        // first of all, get a user-code classloader
                        // this may involve downloading the job's JAR files 
and/or classes
-                       LOG.info("Loading JAR files for task " + 
taskNameWithSubtask);
+                       LOG.info("Loading JAR files for task {}.", this);
 
                        userCodeClassLoader = 
createUserCodeClassloader(libraryCache);
                        final ExecutionConfig executionConfig = 
serializedExecutionConfig.deserializeValue(userCodeClassLoader);
@@ -545,7 +545,7 @@ public class Task implements Runnable {
                        // the registration must also strictly be undone
                        // 
----------------------------------------------------------------
 
-                       LOG.info("Registering task at network: " + this);
+                       LOG.info("Registering task at network: {}.", this);
                        network.registerTask(this);
 
                        // next, kick off the background copying of files for 
the distributed cache
@@ -553,13 +553,15 @@ public class Task implements Runnable {
                                for (Map.Entry<String, 
DistributedCache.DistributedCacheEntry> entry :
                                                
DistributedCache.readFileInfoFromConfig(jobConfiguration))
                                {
-                                       LOG.info("Obtaining local cache file 
for '" + entry.getKey() + '\'');
+                                       LOG.info("Obtaining local cache file 
for '{}'.", entry.getKey());
                                        Future<Path> cp = 
fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId);
                                        
distributedCacheEntries.put(entry.getKey(), cp);
                                }
                        }
                        catch (Exception e) {
-                               throw new Exception("Exception while adding 
files to distributed cache.", e);
+                               throw new Exception(
+                                       String.format("Exception while adding 
files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId),
+                                       e);
                        }
 
                        if (isCanceledOrFailed()) {
@@ -598,11 +600,13 @@ public class Task implements Runnable {
                                                StateUtils.setOperatorState(op, 
state);
                                        }
                                        catch (Exception e) {
-                                               throw new 
RuntimeException("Failed to deserialize state handle and setup initial operator 
state.", e);
+                                               throw new RuntimeException(
+                                                       String.format("Failed 
to deserialize state handle and setup initial operator state for task %s 
(%s).", taskNameWithSubtask, executionId),
+                                                       e);
                                        }
                                }
                                else {
-                                       throw new IllegalStateException("Found 
operator state for a non-stateful task invokable");
+                                       throw new 
IllegalStateException(String.format("Found operator state for a non-stateful 
task %s (%s)", taskNameWithSubtask, executionId));
                                }
                        }
 
@@ -621,7 +625,7 @@ public class Task implements Runnable {
                        this.invokable = invokable;
 
                        // switch to the RUNNING state, if that fails, we have 
been canceled/failed in the meantime
-                       if (!STATE_UPDATER.compareAndSet(this, 
ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
+                       if (!transitionState(ExecutionState.DEPLOYING, 
ExecutionState.RUNNING)) {
                                throw new CancelTaskException();
                        }
 
@@ -656,7 +660,7 @@ public class Task implements Runnable {
 
                        // try to mark the task as finished
                        // if that fails, the task was canceled/failed in the 
meantime
-                       if (STATE_UPDATER.compareAndSet(this, 
ExecutionState.RUNNING, ExecutionState.FINISHED)) {
+                       if (transitionState(ExecutionState.RUNNING, 
ExecutionState.FINISHED)) {
                                notifyObservers(ExecutionState.FINISHED, null);
                        }
                        else {
@@ -679,7 +683,7 @@ public class Task implements Runnable {
 
                                        if (current == ExecutionState.RUNNING 
|| current == ExecutionState.DEPLOYING) {
                                                if (t instanceof 
CancelTaskException) {
-                                                       if 
(STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+                                                       if 
(transitionState(current, ExecutionState.CANCELED)) {
                                                                
cancelInvokable();
 
                                                                
notifyObservers(ExecutionState.CANCELED, null);
@@ -687,19 +691,19 @@ public class Task implements Runnable {
                                                        }
                                                }
                                                else {
-                                                       if 
(STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+                                                       if 
(transitionState(current, ExecutionState.FAILED, t)) {
                                                                // proper 
failure of the task. record the exception as the root cause
-                                                               LOG.error("Task 
execution failed. ", t);
+                                                               String 
errorMessage = String.format("Execution of %s (%s) failed.", 
taskNameWithSubtask, executionId);
                                                                failureCause = 
t;
                                                                
cancelInvokable();
 
-                                                               
notifyObservers(ExecutionState.FAILED, t);
+                                                               
notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
                                                                break;
                                                        }
                                                }
                                        }
                                        else if (current == 
ExecutionState.CANCELING) {
-                                               if 
(STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+                                               if (transitionState(current, 
ExecutionState.CANCELED)) {
                                                        
notifyObservers(ExecutionState.CANCELED, null);
                                                        break;
                                                }
@@ -709,22 +713,22 @@ public class Task implements Runnable {
                                                break;
                                        }
                                        // unexpected state, go to failed
-                                       else if 
(STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
-                                               LOG.error("Unexpected state in 
Task during an exception: " + current);
+                                       else if (transitionState(current, 
ExecutionState.FAILED, t)) {
+                                               LOG.error("Unexpected state in 
task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, 
current);
                                                break;
                                        }
                                        // else fall through the loop and
                                }
                        }
                        catch (Throwable tt) {
-                               String message = "FATAL - exception in task 
exception handler";
+                               String message = String.format("FATAL - 
exception in exception handler of task %s (%s).", taskNameWithSubtask, 
executionId);
                                LOG.error(message, tt);
                                notifyFatalError(message, tt);
                        }
                }
                finally {
                        try {
-                               LOG.info("Freeing task resources for " + 
taskNameWithSubtask);
+                               LOG.info("Freeing task resources for {} ({}).", 
taskNameWithSubtask, executionId);
 
                                // stop the async dispatcher.
                                // copy dispatcher reference to stack, against 
concurrent release
@@ -751,7 +755,7 @@ public class Task implements Runnable {
                        }
                        catch (Throwable t) {
                                // an error in the resource cleanup is fatal
-                               String message = "FATAL - exception in task 
resource cleanup";
+                               String message = String.format("FATAL - 
exception in resource cleanup of task %s (%s).", taskNameWithSubtask, 
executionId);
                                LOG.error(message, t);
                                notifyFatalError(message, t);
                        }
@@ -763,7 +767,7 @@ public class Task implements Runnable {
                                metrics.close();
                        }
                        catch (Throwable t) {
-                               LOG.error("Error during metrics 
de-registration", t);
+                               LOG.error("Error during metrics de-registration 
of task {} ({}).", taskNameWithSubtask, executionId, t);
                        }
                }
        }
@@ -829,6 +833,39 @@ public class Task implements Runnable {
                taskManager.tell(new FatalError(message, cause));
        }
 
+       /**
+        * Try to transition the execution state from the current state to the 
new state.
+        *
+        * @param currentState of the execution
+        * @param newState of the execution
+        * @return true if the transition was successful, otherwise false
+        */
+       private boolean transitionState(ExecutionState currentState, 
ExecutionState newState) {
+               return transitionState(currentState, newState, null);
+       }
+
+       /**
+        * Try to transition the execution state from the current state to the 
new state.
+        *
+        * @param currentState of the execution
+        * @param newState of the execution
+        * @param cause of the transition change or null
+        * @return true if the transition was successful, otherwise false
+        */
+       private boolean transitionState(ExecutionState currentState, 
ExecutionState newState, Throwable cause) {
+               if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
+                       if (cause == null) {
+                               LOG.info("{} ({}) switched from {} to {}.", 
taskNameWithSubtask, executionId, currentState, newState);
+                       } else {
+                               LOG.info("{} ({}) switched from {} to {}.", 
taskNameWithSubtask, executionId, currentState, newState, cause);
+                       }
+
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
        // 
----------------------------------------------------------------------------------------------------------------
        //  Stopping / Canceling / Failing the task from the outside
        // 
----------------------------------------------------------------------------------------------------------------
@@ -843,22 +880,22 @@ public class Task implements Runnable {
         *             if the {@link AbstractInvokable} does not implement 
{@link StoppableTask}
         */
        public void stopExecution() throws UnsupportedOperationException {
-               LOG.info("Attempting to stop task " + taskNameWithSubtask);
-               if(this.invokable instanceof StoppableTask) {
+               LOG.info("Attempting to stop task {} ({}).", 
taskNameWithSubtask, executionId);
+               if (invokable instanceof StoppableTask) {
                        Runnable runnable = new Runnable() {
                                @Override
                                public void run() {
                                        try {
-                                               
((StoppableTask)Task.this.invokable).stop();
+                                               
((StoppableTask)invokable).stop();
                                        } catch(RuntimeException e) {
-                                               LOG.error("Stopping task " + 
taskNameWithSubtask + " failed.", e);
+                                               LOG.error("Stopping task {} 
({}) failed.", taskNameWithSubtask, executionId, e);
                                                taskManager.tell(new 
FailTask(executionId, e));
                                        }
                                }
                        };
-                       executeAsyncCallRunnable(runnable, "Stopping source 
task " + this.taskNameWithSubtask);
+                       executeAsyncCallRunnable(runnable, 
String.format("Stopping source task %s (%s).", taskNameWithSubtask, 
executionId));
                } else {
-                       throw new UnsupportedOperationException("Stopping not 
supported by this task.");
+                       throw new 
UnsupportedOperationException(String.format("Stopping not supported by task %s 
(%s).", taskNameWithSubtask, executionId));
                }
        }
 
@@ -871,7 +908,7 @@ public class Task implements Runnable {
         * <p>This method never blocks.</p>
         */
        public void cancelExecution() {
-               LOG.info("Attempting to cancel task " + taskNameWithSubtask);
+               LOG.info("Attempting to cancel task {} ({}).", 
taskNameWithSubtask, executionId);
                cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, null);
        }
 
@@ -885,37 +922,52 @@ public class Task implements Runnable {
         * <p>This method never blocks.</p>
         */
        public void failExternally(Throwable cause) {
-               LOG.info("Attempting to fail task externally " + 
taskNameWithSubtask);
+               LOG.info("Attempting to fail task externally {} ({}).", 
taskNameWithSubtask, executionId);
                cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
        }
 
        private void cancelOrFailAndCancelInvokable(ExecutionState targetState, 
Throwable cause) {
                while (true) {
-                       ExecutionState current = this.executionState;
+                       ExecutionState current = executionState;
 
                        // if the task is already canceled (or canceling) or 
finished or failed,
                        // then we need not do anything
                        if (current.isTerminal() || current == 
ExecutionState.CANCELING) {
-                               LOG.info("Task " + taskNameWithSubtask + " is 
already in state " + current);
+                               LOG.info("Task {} is already in state {}", 
taskNameWithSubtask, current);
                                return;
                        }
 
                        if (current == ExecutionState.DEPLOYING || current == 
ExecutionState.CREATED) {
-                               if (STATE_UPDATER.compareAndSet(this, current, 
targetState)) {
+                               if (transitionState(current, targetState, 
cause)) {
                                        // if we manage this state transition, 
then the invokable gets never called
                                        // we need not call cancel on it
                                        this.failureCause = cause;
-                                       notifyObservers(targetState, cause);
+                                       notifyObservers(
+                                               targetState,
+                                               new Exception(
+                                                       String.format(
+                                                               "Cancel or fail 
execution of %s (%s).",
+                                                               
taskNameWithSubtask,
+                                                               executionId),
+                                                       cause));
                                        return;
                                }
                        }
                        else if (current == ExecutionState.RUNNING) {
-                               if (STATE_UPDATER.compareAndSet(this, 
ExecutionState.RUNNING, targetState)) {
+                               if (transitionState(ExecutionState.RUNNING, 
targetState, cause)) {
                                        // we are canceling / failing out of 
the running state
                                        // we need to cancel the invokable
                                        if (invokable != null && 
invokableHasBeenCanceled.compareAndSet(false, true)) {
                                                this.failureCause = cause;
-                                               notifyObservers(targetState, 
cause);
+                                               notifyObservers(
+                                                       targetState,
+                                                       new Exception(
+                                                               String.format(
+                                                                       "Cancel 
or fail execution of %s (%s).",
+                                                                       
taskNameWithSubtask,
+                                                                       
executionId),
+                                                               cause));
+
                                                LOG.info("Triggering 
cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
 
                                                // because the canceling may 
block on user code, we cancel from a separate thread
@@ -934,7 +986,7 @@ public class Task implements Runnable {
                                                                
producedPartitions,
                                                                inputGates);
                                                Thread cancelThread = new 
Thread(executingThread.getThreadGroup(), canceler,
-                                                               "Canceler for " 
+ taskNameWithSubtask);
+                                                               
String.format("Canceler for %s (%s).", taskNameWithSubtask, executionId));
                                                cancelThread.setDaemon(true);
                                                cancelThread.start();
                                        }
@@ -942,7 +994,8 @@ public class Task implements Runnable {
                                }
                        }
                        else {
-                               throw new IllegalStateException("Unexpected 
task state: " + current);
+                               throw new 
IllegalStateException(String.format("Unexpected state: %s of task %s (%s).",
+                                       current, taskNameWithSubtask, 
executionId));
                        }
                }
        }
@@ -956,13 +1009,6 @@ public class Task implements Runnable {
        }
 
        private void notifyObservers(ExecutionState newState, Throwable error) {
-               if (error == null) {
-                       LOG.info(taskNameWithSubtask + " switched to " + 
newState);
-               }
-               else {
-                       LOG.info(taskNameWithSubtask + " switched to " + 
newState + " with exception.", error);
-               }
-
                TaskExecutionState stateUpdate = new TaskExecutionState(jobId, 
executionId, newState, error);
                UpdateTaskExecutionState actorMessage = new 
UpdateTaskExecutionState(stateUpdate);
 
@@ -1009,16 +1055,20 @@ public class Task implements Runnable {
                                                        if (getExecutionState() 
== ExecutionState.RUNNING) {
                                                                
failExternally(new Exception(
                                                                        "Error 
while triggering checkpoint " + checkpointID + " for " +
-                                                                               
taskName, t));
+                                                                               
taskNameWithSubtask, t));
+                                                       } else {
+                                                               
LOG.debug("Encountered error while triggering checkpoint {} for " +
+                                                                       "{} 
({}) while being not in state running.", checkpointID,
+                                                                       
taskNameWithSubtask, executionId, t);
                                                        }
                                                }
                                        }
                                };
-                               executeAsyncCallRunnable(runnable, "Checkpoint 
Trigger for " + taskName);
+                               executeAsyncCallRunnable(runnable, 
String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, 
executionId));
                        }
                        else {
-                               LOG.error("Task received a checkpoint request, 
but is not a checkpointing task - "
-                                               + taskNameWithSubtask);
+                               LOG.error("Task received a checkpoint request, 
but is not a checkpointing task - {} ({}).",
+                                               taskNameWithSubtask, 
executionId);
 
                                DeclineCheckpoint decline = new 
DeclineCheckpoint(
                                                jobId, executionId, 
checkpointID,
@@ -1027,7 +1077,7 @@ public class Task implements Runnable {
                        }
                }
                else {
-                       LOG.debug("Declining checkpoint request for non-running 
task");
+                       LOG.debug("Declining checkpoint request for non-running 
task {} ({}).", taskNameWithSubtask, executionId);
 
                        // send back a message that we did not do the checkpoint
                        DeclineCheckpoint decline = new DeclineCheckpoint(
@@ -1066,12 +1116,12 @@ public class Task implements Runnable {
                                executeAsyncCallRunnable(runnable, "Checkpoint 
Confirmation for " + taskName);
                        }
                        else {
-                               LOG.error("Task received a checkpoint commit 
notification, but is not a checkpoint committing task - "
-                                               + taskNameWithSubtask);
+                               LOG.error("Task received a checkpoint commit 
notification, but is not a checkpoint committing task - {}.",
+                                               taskNameWithSubtask);
                        }
                }
                else {
-                       LOG.debug("Ignoring checkpoint commit notification for 
non-running task.");
+                       LOG.debug("Ignoring checkpoint commit notification for 
non-running task {}.", taskNameWithSubtask);
                }
        }
 
@@ -1174,14 +1224,14 @@ public class Task implements Runnable {
                                invokable.cancel();
                        }
                        catch (Throwable t) {
-                               LOG.error("Error while canceling task " + 
taskNameWithSubtask, t);
+                               LOG.error("Error while canceling task {}.", 
taskNameWithSubtask, t);
                        }
                }
        }
 
        @Override
        public String toString() {
-               return taskNameWithSubtask + " [" + executionState + ']';
+               return String.format("%s (%s) [%s]", taskNameWithSubtask, 
executionId, executionState);
        }
 
        /**
@@ -1257,7 +1307,7 @@ public class Task implements Runnable {
                                try {
                                        invokable.cancel();
                                } catch (Throwable t) {
-                                       logger.error("Error while canceling the 
task", t);
+                                       logger.error("Error while canceling the 
task {}.", taskName, t);
                                }
 
                                // Early release of input and output buffer 
pools. We do this
@@ -1271,7 +1321,7 @@ public class Task implements Runnable {
                                        try {
                                                partition.destroyBufferPool();
                                        } catch (Throwable t) {
-                                               LOG.error("Failed to release 
result partition buffer pool.", t);
+                                               LOG.error("Failed to release 
result partition buffer pool for task {}.", taskName, t);
                                        }
                                }
 
@@ -1279,7 +1329,7 @@ public class Task implements Runnable {
                                        try {
                                                inputGate.releaseAllResources();
                                        } catch (Throwable t) {
-                                               LOG.error("Failed to release 
input gate.", t);
+                                               LOG.error("Failed to release 
input gate for task {}.", taskName, t);
                                        }
                                }
 
@@ -1297,7 +1347,7 @@ public class Task implements Runnable {
                                        watchDogThread.join();
                                }
                        } catch (Throwable t) {
-                               logger.error("Error in the task canceler", t);
+                               logger.error("Error in the task canceler for 
task {}.", taskName, t);
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/log4j.properties 
b/flink-runtime/src/main/resources/log4j.properties
index 9912b19..749796f 100644
--- a/flink-runtime/src/main/resources/log4j.properties
+++ b/flink-runtime/src/main/resources/log4j.properties
@@ -18,7 +18,7 @@
 
 
 # Convenience file for local debugging of the JobManager/TaskManager.
-log4j.rootLogger=OFF, console
+log4j.rootLogger=INFO, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cbf7b5d..2b455b7 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -82,7 +82,6 @@ import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
-import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
 
 /**
@@ -1382,18 +1381,20 @@ class JobManager(
                       case None => getClass.getClassLoader
                     }
 
-                    future {
-                      Option(ackMessage.getState()) match {
-                        case Some(state) =>
+                    Option(ackMessage.getState()) match {
+                      case Some(state) =>
+                        future {
                           try {
                             state.deserializeValue(classLoader).discardState()
                           } catch {
-                            case e: Exception => log.warn("Could not discard 
orphaned checkpoint " +
-                                             "state.", e)
+                            case e: Exception =>
+                              log.warn("Could not discard orphaned checkpoint 
state for " +
+                                         s"$ackMessage.", e)
                           }
-                        case None =>
-                      }
-                    }(ExecutionContext.fromExecutor(ioExecutor))
+                        } (ExecutionContext.fromExecutor(ioExecutor))
+                      case None =>
+                        // no state to discard
+                    }
                   }
                 } catch {
                   case t: Throwable =>

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
new file mode 100644
index 0000000..6ee0141
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
+
+       @Test
+       public void testPathConversion() {
+               final long checkpointId = 42L;
+
+               final String path = 
ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpointId);
+
+               assertEquals(checkpointId, 
ZooKeeperCompletedCheckpointStore.pathToCheckpointId(path));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index f2fd859..6a696a0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -103,6 +103,7 @@ import java.util.concurrent.TimeUnit;
 import static 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import static 
org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -480,7 +481,9 @@ public class TaskManagerTest extends TestLogger {
                                                                        
"found."));
 
                                                        tm.tell(new 
StopTask(eid2), testActorGateway);
-                                                       expectMsgEquals(new 
TaskOperationResult(eid2, false, "UnsupportedOperationException: Stopping not 
supported by this task."));
+                                                       TaskOperationResult 
message = expectMsgClass(TaskOperationResult.class);
+                                                       assertEquals(eid2, 
message.executionID());
+                                                       
assertFalse(message.success());
 
                                                        
assertEquals(ExecutionState.RUNNING, t2.getExecutionState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d7204a9..aa88175 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -202,7 +202,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                boolean disposed = false;
                try {
                        // -------- Initialize ---------
-                       LOG.debug("Initializing {}", getName());
+                       LOG.debug("Initializing {}.", getName());
 
                        userClassLoader = getUserCodeClassLoader();
                        configuration = new 
StreamConfig(getTaskConfiguration());
@@ -587,8 +587,11 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                catch (Exception e) {
                        // propagate exceptions only if the task is still in 
"running" state
                        if (isRunning) {
-                               throw e;
+                               throw new Exception("Could not perform 
checkpoint " + checkpointId +
+                                       " for operator " + getName() + '.', e);
                        } else {
+                               LOG.debug("Could not perform checkpoint {} for 
operator {} while the " +
+                                       "invokable was not in state running.", 
checkpointId, getName(), e);
                                return false;
                        }
                }
@@ -600,10 +603,12 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                        performCheckpoint(checkpointId, timestamp);
                }
                catch (CancelTaskException e) {
-                       throw e;
+                       throw new Exception("Operator " + getName() + " was 
cancelled while performing checkpoint " +
+                               checkpointId + '.');
                }
                catch (Exception e) {
-                       throw new Exception("Error while performing checkpoint 
" + checkpointId + '.', e);
+                       throw new Exception("Could not perform checkpoint " + 
checkpointId + " for operator " +
+                               getName() + '.', e);
                }
        }
 
@@ -651,13 +656,15 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                                                        try {
                                                                                
states[j].discardState();
                                                                        } catch 
(Exception discardException) {
-                                                                               
LOG.warn("Could not discard " + j + "th operator state.", discardException);
+                                                                               
LOG.warn("Could not discard {}th operator state of " +
+                                                                               
        "checkpoint {} for operator {}.", j, checkpointId,
+                                                                               
        getName(), discardException);
                                                                        }
                                                                }
                                                        }
 
-                                                       throw new 
Exception("Could not perform the checkpoint for " + i +
-                                                               "th operator in 
chain.", exception);
+                                                       throw new 
Exception("Could not perform the checkpoint " + checkpointId +
+                                                               " for " + i + 
"th operator in chain.", exception);
                                                }
 
                                                if (state.getOperatorState() 
instanceof AsynchronousStateHandle) {
@@ -768,7 +775,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
 
                if (stateBackend != null) {
                        // backend has been configured on the environment
-                       LOG.info("Using user-defined state backend: " + 
stateBackend);
+                       LOG.info("Using user-defined state backend: {}.", 
stateBackend);
                } else {
                        // see if we have a backend specified in the 
configuration
                        Configuration flinkConfig = 
getEnvironment().getTaskManagerInfo().getConfiguration();
@@ -787,8 +794,8 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
 
                                case "filesystem":
                                        FsStateBackend backend = new 
FsStateBackendFactory().createFromConfig(flinkConfig);
-                                       LOG.info("State backend is set to heap 
memory (checkpoints to filesystem \""
-                                               + backend.getBasePath() + 
"\")");
+                                       LOG.info("State backend is set to heap 
memory (checkpoints to filesystem \"{}\")",
+                                               backend.getBasePath());
                                        stateBackend = backend;
                                        break;
 
@@ -945,11 +952,15 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                                                try {
                                                                        
states[j].discardState();
                                                                } catch 
(Exception discardException) {
-                                                                       
LOG.warn("Could not discard the " + j + "th operator state.", discardException);
+                                                                       
LOG.warn("Could not discard the {}th operator state of " +
+                                                                               
"checkpoint {} for operator {}.", j, checkpointId,
+                                                                               
owner.getName(), discardException);
                                                                }
                                                        }
 
-                                                       throw new 
Exception("Could not materialize the " + i + "th operator state.", exception);
+                                                       throw new 
Exception("Could not materialize the " + i + "th operator " +
+                                                               "state of 
operator " + owner.getName() + " for checkpoint " +
+                                                               checkpointId + 
'.', exception);
                                                }
                                        }
                                }
@@ -962,10 +973,13 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                        }
                        catch (Exception e) {
                                if (owner.isRunning()) {
-                                       LOG.error("Caught exception while 
materializing asynchronous checkpoints.", e);
+                                       LOG.error("Caught exception while 
materializing asynchronous checkpoint {} for operator {}.", checkpointId, 
owner.getName(), e);
                                }
+
                                if (owner.asyncException == null) {
-                                       owner.asyncException = new 
AsynchronousException(e);
+                                       owner.asyncException = new 
AsynchronousException(
+                                               new Exception("Could not 
materialize checkpoint " + checkpointId +
+                                                       " of operator " + 
getName() + '.', e));
                                }
                        }
                        finally {

Reply via email to