[FLINK-5718] [core] TaskManagers exit the JVM on fatal exceptions.

Manually applied and adapted commit dfc6fba5b9830e6a7804a6a0c9f69b36bf772730 for
the `release-1.2` branch.

This closes #3811.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/daa54691
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/daa54691
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/daa54691

Branch: refs/heads/release-1.2
Commit: daa54691255158b1fcd0a55193ae3766efd79b12
Parents: 852a710
Author: Matt Zimmer <zimmerm...@netflix.com>
Authored: Tue May 2 16:46:13 2017 -0700
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri May 5 09:28:51 2017 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |   4 +-
 .../flink/configuration/TaskManagerOptions.java |   5 +
 .../org/apache/flink/util/ExceptionUtils.java   |  37 +++
 .../apache/flink/runtime/taskmanager/Task.java  |  14 ++
 .../taskmanager/TaskManagerRuntimeInfo.java     |  32 ++-
 .../flink/runtime/taskmanager/TaskManager.scala |   3 +-
 .../taskmanager/TaskManagerConfiguration.scala  |   8 +-
 .../flink/runtime/testutils/TestJvmProcess.java |   9 +
 .../runtime/util/JvmExitOnFatalErrorTest.java   | 244 +++++++++++++++++++
 .../flink/core/testutils/CommonTestUtils.java   |  25 ++
 10 files changed, 373 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index d5863a1..5f669e8 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -86,7 +86,7 @@ The default fraction for managed memory can be adjusted using 
