[hotfix] [streaming] Various cleanups in StreamTask - Clean up generics - Cleanly and safely dispose of initialized resources - Add names to asynchronous materialization threads - Fix concurrent modification of the set of materialization threads
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/168532ec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/168532ec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/168532ec Branch: refs/heads/master Commit: 168532ece09b4be972f3ccad509a1e3376a1ac3a Parents: bfff86c Author: Stephan Ewen <se...@apache.org> Authored: Thu Jan 28 18:58:11 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Feb 2 17:11:46 2016 +0100 ---------------------------------------------------------------------- .../streaming/runtime/tasks/StreamTask.java | 129 +++++++++---------- .../runtime/state/StateBackendITCase.java | 1 - 2 files changed, 63 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/168532ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index e4b6b6e..9ab6c10 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -17,6 +17,8 @@ package org.apache.flink.streaming.runtime.tasks; +import java.io.Serializable; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -28,13 +30,11 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; 
import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.state.AsynchronousStateHandle; -import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; @@ -140,14 +140,14 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> * actual execution Thread. */ private volatile AsynchronousException asyncException; - protected Set<Thread> asyncCheckpointThreads; + /** The currently active background materialization threads */ + private final Set<Thread> asyncCheckpointThreads = Collections.synchronizedSet(new HashSet<Thread>()); /** Flag to mark the task "in operation", in which case check * needs to be initialized to true, so that early cancel() before invoke() behaves correctly */ private volatile boolean isRunning; private long recoveryTimestamp; - // ------------------------------------------------------------------------ // Life cycle methods for specific implementations @@ -167,21 +167,19 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> @Override public final void invoke() throws Exception { - // -------------------------------------------------------------------- - // Initialize - // -------------------------------------------------------------------- - LOG.debug("Initializing {}", getName()); - boolean initializationCompleted = false; + boolean disposed = false; try { - AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry(); + // -------- Initialize --------- + LOG.debug("Initializing {}", getName()); userClassLoader = getUserCodeClassLoader(); configuration = new 
StreamConfig(getTaskConfiguration()); - accumulatorMap = accumulatorRegistry.getUserMap(); + accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap(); headOperator = configuration.getStreamOperator(userClassLoader); - operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter()); + operatorChain = new OperatorChain<>(this, headOperator, + getEnvironment().getAccumulatorRegistry().getReadWriteReporter()); if (headOperator != null) { headOperator.setup(this, configuration, operatorChain.getChainEntryPoint()); @@ -190,37 +188,16 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> timerService = Executors.newSingleThreadScheduledExecutor( new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName())); - asyncCheckpointThreads = new HashSet<>(); - // task specific initialization init(); - - initializationCompleted = true; - } - finally { - if (!initializationCompleted) { - if (timerService != null) { - timerService.shutdownNow(); - } - if (operatorChain != null) { - operatorChain.releaseOutputs(); - } - } - } - - // -------------------------------------------------------------------- - // Invoke - // -------------------------------------------------------------------- - LOG.debug("Invoking {}", getName()); - - boolean disposed = false; - try { - // first order of business is to initialize the state backend and to - // give operators back their state + + // -------- Invoke -------- + LOG.debug("Invoking {}", getName()); + + // first order of business is to give operators back their state stateBackend = createStateBackend(); stateBackend.initializeForJob(getEnvironment()); - - restoreStateLazy(); + restoreState(); // we need to make sure that any triggers scheduled in open() cannot be // executed before all operators are opened @@ -255,14 +232,31 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> disposed = true; } finally { + // clean up 
everything we initialized isRunning = false; - timerService.shutdownNow(); - - for (Thread checkpointThread: asyncCheckpointThreads) { - checkpointThread.interrupt(); + // stop all timers and threads + if (timerService != null) { + try { + timerService.shutdownNow(); + } + catch (Throwable t) { + // catch and log the exception to not replace the original exception + LOG.error("Could not shut down timer service", t); + } + } + + // stop all asynchronous checkpoint threads + try { + for (Thread checkpointThread : asyncCheckpointThreads) { + checkpointThread.interrupt(); + } + asyncCheckpointThreads.clear(); + } + catch (Throwable t) { + // catch and log the exception to not replace the original exception + LOG.error("Could not shut down async checkpoint threads", t); } - asyncCheckpointThreads.clear(); // release the output resources. this method should never fail. if (operatorChain != null) { @@ -270,13 +264,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } // we must! 
perform this cleanup - try { cleanup(); } catch (Throwable t) { // catch and log the exception to not replace the original exception - LOG.error("Error during cleanup of stream task."); + LOG.error("Error during cleanup of stream task", t); } // if the operators were not disposed before, do a hard dispose @@ -333,14 +326,16 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } private void disposeAllOperators() { - for (StreamOperator<?> operator : operatorChain.getAllOperators()) { - try { - if (operator != null) { - operator.dispose(); + if (operatorChain != null) { + for (StreamOperator<?> operator : operatorChain.getAllOperators()) { + try { + if (operator != null) { + operator.dispose(); + } + } + catch (Throwable t) { + LOG.error("Error during disposal of stream operator.", t); } - } - catch (Throwable t) { - LOG.error("Error during disposal of stream operator.", t); } } } @@ -360,13 +355,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> if (!timerService.isTerminated()) { LOG.warn("Timer service was not shut down. 
Shutting down in finalize()."); } - timerService.shutdown(); + timerService.shutdownNow(); } - if (asyncCheckpointThreads != null) { - for (Thread checkpointThread : asyncCheckpointThreads) { - checkpointThread.interrupt(); - } + for (Thread checkpointThread : asyncCheckpointThreads) { + checkpointThread.interrupt(); } } @@ -417,7 +410,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> this.recoveryTimestamp = recoveryTimestamp; } - public void restoreStateLazy() throws Exception { + private void restoreState() throws Exception { if (lazyRestoreState != null) { LOG.info("Restoring checkpointed state to task {}", getName()); @@ -448,7 +441,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } @Override - @SuppressWarnings("unchecked,rawtypes") public boolean triggerCheckpoint(final long checkpointId, final long timestamp) throws Exception { LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName()); @@ -497,26 +489,30 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> // start a Thread that does the asynchronous materialization and // then sends the checkpoint acknowledge - Thread checkpointThread = new Thread() { + String threadName = "Materialize checkpoint " + checkpointId + " for " + getName(); + Thread checkpointThread = new Thread(threadName) { @Override public void run() { try { for (StreamTaskState state : states) { if (state != null) { if (state.getFunctionState() instanceof AsynchronousStateHandle) { - AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) state.getFunctionState(); - state.setFunctionState((StateHandle) asyncState.materialize()); + AsynchronousStateHandle<Serializable> asyncState = (AsynchronousStateHandle<Serializable>) state.getFunctionState(); + state.setFunctionState(asyncState.materialize()); } if (state.getOperatorState() instanceof AsynchronousStateHandle) { AsynchronousStateHandle<?> asyncState = 
(AsynchronousStateHandle<?>) state.getOperatorState(); - state.setOperatorState((StateHandle) asyncState.materialize()); + state.setOperatorState(asyncState.materialize()); } } } StreamTaskStateList allStates = new StreamTaskStateList(states); getEnvironment().acknowledgeCheckpoint(checkpointId, allStates); - } catch (Exception e) { - LOG.error("Caught exception while materializing asynchronous checkpoints.", e); + } + catch (Exception e) { + if (isRunning()) { + LOG.error("Caught exception while materializing asynchronous checkpoints.", e); + } if (asyncException == null) { asyncException = new AsynchronousException(e); } @@ -527,6 +523,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> }; asyncCheckpointThreads.add(checkpointThread); + checkpointThread.setDaemon(true); checkpointThread.start(); } return true; http://git-wip-us.apache.org/repos/asf/flink/blob/168532ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java index cdfef85..add532f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java @@ -35,7 +35,6 @@ import java.io.Serializable; import static org.junit.Assert.assertTrue; - public class StateBackendITCase extends StreamingMultipleProgramsTestBase { /**