[hotfix] [streaming] Various cleanups in StreamTask
  - Clean up generics
  - Clean and safe disposal of initialized resources
  - Add names to asynchronous materialization threads
  - Fix concurrent modification of the materialization threads set


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/168532ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/168532ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/168532ec

Branch: refs/heads/master
Commit: 168532ece09b4be972f3ccad509a1e3376a1ac3a
Parents: bfff86c
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jan 28 18:58:11 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 2 17:11:46 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     | 129 +++++++++----------
 .../runtime/state/StateBackendITCase.java       |   1 -
 2 files changed, 63 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/168532ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e4b6b6e..9ab6c10 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -28,13 +30,11 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.AsynchronousStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
@@ -140,14 +140,14 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
         * actual execution Thread. */
        private volatile AsynchronousException asyncException;
 
-       protected Set<Thread> asyncCheckpointThreads;
+       /** The currently active background materialization threads */
+       private final Set<Thread> asyncCheckpointThreads = 
Collections.synchronizedSet(new HashSet<Thread>());
        
        /** Flag to mark the task "in operation", in which case check
         * needs to be initialized to true, so that early cancel() before 
invoke() behaves correctly */
        private volatile boolean isRunning;
 
        private long recoveryTimestamp;
-       
 
        // 
------------------------------------------------------------------------
        //  Life cycle methods for specific implementations
