Repository: flink Updated Branches: refs/heads/release-1.1 33df945fe -> b046038ae
[FLINK-5278] Improve task and checkpoint related logging This closes #2690. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b046038a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b046038a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b046038a Branch: refs/heads/release-1.1 Commit: b046038ae11f7662b6d788c1f005a9a61a45393b Parents: 33df945 Author: Till Rohrmann <trohrm...@apache.org> Authored: Wed Dec 7 16:22:23 2016 +0100 Committer: Ufuk Celebi <u...@apache.org> Committed: Wed Dec 7 18:07:55 2016 +0100 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 23 ++- .../ZooKeeperCompletedCheckpointStore.java | 107 ++++++++++-- .../flink/runtime/executiongraph/Execution.java | 9 +- .../apache/flink/runtime/taskmanager/Task.java | 168 ++++++++++++------- .../src/main/resources/log4j.properties | 2 +- .../flink/runtime/jobmanager/JobManager.scala | 19 ++- .../ZooKeeperCompletedCheckpointStoreTest.java | 36 ++++ .../runtime/taskmanager/TaskManagerTest.java | 5 +- .../streaming/runtime/tasks/StreamTask.java | 42 +++-- 9 files changed, 302 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index a3e511f..e5675c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -675,6 +675,7 @@ public class CheckpointCoordinator { 
if (shutdown || message == null) { return false; } + if (!job.equals(message.getJob())) { LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message); return false; @@ -710,7 +711,7 @@ public class CheckpointCoordinator { "the state handle to avoid lingering state.", message.getCheckpointId(), message.getTaskExecutionId(), message.getJob()); - discardState(message.getState()); + discardState(message.getJob(), message.getTaskExecutionId(), checkpointId, message.getState()); break; case DISCARDED: LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " + @@ -718,7 +719,7 @@ public class CheckpointCoordinator { "state handle tp avoid lingering state.", message.getCheckpointId(), message.getTaskExecutionId(), message.getJob()); - discardState(message.getState()); + discardState(message.getJob(), message.getTaskExecutionId(), checkpointId, message.getState()); } return true; @@ -734,13 +735,15 @@ public class CheckpointCoordinator { // message is for an unknown checkpoint, or comes too late (checkpoint disposed) if (recentPendingCheckpoints.contains(checkpointId)) { wasPendingCheckpoint = true; - LOG.warn("Received late message for now expired checkpoint attempt {}.", checkpointId); + LOG.warn("Received late message for now expired checkpoint attempt {} from " + + "task {} and job {}.", checkpointId, message.getTaskExecutionId(), message.getJob()); // try to discard the state so that we don't have lingering state lying around - discardState(message.getState()); + discardState(message.getJob(), message.getTaskExecutionId(), checkpointId, message.getState()); } else { - LOG.debug("Received message for an unknown checkpoint {}.", checkpointId); + LOG.debug("Received message for an unknown checkpoint {} from task {} and job" + + " {}.", checkpointId, message.getTaskExecutionId(), message.getJob()); wasPendingCheckpoint = false; } @@ -1112,7 +1115,11 @@ public class CheckpointCoordinator { } } - private void discardState(final 
SerializedValue<StateHandle<?>> stateObject) { + private void discardState( + final JobID jobId, + final ExecutionAttemptID executionAttemptID, + final long checkpointId, + final SerializedValue<StateHandle<?>> stateObject) { if (stateObject != null) { executor.execute(new Runnable() { @Override @@ -1120,7 +1127,9 @@ public class CheckpointCoordinator { try { stateObject.deserializeValue(userClassLoader).discardState(); } catch (Exception e) { - LOG.warn("Could not properly discard state object.", e); + LOG.warn("Could not properly discard state object for checkpoint {} " + + "belonging to task {} of job {}.", checkpointId, + executionAttemptID, jobId, e); } } }); http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 6570d00..9ae6c30 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.zookeeper.StateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -167,7 +168,17 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto Tuple2<StateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints .get(numberOfInitialCheckpoints - 1); - CompletedCheckpoint latestCheckpoint = 
latest.f0.getState(userClassLoader); + CompletedCheckpoint latestCheckpoint; + long checkpointId = pathToCheckpointId(latest.f1); + + LOG.info("Trying to retrieve checkpoint {}.", checkpointId); + + try { + latestCheckpoint = latest.f0.getState(userClassLoader); + } catch (Exception e) { + throw new Exception("Could not retrieve the completed checkpoint " + checkpointId + + " from the state storage.", e); + } checkpointStateHandles.add(latest); @@ -194,7 +205,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto checkNotNull(checkpoint, "Checkpoint"); // First add the new one. If it fails, we don't want to loose existing data. - String path = String.format("/%s", checkpoint.getCheckpointID()); + String path = checkpointIdToPath(checkpoint.getCheckpointID()); final StateHandle<CompletedCheckpoint> stateHandle = checkpointsInZooKeeper.add(path, checkpoint); @@ -266,26 +277,57 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto /** * Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle. 
*/ - private void removeFromZooKeeperAndDiscardCheckpoint( - final Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception { + private void removeFromZooKeeperAndDiscardCheckpoint(final Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception { final BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + final long checkpointId = pathToCheckpointId(stateHandleAndPath.f1); + try { if (event.getType() == CuratorEventType.DELETE) { if (event.getResultCode() == 0) { - // The checkpoint - CompletedCheckpoint checkpoint = stateHandleAndPath - .f0.getState(userClassLoader); + Exception exception = null; - checkpoint.discard(userClassLoader); - - // Discard the state handle - stateHandleAndPath.f0.discardState(); - - // Discard the checkpoint - LOG.debug("Discarded " + checkpoint); + // The checkpoint + CompletedCheckpoint checkpoint = null; + + try { + checkpoint = stateHandleAndPath.f0.getState(userClassLoader); + } catch (Exception e) { + Exception newException = new Exception("Could not retrieve the completed checkpoint " + + checkpointId + " from the state storage.", e); + + exception = ExceptionUtils.firstOrSuppressed(newException, exception); + } + + if (checkpoint != null) { + try { + checkpoint.discard(userClassLoader); + } catch (Exception e) { + Exception newException = new Exception("Could not discard the completed checkpoint " + + checkpoint + '.', e); + + exception = ExceptionUtils.firstOrSuppressed(newException, exception); + } + } + + try { + // Discard the state handle + stateHandleAndPath.f0.discardState(); + } catch (Exception e) { + Exception newException = new Exception("Could not discard meta data of completed checkpoint " + + checkpointId + '.', e); + + exception = ExceptionUtils.firstOrSuppressed(newException, exception); + } + + if (exception != null) { + throw exception; + } else { + // 
Discard the checkpoint + LOG.debug("Discarded {}.", checkpoint); + } } else { throw new IllegalStateException("Unexpected result code " + @@ -298,7 +340,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto } } catch (Exception e) { - LOG.error("Failed to discard checkpoint.", e); + LOG.warn("Failed to discard checkpoint {}.", checkpointId, e); } } }; @@ -308,4 +350,39 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto // inconsistent state. checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback); } + + /** + * Convert a checkpoint id into a ZooKeeper path. + * + * @param checkpointId to convert to the path + * @return Path created from the given checkpoint id + */ + protected static String checkpointIdToPath(long checkpointId) { + return String.format("/%s", checkpointId); + } + + /** + * Converts a path to the checkpoint id. + * + * @param path in ZooKeeper + * @return Checkpoint id parsed from the path + */ + protected static long pathToCheckpointId(String path) { + try { + String numberString; + + // check if we have a leading slash + if ('/' == path.charAt(0) ) { + numberString = path.substring(1); + } else { + numberString = path; + } + return Long.parseLong(numberString); + } catch (NumberFormatException e) { + LOG.warn("Could not parse checkpoint id from {}. 
This indicates that the " + + "checkpoint id to path conversion has changed.", path); + + return -1L; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 84b679b..b336e54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -979,14 +979,17 @@ public class Execution implements Serializable { private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) { // sanity check if (currentState.isTerminal()) { - throw new IllegalStateException("Cannot leave terminal state " + currentState + " to transition to " + targetState + "."); + throw new IllegalStateException("Cannot leave terminal state " + currentState + " to transition to " + targetState + '.'); } if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) { markTimestamp(targetState); - LOG.info(getVertex().getTaskNameWithSubtaskIndex() + " (" + getAttemptId() + ") switched from " - + currentState + " to " + targetState); + if (error == null) { + LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState); + } else { + LOG.info("{} ({}) switched from {} to {}.", getVertex().getTaskNameWithSubtaskIndex(), getAttemptId(), currentState, targetState, error); + } // make sure that the state transition completes normally. 
// potential errors (in listeners may not affect the main logic) http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index e4e1b36..8a446d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -480,7 +480,7 @@ public class Task implements Runnable { while (true) { ExecutionState current = this.executionState; if (current == ExecutionState.CREATED) { - if (STATE_UPDATER.compareAndSet(this, ExecutionState.CREATED, ExecutionState.DEPLOYING)) { + if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) { // success, we can start our work break; } @@ -491,14 +491,14 @@ public class Task implements Runnable { return; } else if (current == ExecutionState.CANCELING) { - if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) { + if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) { // we were immediately canceled. 
tell the TaskManager that we reached our final state notifyFinalState(); return; } } else { - throw new IllegalStateException("Invalid state for beginning of task operation"); + throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.'); } } @@ -516,7 +516,7 @@ public class Task implements Runnable { // first of all, get a user-code classloader // this may involve downloading the job's JAR files and/or classes - LOG.info("Loading JAR files for task " + taskNameWithSubtask); + LOG.info("Loading JAR files for task {}.", this); userCodeClassLoader = createUserCodeClassloader(libraryCache); final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader); @@ -545,7 +545,7 @@ public class Task implements Runnable { // the registration must also strictly be undone // ---------------------------------------------------------------- - LOG.info("Registering task at network: " + this); + LOG.info("Registering task at network: {}.", this); network.registerTask(this); // next, kick off the background copying of files for the distributed cache @@ -553,13 +553,15 @@ public class Task implements Runnable { for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : DistributedCache.readFileInfoFromConfig(jobConfiguration)) { - LOG.info("Obtaining local cache file for '" + entry.getKey() + '\''); + LOG.info("Obtaining local cache file for '{}'.", entry.getKey()); Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId); distributedCacheEntries.put(entry.getKey(), cp); } } catch (Exception e) { - throw new Exception("Exception while adding files to distributed cache.", e); + throw new Exception( + String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId), + e); } if (isCanceledOrFailed()) { @@ -598,11 +600,13 @@ public class Task implements Runnable { StateUtils.setOperatorState(op, state); } catch (Exception 
e) { - throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e); + throw new RuntimeException( + String.format("Failed to deserialize state handle and setup initial operator state for task %s (%s).", taskNameWithSubtask, executionId), + e); } } else { - throw new IllegalStateException("Found operator state for a non-stateful task invokable"); + throw new IllegalStateException(String.format("Found operator state for a non-stateful task %s (%s)", taskNameWithSubtask, executionId)); } } @@ -621,7 +625,7 @@ public class Task implements Runnable { this.invokable = invokable; // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime - if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { + if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { throw new CancelTaskException(); } @@ -656,7 +660,7 @@ public class Task implements Runnable { // try to mark the task as finished // if that fails, the task was canceled/failed in the meantime - if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED)) { + if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) { notifyObservers(ExecutionState.FINISHED, null); } else { @@ -679,7 +683,7 @@ public class Task implements Runnable { if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) { if (t instanceof CancelTaskException) { - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) { + if (transitionState(current, ExecutionState.CANCELED)) { cancelInvokable(); notifyObservers(ExecutionState.CANCELED, null); @@ -687,19 +691,19 @@ public class Task implements Runnable { } } else { - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { + if (transitionState(current, ExecutionState.FAILED, t)) { // proper failure of the task. record the exception as the root cause - LOG.error("Task execution failed. 
", t); + String errorMessage = String.format("Execution of %s (%s) failed.", taskNameWithSubtask, executionId); failureCause = t; cancelInvokable(); - notifyObservers(ExecutionState.FAILED, t); + notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t)); break; } } } else if (current == ExecutionState.CANCELING) { - if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) { + if (transitionState(current, ExecutionState.CANCELED)) { notifyObservers(ExecutionState.CANCELED, null); break; } @@ -709,22 +713,22 @@ public class Task implements Runnable { break; } // unexpected state, go to failed - else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) { - LOG.error("Unexpected state in Task during an exception: " + current); + else if (transitionState(current, ExecutionState.FAILED, t)) { + LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current); break; } // else fall through the loop and } } catch (Throwable tt) { - String message = "FATAL - exception in task exception handler"; + String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId); LOG.error(message, tt); notifyFatalError(message, tt); } } finally { try { - LOG.info("Freeing task resources for " + taskNameWithSubtask); + LOG.info("Freeing task resources for {} ({}).", taskNameWithSubtask, executionId); // stop the async dispatcher. 
// copy dispatcher reference to stack, against concurrent release @@ -751,7 +755,7 @@ public class Task implements Runnable { } catch (Throwable t) { // an error in the resource cleanup is fatal - String message = "FATAL - exception in task resource cleanup"; + String message = String.format("FATAL - exception in resource cleanup of task %s (%s).", taskNameWithSubtask, executionId); LOG.error(message, t); notifyFatalError(message, t); } @@ -763,7 +767,7 @@ public class Task implements Runnable { metrics.close(); } catch (Throwable t) { - LOG.error("Error during metrics de-registration", t); + LOG.error("Error during metrics de-registration of task {} ({}).", taskNameWithSubtask, executionId, t); } } } @@ -829,6 +833,39 @@ public class Task implements Runnable { taskManager.tell(new FatalError(message, cause)); } + /** + * Try to transition the execution state from the current state to the new state. + * + * @param currentState of the execution + * @param newState of the execution + * @return true if the transition was successful, otherwise false + */ + private boolean transitionState(ExecutionState currentState, ExecutionState newState) { + return transitionState(currentState, newState, null); + } + + /** + * Try to transition the execution state from the current state to the new state. 
+ * + * @param currentState of the execution + * @param newState of the execution + * @param cause of the transition change or null + * @return true if the transition was successful, otherwise false + */ + private boolean transitionState(ExecutionState currentState, ExecutionState newState, Throwable cause) { + if (STATE_UPDATER.compareAndSet(this, currentState, newState)) { + if (cause == null) { + LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState); + } else { + LOG.info("{} ({}) switched from {} to {}.", taskNameWithSubtask, executionId, currentState, newState, cause); + } + + return true; + } else { + return false; + } + } + // ---------------------------------------------------------------------------------------------------------------- // Stopping / Canceling / Failing the task from the outside // ---------------------------------------------------------------------------------------------------------------- @@ -843,22 +880,22 @@ public class Task implements Runnable { * if the {@link AbstractInvokable} does not implement {@link StoppableTask} */ public void stopExecution() throws UnsupportedOperationException { - LOG.info("Attempting to stop task " + taskNameWithSubtask); - if(this.invokable instanceof StoppableTask) { + LOG.info("Attempting to stop task {} ({}).", taskNameWithSubtask, executionId); + if (invokable instanceof StoppableTask) { Runnable runnable = new Runnable() { @Override public void run() { try { - ((StoppableTask)Task.this.invokable).stop(); + ((StoppableTask)invokable).stop(); } catch(RuntimeException e) { - LOG.error("Stopping task " + taskNameWithSubtask + " failed.", e); + LOG.error("Stopping task {} ({}) failed.", taskNameWithSubtask, executionId, e); taskManager.tell(new FailTask(executionId, e)); } } }; - executeAsyncCallRunnable(runnable, "Stopping source task " + this.taskNameWithSubtask); + executeAsyncCallRunnable(runnable, String.format("Stopping source task %s (%s).", 
taskNameWithSubtask, executionId)); } else { - throw new UnsupportedOperationException("Stopping not supported by this task."); + throw new UnsupportedOperationException(String.format("Stopping not supported by task %s (%s).", taskNameWithSubtask, executionId)); } } @@ -871,7 +908,7 @@ public class Task implements Runnable { * <p>This method never blocks.</p> */ public void cancelExecution() { - LOG.info("Attempting to cancel task " + taskNameWithSubtask); + LOG.info("Attempting to cancel task {} ({}).", taskNameWithSubtask, executionId); cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, null); } @@ -885,37 +922,52 @@ public class Task implements Runnable { * <p>This method never blocks.</p> */ public void failExternally(Throwable cause) { - LOG.info("Attempting to fail task externally " + taskNameWithSubtask); + LOG.info("Attempting to fail task externally {} ({}).", taskNameWithSubtask, executionId); cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause); } private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) { while (true) { - ExecutionState current = this.executionState; + ExecutionState current = executionState; // if the task is already canceled (or canceling) or finished or failed, // then we need not do anything if (current.isTerminal() || current == ExecutionState.CANCELING) { - LOG.info("Task " + taskNameWithSubtask + " is already in state " + current); + LOG.info("Task {} is already in state {}", taskNameWithSubtask, current); return; } if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) { - if (STATE_UPDATER.compareAndSet(this, current, targetState)) { + if (transitionState(current, targetState, cause)) { // if we manage this state transition, then the invokable gets never called // we need not call cancel on it this.failureCause = cause; - notifyObservers(targetState, cause); + notifyObservers( + targetState, + new Exception( + String.format( + "Cancel or fail execution of %s 
(%s).", + taskNameWithSubtask, + executionId), + cause)); return; } } else if (current == ExecutionState.RUNNING) { - if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, targetState)) { + if (transitionState(ExecutionState.RUNNING, targetState, cause)) { // we are canceling / failing out of the running state // we need to cancel the invokable if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) { this.failureCause = cause; - notifyObservers(targetState, cause); + notifyObservers( + targetState, + new Exception( + String.format( + "Cancel or fail execution of %s (%s).", + taskNameWithSubtask, + executionId), + cause)); + LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId); // because the canceling may block on user code, we cancel from a separate thread @@ -934,7 +986,7 @@ public class Task implements Runnable { producedPartitions, inputGates); Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler, - "Canceler for " + taskNameWithSubtask); + String.format("Canceler for %s (%s).", taskNameWithSubtask, executionId)); cancelThread.setDaemon(true); cancelThread.start(); } @@ -942,7 +994,8 @@ public class Task implements Runnable { } } else { - throw new IllegalStateException("Unexpected task state: " + current); + throw new IllegalStateException(String.format("Unexpected state: %s of task %s (%s).", + current, taskNameWithSubtask, executionId)); } } } @@ -956,13 +1009,6 @@ public class Task implements Runnable { } private void notifyObservers(ExecutionState newState, Throwable error) { - if (error == null) { - LOG.info(taskNameWithSubtask + " switched to " + newState); - } - else { - LOG.info(taskNameWithSubtask + " switched to " + newState + " with exception.", error); - } - TaskExecutionState stateUpdate = new TaskExecutionState(jobId, executionId, newState, error); UpdateTaskExecutionState actorMessage = new UpdateTaskExecutionState(stateUpdate); @@ -1009,16 +1055,20 @@ 
public class Task implements Runnable { if (getExecutionState() == ExecutionState.RUNNING) { failExternally(new Exception( "Error while triggering checkpoint " + checkpointID + " for " + - taskName, t)); + taskNameWithSubtask, t)); + } else { + LOG.debug("Encountered error while triggering checkpoint {} for " + + "{} ({}) while being not in state running.", checkpointID, + taskNameWithSubtask, executionId, t); } } } }; - executeAsyncCallRunnable(runnable, "Checkpoint Trigger for " + taskName); + executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId)); } else { - LOG.error("Task received a checkpoint request, but is not a checkpointing task - " - + taskNameWithSubtask); + LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).", + taskNameWithSubtask, executionId); DeclineCheckpoint decline = new DeclineCheckpoint( jobId, executionId, checkpointID, @@ -1027,7 +1077,7 @@ public class Task implements Runnable { } } else { - LOG.debug("Declining checkpoint request for non-running task"); + LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId); // send back a message that we did not do the checkpoint DeclineCheckpoint decline = new DeclineCheckpoint( @@ -1066,12 +1116,12 @@ public class Task implements Runnable { executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + taskName); } else { - LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - " - + taskNameWithSubtask); + LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - {}.", + taskNameWithSubtask); } } else { - LOG.debug("Ignoring checkpoint commit notification for non-running task."); + LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask); } } @@ -1174,14 +1224,14 @@ public class Task implements Runnable { 
invokable.cancel(); } catch (Throwable t) { - LOG.error("Error while canceling task " + taskNameWithSubtask, t); + LOG.error("Error while canceling task {}.", taskNameWithSubtask, t); } } } @Override public String toString() { - return taskNameWithSubtask + " [" + executionState + ']'; + return String.format("%s (%s) [%s]", taskNameWithSubtask, executionId, executionState); } /** @@ -1257,7 +1307,7 @@ public class Task implements Runnable { try { invokable.cancel(); } catch (Throwable t) { - logger.error("Error while canceling the task", t); + logger.error("Error while canceling the task {}.", taskName, t); } // Early release of input and output buffer pools. We do this @@ -1271,7 +1321,7 @@ public class Task implements Runnable { try { partition.destroyBufferPool(); } catch (Throwable t) { - LOG.error("Failed to release result partition buffer pool.", t); + LOG.error("Failed to release result partition buffer pool for task {}.", taskName, t); } } @@ -1279,7 +1329,7 @@ public class Task implements Runnable { try { inputGate.releaseAllResources(); } catch (Throwable t) { - LOG.error("Failed to release input gate.", t); + LOG.error("Failed to release input gate for task {}.", taskName, t); } } @@ -1297,7 +1347,7 @@ public class Task implements Runnable { watchDogThread.join(); } } catch (Throwable t) { - logger.error("Error in the task canceler", t); + logger.error("Error in the task canceler for task {}.", taskName, t); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/resources/log4j.properties b/flink-runtime/src/main/resources/log4j.properties index 9912b19..749796f 100644 --- a/flink-runtime/src/main/resources/log4j.properties +++ b/flink-runtime/src/main/resources/log4j.properties @@ -18,7 +18,7 @@ # Convenience file for local debugging of the JobManager/TaskManager. 
-log4j.rootLogger=OFF, console +log4j.rootLogger=INFO, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index cbf7b5d..2b455b7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -82,7 +82,6 @@ import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.concurrent._ import scala.concurrent.duration._ -import scala.concurrent.forkjoin.ForkJoinPool import scala.language.postfixOps /** @@ -1382,18 +1381,20 @@ class JobManager( case None => getClass.getClassLoader } - future { - Option(ackMessage.getState()) match { - case Some(state) => + Option(ackMessage.getState()) match { + case Some(state) => + future { try { state.deserializeValue(classLoader).discardState() } catch { - case e: Exception => log.warn("Could not discard orphaned checkpoint " + - "state.", e) + case e: Exception => + log.warn("Could not discard orphaned checkpoint state for " + + s"$ackMessage.", e) } - case None => - } - }(ExecutionContext.fromExecutor(ioExecutor)) + } (ExecutionContext.fromExecutor(ioExecutor)) + case None => + // no state to discard + } } } catch { case t: Throwable => http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java new file mode 100644 index 0000000..6ee0141 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { + + @Test + public void testPathConversion() { + final long checkpointId = 42L; + + final String path = ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpointId); + + assertEquals(checkpointId, ZooKeeperCompletedCheckpointStore.pathToCheckpointId(path)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index f2fd859..6a696a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -103,6 +103,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -480,7 +481,9 @@ public class TaskManagerTest extends TestLogger { "found.")); tm.tell(new StopTask(eid2), testActorGateway); - expectMsgEquals(new TaskOperationResult(eid2, false, "UnsupportedOperationException: Stopping not supported by this task.")); + TaskOperationResult message = expectMsgClass(TaskOperationResult.class); + assertEquals(eid2, message.executionID()); + 
assertFalse(message.success()); assertEquals(ExecutionState.RUNNING, t2.getExecutionState()); http://git-wip-us.apache.org/repos/asf/flink/blob/b046038a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index d7204a9..aa88175 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -202,7 +202,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> boolean disposed = false; try { // -------- Initialize --------- - LOG.debug("Initializing {}", getName()); + LOG.debug("Initializing {}.", getName()); userClassLoader = getUserCodeClassLoader(); configuration = new StreamConfig(getTaskConfiguration()); @@ -587,8 +587,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> catch (Exception e) { // propagate exceptions only if the task is still in "running" state if (isRunning) { - throw e; + throw new Exception("Could not perform checkpoint " + checkpointId + + "for operator " + getName() + '.', e); } else { + LOG.debug("Could not perform checkpoint {} for operator {} while the " + + "invokable was not in state running.", checkpointId, getName(), e); return false; } } @@ -600,10 +603,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> performCheckpoint(checkpointId, timestamp); } catch (CancelTaskException e) { - throw e; + throw new Exception("Operator " + getName() + " was cancelled while performing checkpoint " + + checkpointId + '.'); } catch (Exception e) { - throw new Exception("Error while performing checkpoint " + checkpointId + 
'.', e); + throw new Exception("Could not perform checkpoint " + checkpointId + " for operator " + + getName() + '.', e); } } @@ -651,13 +656,15 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> try { states[j].discardState(); } catch (Exception discardException) { - LOG.warn("Could not discard " + j + "th operator state.", discardException); + LOG.warn("Could not discard {}th operator state of " + + "checkpoint {} for operator {}.", j, checkpointId, + getName(), discardException); } } } - throw new Exception("Could not perform the checkpoint for " + i + - "th operator in chain.", exception); + throw new Exception("Could not perform the checkpoint " + checkpointId + + " for " + i + "th operator in chain.", exception); } if (state.getOperatorState() instanceof AsynchronousStateHandle) { @@ -768,7 +775,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> if (stateBackend != null) { // backend has been configured on the environment - LOG.info("Using user-defined state backend: " + stateBackend); + LOG.info("Using user-defined state backend: {}.", stateBackend); } else { // see if we have a backend specified in the configuration Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration(); @@ -787,8 +794,8 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> case "filesystem": FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig); - LOG.info("State backend is set to heap memory (checkpoints to filesystem \"" - + backend.getBasePath() + "\")"); + LOG.info("State backend is set to heap memory (checkpoints to filesystem \"{}\")", + backend.getBasePath()); stateBackend = backend; break; @@ -945,11 +952,15 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> try { states[j].discardState(); } catch (Exception discardException) { - LOG.warn("Could not discard the " + j + "th operator state.", discardException); + 
LOG.warn("Could not discard the {}th operator state of " + + "checkpoint {} for operator {}.", j, checkpointId, + owner.getName(), discardException); } } - throw new Exception("Could not materialize the " + i + "th operator state.", exception); + throw new Exception("Could not materialize the " + i + "th operator " + + "state of operator " + owner.getName() + " for checkpoint " + + checkpointId + '.', exception); } } } @@ -962,10 +973,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } catch (Exception e) { if (owner.isRunning()) { - LOG.error("Caught exception while materializing asynchronous checkpoints.", e); + LOG.error("Caught exception while materializing asynchronous checkpoint {} for operator {}.", checkpointId, owner.getName(), e); } + if (owner.asyncException == null) { - owner.asyncException = new AsynchronousException(e); + owner.asyncException = new AsynchronousException( + new Exception("Could not materialize checkpoint " + checkpointId + + " of operator " + getName() + '.', e)); } } finally {