the `taskmanager.m
 
 - `taskmanager.memory.segment-size`: The size of memory buffers used by the 
memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).
 
-- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. 
Specifies whether task managers should allocate all managed memory when 
starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to 
`true`, then it is advised that this configuration is also set to `true`.  If 
this configuration is set to `false` cleaning up of the allocated offheap 
memory happens only when the configured JVM parameter MaxDirectMemorySize is 
reached by triggering a full GC.
+- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. 
Specifies whether task managers should allocate all managed memory when 
starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to 
`true`, then it is advised that this configuration is also set to `true`.  If 
this configuration is set to `false` cleaning up of the allocated offheap 
memory happens only when the configured JVM parameter MaxDirectMemorySize is 
reached by triggering a full GC. **Note:** For streaming setups, we highly 
recommend setting this value to `false`, as the core state backends currently 
do not use managed memory.
 
 ### Memory and Performance Debugging
 
@@ -263,6 +263,8 @@ The following parameters configure Flink's JobManager and 
TaskManagers.
 
 - `taskmanager.max-registration-pause`: The maximum registration pause between 
two consecutive registration attempts. The max registration pause requires a 
time unit specifier (ms/s/min/h/d) (e.g. "5 s"). (DEFAULT: **30 s**)
 
+- `taskmanager.jvm-exit-on-oom`: Indicates that the TaskManager should 
immediately terminate the JVM if the task thread throws an `OutOfMemoryError` 
(DEFAULT: **false**).
+
 - `taskmanager.refused-registration-pause`: The pause after a registration has 
been refused by the job manager before retrying to connect. The refused 
registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. "5 s"). 
(DEFAULT: **10 s**)
 
 - `blob.fetch.retries`: The number of retries for the TaskManager to download 
BLOBs (such as JAR files) from the JobManager (DEFAULT: **50**).

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 3bd15fe..05f670c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -34,6 +34,11 @@ public class TaskManagerOptions {
 
        // @TODO Migrate 'taskmanager.*' config options from ConfigConstants
 
+       /** Whether to kill the TaskManager when the task thread throws an 
OutOfMemoryError */
+       public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
+               key("taskmanager.jvm-exit-on-oom")
+                       .defaultValue(false);
+
        // 
------------------------------------------------------------------------
        //  Network Options
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index d1357a8..8ec3d59 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -33,8 +33,13 @@ import java.io.StringWriter;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * A collection of utility functions for dealing with exceptions and exception 
workflows.
+ */
 @Internal
 public final class ExceptionUtils {
+
+       /** The stringified representation of a null exception reference */
        public static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
 
        /**
@@ -64,6 +69,38 @@ public final class ExceptionUtils {
        }
 
        /**
+        * Checks whether the given exception indicates a situation that may 
leave the
+        * JVM in a corrupted state, meaning a state where continued normal 
operation can only be
+        * guaranteed via clean process restart.
+        *
+        * <p>Currently considered fatal exceptions are Virtual Machine errors 
indicating
+        * that the JVM is corrupted, like {@link InternalError}, {@link 
UnknownError},
+        * and {@link java.util.zip.ZipError} (a special case of InternalError).
+        *
+        * @param t The exception to check.
+        * @return True, if the exception is considered fatal to the JVM, false 
otherwise.
+        */
+       public static boolean isJvmFatalError(Throwable t) {
+               return (t instanceof InternalError) || (t instanceof 
UnknownError);
+       }
+
+       /**
+        * Checks whether the given exception indicates a situation that may 
leave the
+        * JVM in a corrupted state, or an out-of-memory error.
+        *
+        * <p>See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a list 
of fatal JVM errors.
+        * This method additionally classifies the {@link OutOfMemoryError} as 
fatal, because it
+        * may occur in any thread (not the one that allocated the majority of 
the memory) and thus
+        * is often not recoverable by destroying the particular thread that 
threw the exception.
+        *
+        * @param t The exception to check.
+        * @return True, if the exception is fatal to the JVM or an 
OutOfMemoryError, false otherwise.
+        */
+       public static boolean isJvmFatalOrOutOfMemoryError(Throwable t) {
+               return isJvmFatalError(t) || t instanceof OutOfMemoryError;
+       }
+
+       /**
         * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) 
suppressed exception}
         * to a prior exception, or returns the new exception, if no prior 
exception exists.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 65b3053..8b51088 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
@@ -698,6 +699,19 @@ public class Task implements Runnable, TaskActions {
                        // 
----------------------------------------------------------------
 
                        try {
+                               // check if the exception is unrecoverable
+                               if (ExceptionUtils.isJvmFatalError(t) ||
+                                       (t instanceof OutOfMemoryError && 
taskManagerConfig.shouldExitJvmOnOutOfMemoryError()))
+                               {
+                                       // terminate the JVM immediately
+                                       // don't attempt a clean shutdown, 
because we cannot expect the clean shutdown to complete
+                                       try {
+                                               LOG.error("Encountered fatal 
error {} - terminating the JVM", t.getClass().getName(), t);
+                                       } finally {
+                                               Runtime.getRuntime().halt(-1);
+                                       }
+                               }
+
                                // transition into our final state. we should 
be either in DEPLOYING, RUNNING, CANCELING, or FAILED
                                // loop for multiple retries during concurrent 
state changes via calls to cancel() or
                                // to failExternally()

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
index 9ac982e..041392b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerRuntimeInfo.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -38,7 +39,10 @@ public class TaskManagerRuntimeInfo implements 
java.io.Serializable {
 
        /** list of temporary file directories */
        private final String[] tmpDirectories;
-       
+
+       /** Flag that signals whether to halt the JVM if an OutOfMemoryError is 
thrown */
+       private final boolean exitJvmOnOutOfMemory;
+
        /**
         * Creates a runtime info.
         * 
@@ -49,18 +53,30 @@ public class TaskManagerRuntimeInfo implements 
java.io.Serializable {
        public TaskManagerRuntimeInfo(String hostname, Configuration 
configuration, String tmpDirectory) {
                this(hostname, configuration, new String[] { tmpDirectory });
        }
-       
+
        /**
         * Creates a runtime info.
         * @param hostname The host name of the interface that the TaskManager 
uses to communicate.
         * @param configuration The configuration that the TaskManager was 
started with.
-        * @param tmpDirectories The list of temporary file directories.   
+        * @param tmpDirectories The list of temporary file directories.
         */
        public TaskManagerRuntimeInfo(String hostname, Configuration 
configuration, String[] tmpDirectories) {
+               this(hostname, configuration, tmpDirectories, 
configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY));
+       }
+
+       /**
+        * Creates a runtime info.
+        * @param hostname The host name of the interface that the TaskManager 
uses to communicate.
+        * @param configuration The configuration that the TaskManager was 
started with.
+        * @param tmpDirectories The list of temporary file directories.
+        * @param exitJvmOnOutOfMemory True to terminate the JVM on an 
OutOfMemoryError, false otherwise.
+        */
+       public TaskManagerRuntimeInfo(String hostname, Configuration 
configuration, String[] tmpDirectories, boolean exitJvmOnOutOfMemory) {
                checkArgument(tmpDirectories.length > 0);
                this.hostname = checkNotNull(hostname);
                this.configuration = checkNotNull(configuration);
                this.tmpDirectories = tmpDirectories;
+               this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
                
        }
 
@@ -87,4 +103,14 @@ public class TaskManagerRuntimeInfo implements 
java.io.Serializable {
        public String[] getTmpDirectories() {
                return tmpDirectories;
        }
+
+       /**
+        * Checks whether the TaskManager should exit the JVM when the task 
thread throws
+        * an OutOfMemoryError.
+        *
+        * @return True to terminate the JVM on an OutOfMemoryError, false 
otherwise.
+        */
+       public boolean shouldExitJvmOnOutOfMemoryError() {
+               return exitJvmOnOutOfMemory;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index bc63655..bb93fa1 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2382,7 +2382,8 @@ object TaskManager {
       configuration,
       initialRegistrationPause,
       maxRegistrationPause,
-      refusedRegistrationPause)
+      refusedRegistrationPause,
+      configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY))
 
     (taskManagerConfig, networkConfig, taskManagerInetSocketAddress, memType)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
index aab3c5f..929ff55 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskmanager
 
 import java.util.concurrent.TimeUnit
 
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{Configuration, TaskManagerOptions}
 
 import scala.concurrent.duration.FiniteDuration
 
@@ -33,7 +33,8 @@ case class TaskManagerConfiguration(
     configuration: Configuration,
     initialRegistrationPause: FiniteDuration,
     maxRegistrationPause: FiniteDuration,
-    refusedRegistrationPause: FiniteDuration) {
+    refusedRegistrationPause: FiniteDuration,
+    exitJvmOnOutOfMemory: Boolean) {
 
   def this(
       tmpDirPaths: Array[String],
@@ -51,6 +52,7 @@ case class TaskManagerConfiguration(
       configuration,
       FiniteDuration(500, TimeUnit.MILLISECONDS),
       FiniteDuration(30, TimeUnit.SECONDS),
-      FiniteDuration(10, TimeUnit.SECONDS))
+      FiniteDuration(10, TimeUnit.SECONDS),
+      configuration.getBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
index 5954ee5..4578edf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -295,6 +295,15 @@ public abstract class TestJvmProcess {
                }
        }
 
+       public void waitFor() throws InterruptedException {
+               Process process = this.process;
+               if (process != null) {
+                       process.waitFor();
+               } else {
+                       throw new IllegalStateException("process not started");
+               }
+       }
+
        // 
---------------------------------------------------------------------------------------------
        // File based synchronization utilities
        // 
---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
new file mode 100644
index 0000000..bf75549
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.memory.MemoryManager;
+import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.taskmanager.*;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.*;
+
+/**
+ * Test that verifies that the {@link Task} terminates the JVM when the task
+ * thread throws a fatal error, such as an {@link OutOfMemoryError}.
+ */
+public class JvmExitOnFatalErrorTest {
+
+       @Test
+       public void testExitJvmOnOutOfMemory() throws Exception {
+               // this test works only on linux
+               assumeTrue(OperatingSystem.isLinux());
+
+               // this test leaves remaining processes if not executed with 
Java 8
+               CommonTestUtils.assumeJava8();
+
+               // to check what went wrong (when the test hangs) uncomment 
this line
+//             ProcessEntryPoint.main(new String[0]);
+
+               final KillOnFatalErrorProcess testProcess = new 
KillOnFatalErrorProcess();
+
+               try {
+                       testProcess.startProcess();
+                       testProcess.waitFor();
+               }
+               finally {
+                       testProcess.destroy();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Blocking Process Implementation
+       // 
------------------------------------------------------------------------
+
+       private static final class KillOnFatalErrorProcess extends 
TestJvmProcess {
+
+               public KillOnFatalErrorProcess() throws Exception {}
+
+               @Override
+               public String getName() {
+                       return "KillOnFatalErrorProcess";
+               }
+
+               @Override
+               public String[] getJvmArgs() {
+                       return new String[0];
+               }
+
+               @Override
+               public String getEntryPointClassName() {
+                       return ProcessEntryPoint.class.getName();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       public static final class ProcessEntryPoint {
+
+               public static void main(String[] args) throws Exception {
+
+                       System.err.println("creating task");
+
+                       // we suppress process exits via errors here to not
+                       // have a test that exits accidentally due to a 
programming error
+                       try {
+                               final Configuration taskManagerConfig = new 
Configuration();
+                               
taskManagerConfig.setBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY, true);
+
+                               final JobID jid = new JobID();
+                               final JobVertexID jobVertexId = new 
JobVertexID();
+                               final ExecutionAttemptID executionAttemptID = 
new ExecutionAttemptID();
+                               final AllocationID slotAllocationId = new 
AllocationID();
+
+                               final SerializedValue<ExecutionConfig> 
execConfig = new SerializedValue<>(new ExecutionConfig());
+
+                               final JobInformation jobInformation = new 
JobInformation(
+                                       jid, "Test Job", execConfig, new 
Configuration(),
+                                       Collections.<BlobKey>emptyList(), 
Collections.<URL>emptyList());
+
+                               final TaskInformation taskInformation = new 
TaskInformation(
+                                       jobVertexId, "Test Task", 1, 1, 
OomInvokable.class.getName(), new Configuration());
+
+                               final MemoryManager memoryManager = new 
MemoryManager(1024 * 1024, 1);
+                               final IOManager ioManager = new 
IOManagerAsync();
+
+                               final NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
+                               
when(networkEnvironment.createKvStateTaskRegistry(jid, 
jobVertexId)).thenReturn(mock(TaskKvStateRegistry.class));
+
+                               final String[] tmpDirPaths = 
taskManagerConfig.getString(
+                                       
ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+                                       
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
+
+                               final TaskManagerRuntimeInfo tmInfo = new 
TaskManagerRuntimeInfo("test", taskManagerConfig, tmpDirPaths);
+
+                               final Executor executor = 
Executors.newCachedThreadPool();
+
+                               Task task = new Task(
+                                       jobInformation,
+                                       taskInformation,
+                                       executionAttemptID,
+                                       0,       // subtaskIndex
+                                       0,       // attemptNumber
+                                       
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                                       
Collections.<InputGateDeploymentDescriptor>emptyList(),
+                                       0,       // targetSlotNumber
+                                       null,    // taskStateHandles,
+                                       memoryManager,
+                                       ioManager,
+                                       networkEnvironment,
+                                       new BroadcastVariableManager(),
+                                       new 
ActorGatewayTaskManagerConnection(new DummyActorGateway()),
+                                       new NoOpInputSplitProvider(),
+                                       new NoOpCheckpointResponder(),
+                                       new FallbackLibraryCacheManager(),
+                                       new FileCache(taskManagerConfig),
+                                       tmInfo,
+                                       new UnregisteredTaskMetricsGroup(),
+                                       new 
NoOpResultPartitionConsumableNotifier(),
+                                       new NoOpPartitionProducerStateChecker(),
+                                       executor);
+
+                               System.err.println("starting task thread");
+
+                               task.startTaskThread();
+                       }
+                       catch (Throwable t) {
+                               System.err.println("ERROR STARTING TASK");
+                               t.printStackTrace();
+                       }
+
+                       System.err.println("parking the main thread");
+                       CommonTestUtils.blockForeverNonInterruptibly();
+               }
+
+               public static final class OomInvokable extends 
AbstractInvokable {
+
+                       @Override
+                       public void invoke() throws Exception {
+                               throw new OutOfMemoryError();
+                       }
+               }
+
+               private static final class NoOpInputSplitProvider implements 
InputSplitProvider {
+
+                       @Override
+                       public InputSplit getNextInputSplit(ClassLoader 
userCodeClassLoader) {
+                               return null;
+                       }
+               }
+
+               private static final class NoOpCheckpointResponder implements 
CheckpointResponder {
+
+                       @Override
+                       public void acknowledgeCheckpoint(JobID j, 
ExecutionAttemptID e, CheckpointMetaData c, SubtaskState s) {}
+
+                       @Override
+                       public void declineCheckpoint(JobID j, 
ExecutionAttemptID e, long l, Throwable t) {}
+               }
+
+               private static final class 
NoOpResultPartitionConsumableNotifier implements 
ResultPartitionConsumableNotifier {
+
+                       @Override
+                       public void notifyPartitionConsumable(JobID j, 
ResultPartitionID p, TaskActions t) {}
+               }
+
+               private static final class NoOpPartitionProducerStateChecker 
implements PartitionProducerStateChecker {
+
+                       @Override
+                       public Future<ExecutionState> 
requestPartitionProducerState(
+                               JobID jobId, IntermediateDataSetID 
intermediateDataSetId, ResultPartitionID r) {
+                               return null;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/daa54691/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index 2eb18c1..639b065 100644
--- 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -97,6 +97,27 @@ public class CommonTestUtils {
        }
 
        /**
+        * Permanently blocks the current thread. The thread cannot be woken
+        * up via {@link Thread#interrupt()}.
+        */
+       public static void blockForeverNonInterruptibly() {
+               final Object lock = new Object();
+               //noinspection InfiniteLoopStatement
+               while (true) {
+                       try {
+                               //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
+                               synchronized (lock) {
+                                       lock.wait();
+                               }
+                       } catch (InterruptedException ignored) {}
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Preconditions on the test environment
+       // 
------------------------------------------------------------------------
+
+       /**
         * Checks whether this code runs in a Java 8 (Java 1.8) JVM. If not, 
this throws a
         * {@link AssumptionViolatedException}, which causes JUnit to skip the 
test that
         * called this method.
@@ -117,6 +138,10 @@ public class CommonTestUtils {
                }
        }
 
+       // 
------------------------------------------------------------------------
+       //  Manipulation of environment
+       // 
------------------------------------------------------------------------
+
        public static void setEnv(Map<String, String> newenv) {
                setEnv(newenv, true);
        }

Reply via email to