@@ -167,21 +167,19 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        
        @Override
        public final void invoke() throws Exception {
-               // 
--------------------------------------------------------------------
-               // Initialize
-               // 
--------------------------------------------------------------------
-               LOG.debug("Initializing {}", getName());
 
-               boolean initializationCompleted = false;
+               boolean disposed = false;
                try {
-                       AccumulatorRegistry accumulatorRegistry = 
getEnvironment().getAccumulatorRegistry();
+                       // -------- Initialize ---------
+                       LOG.debug("Initializing {}", getName());
 
                        userClassLoader = getUserCodeClassLoader();
                        configuration = new 
StreamConfig(getTaskConfiguration());
-                       accumulatorMap = accumulatorRegistry.getUserMap();
+                       accumulatorMap = 
getEnvironment().getAccumulatorRegistry().getUserMap();
 
                        headOperator = 
configuration.getStreamOperator(userClassLoader);
-                       operatorChain = new OperatorChain<>(this, headOperator, 
accumulatorRegistry.getReadWriteReporter());
+                       operatorChain = new OperatorChain<>(this, headOperator, 
+                                               
getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
 
                        if (headOperator != null) {
                                headOperator.setup(this, configuration, 
operatorChain.getChainEntryPoint());
@@ -190,37 +188,16 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                        timerService = 
Executors.newSingleThreadScheduledExecutor(
                                        new 
DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
 
-                       asyncCheckpointThreads = new HashSet<>();
-
                        // task specific initialization
                        init();
-
-                       initializationCompleted = true;
-               }
-               finally {
-                       if (!initializationCompleted) {
-                               if (timerService != null) {
-                                       timerService.shutdownNow();
-                               }
-                               if (operatorChain != null) {
-                                       operatorChain.releaseOutputs();
-                               }
-                       }
-               }
-
-               // 
--------------------------------------------------------------------
-               // Invoke
-               // 
--------------------------------------------------------------------
-               LOG.debug("Invoking {}", getName());
-               
-               boolean disposed = false;
-               try {
-                       // first order of business is to initialize the state 
backend and to
-                       // give operators back their state
+                       
+                       // -------- Invoke --------
+                       LOG.debug("Invoking {}", getName());
+                       
+                       // first order of business is to give operators back 
their state
                        stateBackend = createStateBackend();
                        stateBackend.initializeForJob(getEnvironment());
-                       
-                       restoreStateLazy();
+                       restoreState();
                        
                        // we need to make sure that any triggers scheduled in 
open() cannot be
                        // executed before all operators are opened
@@ -255,14 +232,31 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                        disposed = true;
                }
                finally {
+                       // clean up everything we initialized
                        isRunning = false;
 
-                       timerService.shutdownNow();
-
-                       for (Thread checkpointThread: asyncCheckpointThreads) {
-                               checkpointThread.interrupt();
+                       // stop all timers and threads
+                       if (timerService != null) {
+                               try {
+                                       timerService.shutdownNow();
+                               }
+                               catch (Throwable t) {
+                                       // catch and log the exception to not 
replace the original exception
+                                       LOG.error("Could not shut down timer 
service", t);
+                               }
+                       }
+                       
+                       // stop all asynchronous checkpoint threads
+                       try {
+                               for (Thread checkpointThread : 
asyncCheckpointThreads) {
+                                       checkpointThread.interrupt();
+                               }
+                               asyncCheckpointThreads.clear();
+                       }
+                       catch (Throwable t) {
+                               // catch and log the exception to not replace 
the original exception
+                               LOG.error("Could not shut down async checkpoint 
threads", t);
                        }
-                       asyncCheckpointThreads.clear();
                        
                        // release the output resources. this method should 
never fail.
                        if (operatorChain != null) {
@@ -270,13 +264,12 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                        }
 
                        // we must! perform this cleanup
-
                        try {
                                cleanup();
                        }
                        catch (Throwable t) {
                                // catch and log the exception to not replace 
the original exception
-                               LOG.error("Error during cleanup of stream 
task.");
+                               LOG.error("Error during cleanup of stream 
task", t);
                        }
                        
                        // if the operators were not disposed before, do a hard 
dispose
@@ -333,14 +326,16 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        }
        
        private void disposeAllOperators() {
-               for (StreamOperator<?> operator : 
operatorChain.getAllOperators()) {
-                       try {
-                               if (operator != null) {
-                                       operator.dispose();
+               if (operatorChain != null) {
+                       for (StreamOperator<?> operator : 
operatorChain.getAllOperators()) {
+                               try {
+                                       if (operator != null) {
+                                               operator.dispose();
+                                       }
+                               }
+                               catch (Throwable t) {
+                                       LOG.error("Error during disposal of 
stream operator.", t);
                                }
-                       }
-                       catch (Throwable t) {
-                               LOG.error("Error during disposal of stream 
operator.", t);
                        }
                }
        }
@@ -360,13 +355,11 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                        if (!timerService.isTerminated()) {
                                LOG.warn("Timer service was not shut down. 
Shutting down in finalize().");
                        }
-                       timerService.shutdown();
+                       timerService.shutdownNow();
                }
 
-               if (asyncCheckpointThreads != null) {
-                       for (Thread checkpointThread : asyncCheckpointThreads) {
-                               checkpointThread.interrupt();
-                       }
+               for (Thread checkpointThread : asyncCheckpointThreads) {
+                       checkpointThread.interrupt();
                }
        }
 
@@ -417,7 +410,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                this.recoveryTimestamp = recoveryTimestamp;
        }
        
-       public void restoreStateLazy() throws Exception {
+       private void restoreState() throws Exception {
                if (lazyRestoreState != null) {
                        LOG.info("Restoring checkpointed state to task {}", 
getName());
                        
@@ -448,7 +441,6 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        }
 
        @Override
-       @SuppressWarnings("unchecked,rawtypes")
        public boolean triggerCheckpoint(final long checkpointId, final long 
timestamp) throws Exception {
                LOG.debug("Starting checkpoint {} on task {}", checkpointId, 
getName());
                
@@ -497,26 +489,30 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                        // start a Thread that does the 
asynchronous materialization and
                                        // then sends the checkpoint acknowledge
 
-                                       Thread checkpointThread = new Thread() {
+                                       String threadName = "Materialize 
checkpoint " + checkpointId + " for " + getName();
+                                       Thread checkpointThread = new 
Thread(threadName) {
                                                @Override
                                                public void run() {
                                                        try {
                                                                for 
(StreamTaskState state : states) {
                                                                        if 
(state != null) {
                                                                                
if (state.getFunctionState() instanceof AsynchronousStateHandle) {
-                                                                               
        AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) 
state.getFunctionState();
-                                                                               
        state.setFunctionState((StateHandle) asyncState.materialize());
+                                                                               
        AsynchronousStateHandle<Serializable> asyncState = 
(AsynchronousStateHandle<Serializable>) state.getFunctionState();
+                                                                               
        state.setFunctionState(asyncState.materialize());
                                                                                
}
                                                                                
if (state.getOperatorState() instanceof AsynchronousStateHandle) {
                                                                                
        AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) 
state.getOperatorState();
-                                                                               
        state.setOperatorState((StateHandle) asyncState.materialize());
+                                                                               
        state.setOperatorState(asyncState.materialize());
                                                                                
}
                                                                        }
                                                                }
                                                                
StreamTaskStateList allStates = new StreamTaskStateList(states);
                                                                
getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
-                                                       } catch (Exception e) {
-                                                               
LOG.error("Caught exception while materializing asynchronous checkpoints.", e);
+                                                       }
+                                                       catch (Exception e) {
+                                                               if 
(isRunning()) {
+                                                                       
LOG.error("Caught exception while materializing asynchronous checkpoints.", e);
+                                                               }
                                                                if 
(asyncException == null) {
                                                                        
asyncException = new AsynchronousException(e);
                                                                }
@@ -527,6 +523,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                        };
 
                                        
asyncCheckpointThreads.add(checkpointThread);
+                                       checkpointThread.setDaemon(true);
                                        checkpointThread.start();
                                }
                                return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/168532ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
index cdfef85..add532f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
@@ -35,7 +35,6 @@ import java.io.Serializable;
 
 import static org.junit.Assert.assertTrue;
 
-
 public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 
        /**

Reply via email to